千家信息网

第11课:Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,上节课将到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。一、 ReceiverT
千家信息网最后更新 2025年02月04日第11课:Spark Streaming源码解读之Driver中的ReceiverTracker架构设计以及具体实现彻底研究

上节课将到了Receiver是如何不断的接收数据的,并且接收到的数据的元数据会汇报给ReceiverTracker,下面我们看看ReceiverTracker具体的功能及实现。

一、 ReceiverTracker主要的功能:

  1. 在Executor上启动Receivers。

  2. 停止Receivers 。

  3. 更新Receiver接收数据的速率(也就是限流)

  4. 不断的等待Receivers的运行状态,只要Receivers停止运行,就重新启动Receiver。也就是Receiver的容错功能。

  5. 接受Receiver的注册。

  6. 借助ReceivedBlockTracker来管理Receiver接收数据的元数据。

  7. 汇报Receiver发送过来的错误信息


ReceiverTracker 管理了一个消息通讯体ReceiverTrackerEndpoint,用来与Receiver或者ReceiverTracker 进行消息通信。

在ReceiverTracker的start方法中,实例化了ReceiverTrackerEndpoint,并且在Executor上启动Receivers:

/** Start the endpoint and receiver execution thread. */def start(): Unit = synchronized {  if (isTrackerStarted) {    throw new SparkException("ReceiverTracker already started")  }  if (!receiverInputStreams.isEmpty) {    endpoint = ssc.env.rpcEnv.setupEndpoint(      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))    if (!skipReceiverLaunch) launchReceivers()    logInfo("ReceiverTracker started")    trackerState = Started  }}

启动Receivr,其实是ReceiverTracker给ReceiverTrackerEndpoint发送了一个本地消息,ReceiverTrackerEndpoint将Receiver封装成RDD以job的方式提交给集群运行。

endpoint.send(StartAllReceivers(receivers))

这里的endpoint就是ReceiverTrackerEndpoint的引用。


Receiver启动后,会向ReceiverTracker注册,注册成功才算正式启动了。

override protected def onReceiverStart(): Boolean = {  val msg = RegisterReceiver(    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)  trackerEndpoint.askWithRetry[Boolean](msg)}

当Receiver端接收到数据,达到一定的条件需要将数据写入BlockManager,并且将数据的元数据汇报给ReceiverTracker:

/** Store block and report it to driver */def pushAndReportBlock(    receivedBlock: ReceivedBlock,    metadataOption: Option[Any],    blockIdOption: Option[StreamBlockId]  ) {  val blockId = blockIdOption.getOrElse(nextBlockId)  val time = System.currentTimeMillis  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")  val numRecords = blockStoreResult.numRecords  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))  logDebug(s"Reported block $blockId")}


当ReceiverTracker收到元数据后,会在线程池中启动一个线程来写数据:

case AddBlock(receivedBlockInfo) =>  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {    walBatchingThreadPool.execute(new Runnable {      override def run(): Unit = Utils.tryLogNonFatalError {        if (active) {          context.reply(addBlock(receivedBlockInfo))        } else {          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")        }      }    })  } else {    context.reply(addBlock(receivedBlockInfo))  }

数据的元数据是交由ReceivedBlockTracker管理的。

数据最终被写入到streamIdToUnallocatedBlockQueues中:一个流对应一个数据块信息的队列。

private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]privateval streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]


每当Streaming 触发job时,会将队列中的数据分配成一个batch,并将数据写入timeToAllocatedBlocks数据结构。

privateval timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]....def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {    val streamIdToBlocks = streamIds.map { streamId =>        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))    }.toMap    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)      lastAllocatedBatchTime = batchTime    } else {      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")    }  } else {    // This situation occurs when:    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,    // possibly processed batch job or half-processed batch job need to be processed again,    // so the batchTime will be equal to lastAllocatedBatchTime.    // 2. Slow checkpointing makes recovered batch time older than WAL recovered    // lastAllocatedBatchTime.    // This situation will only occurs in recovery time.    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")  }}

可见一个batch会包含多个流的数据。


每当Streaming 的一个job运行完毕后:

private def handleJobCompletion(job: Job, completedTime: Long) {  val jobSet = jobSets.get(job.time)  jobSet.handleJobCompletion(job)  job.setEndTime(completedTime)  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)  if (jobSet.hasCompleted) {    jobSets.remove(jobSet.time)    jobGenerator.onBatchCompletion(jobSet.time)    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(      jobSet.totalDelay / 1000.0, jobSet.time.toString,      jobSet.processingDelay / 1000.0    ))    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))  }  ...

JobScheduler会调用handleJobCompletion方法,最终会触发

jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)


这里的maxRememberDuration是DStream中每个时刻生成的RDD保留的最长时间。

def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {  require(cleanupThreshTime.milliseconds < clock.getTimeMillis())  val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq  logInfo("Deleting batches " + timesToCleanup)  if (writeToLog(BatchCleanupEvent(timesToCleanup))) {    timeToAllocatedBlocks --= timesToCleanup    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))  } else {    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")  }}

而最后

listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))

这个代码会调用

case batchCompleted: StreamingListenerBatchCompleted =>  listener.onBatchCompleted(batchCompleted)    ... 一路跟着下去...    /** * A RateController that sends the new rate to receivers, via the receiver tracker. */private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)    extends RateController(id, estimator) {  override def publish(rate: Long): Unit =    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)}
/** Update a receiver's maximum ingestion rate */def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {  if (isTrackerStarted) {    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))  }}
case UpdateReceiverRateLimit(streamUID, newRate) =>  for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {    eP.send(UpdateRateLimit(newRate))  }

发送调整速率的消息给Receiver,Receiver接到消息后,最终通过BlockGenerator来调整数据的写入的时间,而控制数据流的速率。

case UpdateRateLimit(eps) =>  logInfo(s"Received a new rate limit: $eps.")  registeredBlockGenerators.foreach { bg =>    bg.updateRate(eps)  }


备注:

1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains


0