
Example code for Spark with Flume 1.6.0

Published: 2025-01-17 · Author: 千家信息网 editor

This article shares example code for using Spark Streaming with Flume 1.6.0. I hope you get something out of it; let's dig in.

package hgs.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner

/* Add the following dependency to pom.xml:
 *   <dependency>
 *     <groupId>org.apache.spark</groupId>
 *     <artifactId>spark-streaming-flume_2.11</artifactId>
 *     <version>2.1.0</version>
 *   </dependency>
 */

/* Flume conf file:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/logs
a1.sources.r1.fileHeader=true
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.1.9
a1.sinks.k1.port=8888
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
#the command to start an agent
#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
*/

object SparkStreamingFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("d:\\checkpoint")

    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map((it._1, _)))      // option 1
      //iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map((x, _)) }   // option 2
      iter.flatMap(it => Some((it._1, it._2.sum + it._3.getOrElse(0))))               // option 3
    }

    // There are two ways to get data from Flume: push and poll. This is push mode:
    // Flume pushes data to Spark, so the ip/port below are the Spark receiver's address.
    val rds = FlumeUtils.createStream(ssc, "192.168.1.9", 8888, StorageLevel.MEMORY_ONLY)
    val result = rds.flatMap(x => new String(x.event.getBody.array()).split(" "))
      .map(x => (x, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
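The three commented variants of updateFunc are equivalent: for each key, the counts seen in the current batch are folded into the previous running total. The following standalone sketch (not part of the original code; the object name UpdateFuncDemo and the sample data are illustrative) applies the same function to hand-made input so the semantics can be checked without starting a streaming job:

object UpdateFuncDemo {
  def main(args: Array[String]): Unit = {
    // "hello" appeared twice in this batch and had a previous count of 3;
    // "flume" appeared once and has no previous state yet.
    val batch = Iterator(
      ("hello", Seq(1, 1), Option(3)),
      ("flume", Seq(1), Option.empty[Int]))

    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) =>
      iter.flatMap(it => Some((it._1, it._2.sum + it._3.getOrElse(0))))

    println(updateFunc(batch).toList)  // List((hello,5), (flume,1))
  }
}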
package hgs.spark.streaming

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.HashPartitioner

//Spark supports Flume 1.6.0
/* Add the following dependency to pom.xml:
 *   <dependency>
 *     <groupId>org.apache.spark</groupId>
 *     <artifactId>spark-streaming-flume_2.11</artifactId>
 *     <version>2.1.0</version>
 *   </dependency>
 */

/* Flume configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir = /home/logs
a1.sources.r1.fileHeader = true
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.6.129
a1.sinks.k1.port = 8888
a1.channels.c1.type=memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel = c1
#the command to start an agent
#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
*/

//The following three jars are also required; put them on Flume's classpath:
/*
 groupId = org.apache.spark
 artifactId = spark-streaming-flume-sink_2.11
 version = 2.1.0

 groupId = org.scala-lang
 artifactId = scala-library
 version = 2.11.7

 groupId = org.apache.commons
 artifactId = commons-lang3
 version = 3.5
*/

object SparkStreamingFlumePoll {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("flume-poll").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("d:\\checkpoint")

    val ipSeq = Seq(new InetSocketAddress("192.168.6.129", 8888))
    // In this mode Spark pulls (polls) data from Flume's SparkSink.
    val rds = FlumeUtils.createPollingStream(ssc, ipSeq, StorageLevel.MEMORY_AND_DISK)

    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map((it._1, _)))      // option 1
      //iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map((x, _)) }   // option 2
      iter.flatMap(it => Some((it._1, it._2.sum + it._3.getOrElse(0))))               // option 3
    }

    val result = rds.flatMap(x => new String(x.event.getBody.array()).split(" "))
      .map(x => (x, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

//Error encountered: the scala-library jar already exists under Flume's lib directory,
//so adding another copy caused a conflict. Delete one of the duplicates.
/*
18 Oct 2018 20:58:32,123 WARN  [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
18 Oct 2018 20:58:32,128 WARN  [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59)  - Spark was unable to successfully process the events. Transaction is being rolled back.
18 Oct 2018 20:58:32,128 WARN  [New I/O  worker #1] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59)  - Received an error batch - no events were received from channel!
*/
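Both examples read through a spooldir source, so either pipeline can be exercised by dropping a new file into the configured spoolDir (/home/logs in the configs above); Flume ingests it and, by default, renames it with a .COMPLETED suffix. Here is a minimal sketch for producing test input; the object name GenerateTestInput, the file name pattern, and the sample words are assumptions of this example, not part of the original post:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object GenerateTestInput {
  def main(args: Array[String]): Unit = {
    // Write a few space-separated words into a new file under the spooling directory.
    // A fresh file name is used on each run to avoid clashing with files that the
    // spooldir source has already processed.
    val target = Paths.get("/home/logs", s"words-${System.currentTimeMillis()}.txt")
    val lines = "hello spark\nhello flume\n"
    Files.write(target, lines.getBytes(StandardCharsets.UTF_8))
    println(s"wrote test input to $target")
  }
}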

Having read this article, you should now have a working understanding of the example code for Spark with Flume 1.6.0. To learn more, follow the industry news channel. Thanks for reading!
