(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,hu本期内容:1、Kafka解密背景:目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作
千家信息网最后更新 2025年02月04日(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考
hu本期内容:
1、Kafka解密
背景:
目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的。
所以Spark Streaming就产生了自定义RDD -> KafkaRDD.
源码分析:
1、KafkaRDD源码
private[kafka]class KafkaRDD[K: ClassTag,V: ClassTag,U <: Decoder[_]: ClassTag,T <: Decoder[_]: ClassTag,R: ClassTag] private[spark] ( sc: SparkContext,kafkaParams: Map[String, String],val offsetRanges: Array[OffsetRange], //指定数据范围leaders: Map[TopicAndPartition, (String, Int)],messageHandler: MessageAndMetadata[K, V] => R) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map { case (o, i) =>val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }
2、HasOffsetRanges
/** * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the * offset ranges in RDDs generated by the direct Kafka DStream (see * [[KafkaUtils.createDirectStream()]]). * {{{* KafkaUtils.createDirectStream(...).foreachRDD { rdd => * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges * ... * } * }}}*/trait HasOffsetRanges {def offsetRanges: Array[OffsetRange]}
3、KafkaRDD中的compute
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {val part = thePart.asInstanceOf[KafkaRDDPartition]assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))if (part.fromOffset == part.untilOffset) { log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +s"skipping ${part.topic} ${part.partition}")Iterator.empty} else {new KafkaRDDIterator(part, context) }}
SparkStreaming一般使用KafkaUtils的createDirectStream读取数据
def createDirectStream[K: ClassTag,V: ClassTag,KD <: Decoder[K]: ClassTag,VD <: Decoder[V]: ClassTag] ( ssc: StreamingContext,kafkaParams: Map[String, String],topics: Set[String]): InputDStream[(K, V)] = {val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)val kc = new KafkaCluster(kafkaParams)val fromOffsets = getFromOffsets(kc, kafkaParams, topics)new DirectKafkaInputDStream[K, V, KD, VD, (K, V)]( ssc, kafkaParams, fromOffsets, messageHandler)}
4、通过getFromOffsets的方法获取topic的fromOffset值
[kafka] ( kc: KafkaClusterkafkaParams: []topics: [] ): [TopicAndPartition] = {reset = kafkaParams.get().map(_.toLowerCase)result = { topicPartitions <- kc.getPartitions(topics).right leaderOffsets <- ((reset == ()) { kc.getEarliestLeaderOffsets(topicPartitions) } { kc.getLatestLeaderOffsets(topicPartitions) }).right } { leaderOffsets.map { (tplo) => (tplo.offset) } } KafkaCluster.(result)}
createDirectStream其实生成的是DirectKafkaInputDStream对象,通过compute方法会产生KafkaRDD
(validTime: Time): Option[KafkaRDD[]] = {untilOffsets = clamp(latestLeaderOffsets())rdd = []( context.sparkContextkafkaParamsuntilOffsetsmessageHandler)offsetRanges = .map { (tpfo) =>uo = untilOffsets(tp)(tp.topictp.partitionfouo.offset) }description = offsetRanges.filter { offsetRange =>offsetRange.fromOffset != offsetRange.untilOffset }.map { offsetRange =>{offsetRange.topic}{offsetRange.partition}+{offsetRange.fromOffset}{offsetRange.untilOffset}}.mkString()metadata = (-> offsetRanges.toListStreamInputInfo.-> description)inputInfo = (rdd.countmetadata) ssc...reportInfo(validTimeinputInfo)= untilOffsets.map(kv => kv._1 -> kv._2.offset)(rdd)}
采用Direct的好处?
1. Direct方式没有数据缓存,因此不会出现内存溢出,但是如果采用Receiver的话就需要缓存。
2. 如果采用Receiver的方式,不方便做分布式,而Direct方式默认数据就在多台机器上。
3. 在实际操作的时候如果采用Receiver的方式的弊端是假设数据来不及处理,但是Direct就不会,因为是直接读取数据。
4. 语义一致性,Direct的方式数据一定会被执行。
数据
方式
源码
一致
一致性
方法
来源
缓存
语义
企业
内存
内容
分布式
制度
多台
好处
实际
对象
弊端
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
强网杯网络安全论坛
租房app软件开发多少钱
odbc连接多个数据库
赤峰定制软件开发咨询报价
软件开发和游戏运营
服务器主机管理
信用数据库安全
服务器安全管理制度
csgo 钓鱼岛服务器
2022年网络安全周宣传感悟
云南高校党建软件开发
实验8访问数据库应用实例
易语言超级列表框数据存入数据库
数据库控制业务流程
安卓软件开发培训机构哪个比较好
网站管理员数据库设计
宜都市思行网络技术工作室
网络技术常用的维护软件
通信网络安全与工程专业
北京屺屹网络技术有限公司
考数据库系统概论的专业
软件开发和游戏运营
英雄联盟扭曲森林服务器
网络安全第一重要
5g网络安全形势和应对
自己布置服务器
三星s21无法与谷歌服务器通信
网络安全网瘾课题研究
泰州企业软件开发答疑解惑
中职网络安全与信息安全课件