(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
发表于:2024-09-23 作者:千家信息网编辑
千家信息网最后更新 2024年09月23日,hu本期内容:1、Kafka解密背景:目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作
千家信息网最后更新 2024年09月23日(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
hu本期内容:
1、Kafka解密
背景:
目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的。
所以Spark Streaming就产生了自定义RDD -> KafkaRDD.
源码分析:
1、KafkaRDD源码
private[kafka]class KafkaRDD[K: ClassTag,V: ClassTag,U <: Decoder[_]: ClassTag,T <: Decoder[_]: ClassTag,R: ClassTag] private[spark] ( sc: SparkContext,kafkaParams: Map[String, String],val offsetRanges: Array[OffsetRange], //指定数据范围leaders: Map[TopicAndPartition, (String, Int)],messageHandler: MessageAndMetadata[K, V] => R) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) =>val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }
2、HasOffsetRanges
/** * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the * offset ranges in RDDs generated by the direct Kafka DStream (see * [[KafkaUtils.createDirectStream()]]). * {{{* KafkaUtils.createDirectStream(...).foreachRDD { rdd => * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges * ... * } * }}}*/trait HasOffsetRanges {def offsetRanges: Array[OffsetRange]}
3、KafkaRDD中的compute
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {val part = thePart.asInstanceOf[KafkaRDDPartition]assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))if (part.fromOffset == part.untilOffset) { log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +s"skipping ${part.topic} ${part.partition}")Iterator.empty} else {new KafkaRDDIterator(part, context) }}
SparkStreaming一般使用KafkaUtils的createDirectStream读取数据
def createDirectStream[K: ClassTag,V: ClassTag,KD <: Decoder[K]: ClassTag,VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext,kafkaParams: Map[String, String],topics: Set[String]): InputDStream[(K, V)] = {val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)val kc = new KafkaCluster(kafkaParams)val fromOffsets = getFromOffsets(kc, kafkaParams, topics)new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler)}
4、通过getFromOffsets的方法获取topic的fromOffset值
[kafka] ( kc: KafkaClusterkafkaParams: []topics: [] ): [TopicAndPartition] = {reset = kafkaParams.get().map(_.toLowerCase)result = { topicPartitions <- kc.getPartitions(topics).right leaderOffsets <- ((reset == ()) { kc.getEarliestLeaderOffsets(topicPartitions) } { kc.getLatestLeaderOffsets(topicPartitions) }).right } { leaderOffsets.map { (tplo) => (tplo.offset) } } KafkaCluster.(result)}
createDirectStream其实生成的是DirectKafkaInputDStream对象,通过compute方法会产生KafkaRDD
(validTime: Time): Option[KafkaRDD[]] = {untilOffsets = clamp(latestLeaderOffsets())rdd = []( context.sparkContextkafkaParamsuntilOffsetsmessageHandler)offsetRanges = .map { (tpfo) =>uo = untilOffsets(tp)(tp.topictp.partitionfouo.offset) }description = offsetRanges.filter { offsetRange =>offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange =>{offsetRange.topic}{offsetRange.partition}+{offsetRange.fromOffset}{offsetRange.untilOffset}}.mkString()metadata = (-> offsetRanges.toListStreamInputInfo.-> description)inputInfo = (rdd.countmetadata) ssc...reportInfo(validTimeinputInfo)= untilOffsets.map(kv => kv._1 -> kv._2.offset)(rdd)}
采用Direct的好处?
1. Direct方式没有数据缓存,因此不会出现内存溢出,但是如果采用Receiver的话就需要缓存。
2. 如果采用Receiver的方式,不方便做分布式,而Direct方式默认数据就在多台机器上。
3. 在实际操作的时候如果采用Receiver的方式的弊端是假设数据来不及处理,但是Direct就不会,因为是直接读取数据。
4. 语义一致性,Direct的方式数据一定会被执行。
数据
方式
源码
一致
一致性
方法
来源
缓存
语义
企业
内存
内容
分布式
制度
多台
好处
实际
对象
弊端
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络公司网络服务器型号
网络安全验证码软件检测
软件开发活动记录
联想服务器 带外管理
战地服务器管理工具
数据库软件的关键技术
服务器设计与开发实例
负责中心网络安全总体工作的
spark连接第三方数据库
软件开发业务需求手册
服务器管理器无法开始服务器
sql数据库知识点总结
网络数据库开发教程
概述网络安全管理的必要性
电大计算机专网络安全论文
网络安全中_恢复是指
数据库运维内容
计算机网络技术三级保过
福州天睿网络技术
屏幕共享软件开发的几大特点
山东宏富网络技术有限公司
附加xskc数据库
服务器离职
软件开发怎么找代理
数据库索引原理图解 磁盘
怎么选手机软件开发公司
医药软件开发策划书
储存图片数据库
ro 检查服务器
软件开发的生命周期过程