(Customized Edition) Lesson 15: Spark Streaming Source Code Analysis: A Thorough Look at No Receivers


Topics covered in this session:

1. Demystifying Kafka


Background:
The No Receivers (Direct) approach is used more and more in production, because it gives stronger control over consumption and better semantic consistency. It is also the more natural way to operate on a data source: the source is accessed through a wrapper that is itself an RDD.

This is why Spark Streaming introduced a custom RDD for Kafka: KafkaRDD.
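For orientation before diving into the source, here is a minimal sketch of a direct-stream application (the broker list and topic name are placeholder values, not from the original text):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectStreamDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // "metadata.broker.list" points the consumer directly at the Kafka brokers;
    // no Receiver and no write-ahead log are involved.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("demo-topic")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Each batch of the stream is backed by a KafkaRDD of (key, value) pairs.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}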


Source code analysis:

1. The KafkaRDD source

private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange], // specifies the range of data to read
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

2. HasOffsetRanges



/**
 * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
 * offset ranges in RDDs generated by the direct Kafka DStream (see
 * [[KafkaUtils.createDirectStream()]]).
 * {{{
 *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
 *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 *      ...
 *   }
 * }}}
 */
trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}

3. compute in KafkaRDD


override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
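Because getPartitions above creates exactly one Spark partition per OffsetRange, the i-th partition of a KafkaRDD corresponds to the i-th Kafka offset range. The pattern from the Spark Kafka integration guide exploits this, combined with HasOffsetRanges from section 2, to recover the offset range inside a task. A minimal sketch, assuming the stream built in the earlier demo:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The cast works on the driver because the direct stream produces KafkaRDDs.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // Safe only because KafkaRDD partitions map 1:1 to offset ranges.
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}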

A Spark Streaming application normally reads Kafka data through KafkaUtils.createDirectStream:


def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

4. getFromOffsets obtains the starting offset (fromOffset) for each topic partition


private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) =>
      (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}
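As the for-comprehension shows, the only switch here is the standard Kafka consumer property auto.offset.reset: "smallest" starts from the earliest leader offsets still retained by the brokers, anything else (including the default) starts from the latest. A sketch of the corresponding parameters (broker address is a placeholder):

// Start every new topic partition from the earliest retained offset
// instead of the default (latest).
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  "auto.offset.reset"    -> "smallest")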

createDirectStream actually returns a DirectKafkaInputDStream object; for each batch, its compute method produces a KafkaRDD:


override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
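Because the stream only tracks currentOffsets between batches, an application that wants to resume exactly where it left off can skip getFromOffsets entirely and pass explicit starting offsets to the createDirectStream overload that takes a fromOffsets map. A sketch under that assumption, where loadOffsetsFromStore is a hypothetical helper (not part of Spark) reading from your own offset store:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical helper: read the last committed offsets from an external store.
def loadOffsetsFromStore(): Map[TopicAndPartition, Long] = ???

val fromOffsets = loadOffsetsFromStore()
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)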

Why use the Direct approach?
1. Direct buffers no data inside Spark, so it cannot run out of memory that way; a Receiver must cache received data before it is processed.
2. The Receiver approach does not distribute easily, whereas Direct reads are spread across multiple machines by default, one RDD partition per Kafka partition.
3. With a Receiver, data can arrive faster than it can be processed and pile up; Direct avoids this, because each batch reads only the offset range it will actually process, directly from Kafka.
4. Semantic consistency: with Direct, the data of a batch is guaranteed to be processed, which is the basis for exactly-once delivery (see the sketch below).
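Point 4 is what the exactly-once pattern builds on: since each KafkaRDD carries its offset ranges, results and offsets can be committed together. A minimal sketch, where saveResultsAndOffsets stands in for a hypothetical atomic write (e.g. one database transaction) to your own store:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Hypothetical helper: atomically persist this batch's results together
// with its offset ranges.
def saveResultsAndOffsets(results: Array[(String, String)],
                          offsets: Array[OffsetRange]): Unit = ???

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val results = rdd.collect()  // small batches only; illustrative
  // If this call succeeds, both results and offsets are stored; if it fails,
  // neither is, so replaying the batch from the old offsets is safe.
  saveResultsAndOffsets(results, offsetRanges)
}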
