Lesson 13: Spark Streaming Source Code Walkthrough: Driver Fault Tolerance


Topics for this lesson:

  1. Fault tolerance of ReceivedBlockTracker

  2. Fault tolerance of DStream and JobGenerator


Driver fault tolerance operates at two levels: 1. the metadata for the data received by Receivers, and 2. the state of the components the Driver manages (the scheduling and driving layer).
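Neither layer is active by default; the application has to opt in. A minimal sketch of the relevant setup (app name and checkpoint path are placeholders, written as it would appear in a driver's main method or the spark-shell):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-demo") // placeholder app name
  // keep a copy of received data in a write-ahead log so it survives failures
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// the checkpoint directory is what lets the Driver persist its metadata and the DStream graph
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint") // placeholder path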


The metadata is protected with a write-ahead log (WAL). When the ReceiverTracker's RPC endpoint receives an AddBlock message, it handles it as follows:

case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

...

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
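Whether the reply goes through the batching thread pool above is governed by a driver-side WAL setting; to the best of my recollection this is spark.streaming.driver.writeAheadLog.allowBatching (please verify against your Spark version). Toggling it would look roughly like:

import org.apache.spark.SparkConf

// Assumed configuration key; check the Spark docs for the exact name and default.
val conf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")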


The metadata itself is managed by ReceivedBlockTracker:

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}

addBlock first calls the writeToLog method:

/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}


The block info is then appended to the streamIdToUnallocatedBlockQueue for its stream.
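For reference, that queue is simply a per-stream mutable queue of ReceivedBlockInfo held in a map keyed by stream id. A simplified, illustrative sketch of the structure (type and field names approximate, with a stand-in ReceivedBlockInfo):

import scala.collection.mutable

// Simplified stand-in for illustration; the real ReceivedBlockInfo carries more fields.
case class ReceivedBlockInfo(streamId: Int, blockId: String)

object BlockQueues {
  type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

  // Per-stream queues of blocks that were received but not yet allocated to a batch.
  val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

  def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue =
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
}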


Every batchInterval, a Streaming job is triggered. At that point the blocks sitting in streamIdToUnallocatedBlockQueue must be allocated to a concrete batch time:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}

A WAL record is written during this step as well (the BatchAllocationEvent above).


JobGenerator is triggered every batchInterval to generate jobs:

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

At the end of generateJobs, a DoCheckpoint message is posted to the event loop.
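Around these two steps sits JobGenerator's single-threaded event loop: a recurring timer posts GenerateJobs(time) once per batchInterval, generateJobs posts DoCheckpoint(time), and a single thread consumes the events in order. Below is a hypothetical, much-simplified model of that pattern, built from plain JDK threads and a queue rather than Spark's actual RecurringTimer/EventLoop classes:

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Toy events standing in for Spark's JobGeneratorEvent hierarchy.
sealed trait JobGeneratorEvent
case class GenerateJobs(time: Long) extends JobGeneratorEvent
case class DoCheckpoint(time: Long) extends JobGeneratorEvent

object MiniJobGeneratorLoop {
  def main(args: Array[String]): Unit = {
    val batchIntervalMs = 1000L
    val eventQueue = new LinkedBlockingQueue[JobGeneratorEvent]()

    // Timer thread: post a GenerateJobs event once per batch interval.
    val timer = Executors.newSingleThreadScheduledExecutor()
    timer.scheduleAtFixedRate(
      () => eventQueue.put(GenerateJobs(System.currentTimeMillis())),
      0L, batchIntervalMs, TimeUnit.MILLISECONDS)

    // Event-loop thread: handle events one at a time, like JobGenerator.processEvent.
    val loop = new Thread(() => {
      try {
        while (true) {
          eventQueue.take() match {
            case GenerateJobs(time) =>
              println(s"would allocate blocks and generate jobs for batch $time")
              eventQueue.put(DoCheckpoint(time)) // mirror generateJobs posting DoCheckpoint
            case DoCheckpoint(time) =>
              println(s"would write a Checkpoint for batch $time")
          }
        }
      } catch {
        case _: InterruptedException => // shutdown requested
      }
    })
    loop.start()

    Thread.sleep(5 * batchIntervalMs) // let a few batches fire, then shut down
    timer.shutdownNow()
    loop.interrupt()
  }
}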

When JobGenerator receives the message, processEvent dispatches it:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

A Checkpoint object is built from ssc and time. Since ssc carries all of the Driver's state, the Driver can be rebuilt from the Checkpoint data after a crash.
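On the application side, this recovery path is normally reached through StreamingContext.getOrCreate, which rebuilds the context from the checkpoint directory when one exists and only calls the creating function on a fresh start. A minimal sketch (directory and names are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // placeholder path

  // Called only when no checkpoint exists yet; otherwise the Driver state,
  // including the DStream graph, is restored from the checkpoint data.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("recoverable-demo") // placeholder app name
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... define DStream operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}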

Internally, JobGenerator's recovery code (restart) is as follows:

/** Restarts the generator based on the information in checkpoint */
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration)
  logInfo("Batches during down time (" + downTimes.size + " batches): "
    + downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
  logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
    pendingTimes.mkString(", "))

  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
    .distinct.sorted(Time.ordering)
  logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
    timesToReschedule.mkString(", "))
  timesToReschedule.foreach { time =>
    // Allocate the related blocks when recovering from failure, because some blocks that were
    // added but not allocated, are dangling in the queue after recovering, we have to allocate
    // those blocks to the next batch, which is the batch they were supposed to go.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("Restarted JobGenerator at " + restartTime)
}



Notes:

1. DT Big Data DreamWorks WeChat public account: DT_Spark
2. IMF 8 p.m. big data hands-on session, YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains

