spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据
发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通
千家信息网最后更新 2025年02月07日spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据
spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
package hgs.spark.streamingimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport org.apache.spark.streaming.StreamingContextimport org.apache.spark.streaming.Secondsimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.kafka.KafkaClusterimport scala.collection.immutable.Mapimport java.util.NoSuchElementExceptionimport org.apache.spark.SparkExceptionimport kafka.common.TopicAndPartitionimport kafka.message.MessageAndMetadataimport org.codehaus.jackson.map.deser.std.PrimitiveArrayDeserializers.StringDeserimport kafka.serializer.StringDecoderimport org.apache.spark.streaming.kafka.DirectKafkaInputDStreamimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.kafka.HasOffsetRangesimport org.apache.spark.HashPartitionerobject SparkStreamingKafkaDirectWordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[5]") conf.set("spark.streaming.kafka.maxRatePerPartition", "1") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(1)) ssc.checkpoint("d:\\checkpoint") val kafkaParams = Map[String,String]( "metadata.broker.list"->"bigdata01:9092,bigdata02:9092,bigdata03:9092", "group.id"->"group_hgs", "zookeeper.connect"->"bigdata01:2181,bigdata02:2181,bigdata03:2181") val kc = new KafkaCluster(kafkaParams) val topics = Set[String]("test") //每个rdd返回的数据是(K,V)类型的,该函数规定了函数返回数据的类型 val mmdFunct = (mmd: MessageAndMetadata[String, String])=>(mmd.topic+" "+mmd.partition,mmd.message()) val rds = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc, kafkaParams, getOffsets(topics,kc,kafkaParams),mmdFunct) val updateFunc=(iter:Iterator[(String,Seq[Int],Option[Int])])=>{ //iter.flatMap(it=>Some(it._2.sum+it._3.getOrElse(0)).map((it._1,_)))//方式一 //iter.flatMap{case(x,y,z)=>{Some(y.sum+z.getOrElse(0)).map((x,_))}}//方式二 iter.flatMap(it=>Some(it._1,(it._2.sum.toInt+it._3.getOrElse(0))))//方式三 } val words = rds.flatMap(x=>x._2.split(" ")).map((_,1)) //val wordscount = words.map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(sc.defaultMinPartitions), true) //println(getOffsets(topics,kc,kafkaParams)) rds.foreachRDD(rdd=>{ if(!rdd.isEmpty()){ //对每个dataStreamoffset进行更新 upateOffsets(topics,kc,rdd,kafkaParams) } } ) words.print() ssc.start() ssc.awaitTermination() } def getOffsets(topics : Set[String],kc:KafkaCluster,kafkaParams:Map[String,String]):Map[TopicAndPartition, Long]={ val topicAndPartitionsOrNull = kc.getPartitions(topics) if(topicAndPartitionsOrNull.isLeft){ throw new SparkException(s"$topics in the set may not found") } else{ val topicAndPartitions = topicAndPartitionsOrNull.right.get val groups = kafkaParams.get("group.id").get val offsetOrNull = kc.getConsumerOffsets(groups, topicAndPartitions) if(offsetOrNull.isLeft){ println(s"$groups you assignment may not exists!now redirect to zero!") //如果没有消费过,则从最开始的位置消费 val erliestOffset = kc.getEarliestLeaderOffsets(topicAndPartitions) if(erliestOffset.isLeft) throw new SparkException(s"Topics and Partions not definded not found!") else erliestOffset.right.get.map(x=>(x._1,x._2.offset)) } else{ //如果消费组已经存在则从记录的地方开始消费 offsetOrNull.right.get } } } //每次拉取数据后存储offset到ZK def upateOffsets(topics : Set[String],kc:KafkaCluster,directRDD:RDD[(String,String)],kafkaParams:Map[String,String]){ val offsetRanges = directRDD.asInstanceOf[HasOffsetRanges].offsetRanges for(offr <-offsetRanges){ val topicAndPartitions = TopicAndPartition(offr.topic,offr.partition) val yesOrNo = kc.setConsumerOffsets(kafkaParams.get("group.id").get, Map(topicAndPartitions->offr.untilOffset)) if(yesOrNo.isLeft){ println(s"Error when update offset of $topicAndPartitions") } } } }/* val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(4)) val kafkaParams = Map[String,String]( "metadata.broker.list"->"bigdata01:9092,bigdata02:9092,bigdata03:9092") val kc = new KafkaCluster(kafkaParams) //获取topic与paritions的信息 //val tmp = kc.getPartitions(Set[String]("test7")) //结果:topicAndPartitons=Set([test7,0], [test7,1], [test7,2]) //val topicAndPartitons = tmp.right.get //println(topicAndPartitons) //每个分区对应的leader信息 //val tmp = kc.getPartitions(Set[String]("test7")) //val topicAndPartitons = tmp.right.get //结果:leadersPerPartitions= Right(Map([test7,0] -> (bigdata03,9092), [test7,1] -> (bigdata01,9092), [test7,2] -> (bigdata02,9092))) //val leadersPerPartitions = kc.findLeaders(topicAndPartitons) //println(leadersPerPartitions) //每增加一条消息,对应的partition的offset都会加1,即LeaderOffset(bigdata02,9092,23576)第三个参数会加一 //val tmp = kc.getPartitions(Set[String]("test")) //val topicAndPartitons = tmp.right.get //结果t= Right(Map([test7,0] -> LeaderOffset(bigdata03,9092,23568), [test7,2] -> LeaderOffset(bigdata02,9092,23576), [test7,1] -> LeaderOffset(bigdata01,9092,23571))) //val t = kc.getLatestLeaderOffsets(topicAndPartitons) // println(t) //findLeader需要两个参数 topic 分区编号 //val tmp = kc.findLeader("test7",0) //结果leader=RightProjection(Right((bigdata03,9092))) //val leader = tmp.right //val tp = leader.flatMap(x=>{Either.cond(false, None,(x._1,x._2))}) val tmp = kc.getPartitions(Set[String]("test")) val ttp = tmp.right.get while(true){ try{ val tp = kc.getConsumerOffsets("group_test1", ttp) val maps = tp.right.get println(maps) Thread.sleep(2000) } catch{ case ex:NoSuchElementException=>{println("test")} } }*/
看完上述内容,你们掌握spark-streaming-kafka怎样通过KafkaUtils.createDirectStream的方式处理数据的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
数据
方式
结果
消费
处理
信息
内容
函数
参数
方法
更多
类型
问题
加一
束手无策
为此
三个
两个
位置
原因
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全启动仪式主持词
服务器中内存作用
你对网络安全有什么看法英语
软件开发实施方案报告
软件开发雷达系统
武汉艾迪互联网科技有限公司
2020国家网络安全答案
java网络安全编码
网易我的世界hy服务器闪退
如何保存到桌面的数据库
网络安全产品属于什么经营范围
长沙智能云控软件开发商
服务器停止响应怎么办
用友网络工业软件开发
wmi连接远程服务器
互联网科技公司招聘海报
租衣软件开发思路
北京推荐的软件开发材料
互联网科技创新的标语
甲骨文数据库软件是谁开发的
烽火服务器集中管理软件
有杀气童话2选择哪个服务器好
pc桌面软件开发平台
98网络安全团队
hp刀片服务器价格
刀片式服务器支架安装
网络安全训练营43讲后门连接
赣州高性价比服务器费用多少
数据库合并字段
ibm数据库教程