spark 与flume 1.6.0的示例代码
发表于:2025-01-17 作者:千家信息网编辑
千家信息网最后更新 2025年01月17日,小编给大家分享一下spark 与flume 1.6.0的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!package hgs.spark.streamingimport or
千家信息网最后更新 2025年01月17日spark 与flume 1.6.0的示例代码
小编给大家分享一下spark 与flume 1.6.0的示例代码,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.flume.FlumeUtilsimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitioner/* pom.xml中加入如下配置 ** *//*flume的conf文件a1.sources=r1a1.sinks=k1a1.channels=c1a1.sources.r1.type=spooldira1.sources.r1.spoolDir=/home/logsa1.sources.r1.fileHeader=truea1.sinks.k1.type=avroa1.sinks.k1.hostname= 192.168.1.9a1.sinks.k1.port= 8888a1.channels.c1.type=memorya1.channels.c1.capacity=1000a1.channels.c1.transactionCapacity=100a1.sources.r1.channels=c1a1.sinks.k1.channel=c1#the command to start a agent#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template*/object SparkStreamingFlumePush { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]"); val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("d:\\checkpoint") val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } //总共有两种获取数据的方式,push和poll,这种是push即flume将数据推送给spark 该出的ip、port是spark的ip地址和port val rds = FlumeUtils.createStream(ssc, "192.168.1.9", 8888, StorageLevel.MEMORY_ONLY) val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" ")) .map(x=>(x,1)) .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true) result.print() ssc.start() ssc.awaitTermination() }} org.apache.spark spark-streaming-flume_2.11 2.1.0
package hgs.spark.streamingimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.flume.FlumeUtilsimport java.net.InetAddressimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevelimport org.apache.spark.HashPartitioner//spark支持1.6.0的flume版本/* pom.xml中加入如下配置 ** *//* * flume配置a1.sources = r1a1.sinks = k1a1.channels = c1a1.sources.r1.type=spooldira1.sources.r1.spoolDir = /home/logsa1.sources.r1.fileHeader = truea1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSinka1.sinks.k1.hostname=192.168.6.129a1.sinks.k1.port = 8888a1.channels.c1.type=memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity=100a1.sources.r1.channels=c1a1.sinks.k1.channel = c1#the command to start a agent#bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template*///同时需要如下三个包 将三个包放到flume的classpath下面/* groupId = org.apache.spark artifactId = spark-streaming-flume-sink_2.11 version = 2.1.0 groupId = org.scala-lang artifactId = scala-library version = 2.11.7 groupId = org.apache.commons artifactId = commons-lang3 version = 3.5*/ object SparkStreamingFlumePoll { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("flume-push").setMaster("local[2]"); val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(5)) ssc.checkpoint("d:\\checkpoint") val ipSeq = Seq(new InetSocketAddress("192.168.6.129",8888)) //这种方式通过spark从flume拉取数据 val rds = FlumeUtils.createPollingStream(ssc, ipSeq, StorageLevel.MEMORY_AND_DISK) val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } val result = rds.flatMap(x=>(new String(x.event.getBody.array())).split(" ")) .map(x=>(x,1)) .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true) result.print() ssc.start() ssc.awaitTermination() }}//遇到的错误 scala-library包在flume 的lib下面本来就有,包重复导致的冲突,删除一个/*18 Oct 2018 20:58:32,123 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) - Error while processing transaction.java.lang.IllegalStateException: begin() called when transaction is OPEN! at com.google.common.base.Preconditions.checkState(Preconditions.java:145) at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131) at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114) at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113) at scala.Option.foreach(Option.scala:236) at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113) at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243) at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)18 Oct 2018 20:58:32,128 WARN [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59) - Spark was unable to successfully process the events. Transaction is being rolled back.18 Oct 2018 20:58:32,128 WARN [New I/O worker #1] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:59) - Received an error batch - no events were received from channel! */ org.apache.spark spark-streaming-flume_2.11 2.1.0
看完了这篇文章,相信你对"spark 与flume 1.6.0的示例代码"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
方式
数据
配置
代码
示例
三个
篇文章
中加
同时
地址
完了
文件
更多
版本
知识
行业
资讯
资讯频道
错误
频道
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
美版苹果手机浏览器找不到服务器
主要数据库技术
win7ftp服务器
怎么监测服务器安全性
大学 网络技术应用
江苏省信息网络安全协会赵和平
第六空间网络技术服务有限公司
宁波品牌网络技术哪个好
澳洲网络安全工作
出入境网络安全宣传活动简报
央企调岗到软件开发
怎样写网络安全宣讲稿
数据库建表中的说明怎么写
北京调度服务器品牌虚拟主机
hutool 数据库性能
公安机关网络安全工作调研报告
交通银行软件开发中心子公司
京东代挂什么配置服务器
免费https备案域名服务器
代理服务器的登陆软件
万集软件的数据库的信息
大学 网络技术应用
阿里云专用网络安全组
查看服务器上下载的软件
公共数据库怎么发布
数据库封装库
计算机网络技术涉及编程吗
服务器设置端口
国服服务器信号差怎么办
进入网络安全模式还是黑屏