千家信息网

Driver容错安全性怎么实现

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,这篇文章主要介绍"Driver容错安全性怎么实现",在日常操作中,相信很多人在Driver容错安全性怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Driver容
千家信息网最后更新 2025年01月22日Driver容错安全性怎么实现

这篇文章主要介绍"Driver容错安全性怎么实现",在日常操作中,相信很多人在Driver容错安全性怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Driver容错安全性怎么实现"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

  • · 第一、看ReceiverTracker的容错,主要是ReceiverTracker接收元数据的进入WAL,看ReceiverTracker的addBlock方法,代码如下

    def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {

    try {

    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))

    if (writeResult) {

    synchronized {

    getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo

    }

    logDebug(s"Stream ${receivedBlockInfo.streamId} received " +

    s"block ${receivedBlockInfo.blockStoreResult.blockId}")

    } else {

    logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +

    s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")

    }

    writeResult

    } catch {

    case NonFatal(e) =>

    logError(s"Error adding block $receivedBlockInfo", e)

    false

    }

    }

    writeToLog方法就是进行WAL的操作,看writeToLog的代码

    private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {

    if (isWriteAheadLogEnabled) {

    logTrace(s"Writing record: $record")

    try {

    writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),

    clock.getTimeMillis())

    true

    } catch {

    case NonFatal(e) =>

    logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)

    false

    }

    } else {

    true

    }

    }

    首先判断是否开启了WAL,根据一下isWriteAheadLogEnabled值

    private[streaming] def isWriteAheadLogEnabled: Boolean = writeAheadLogOption.nonEmpty

    接着看writeAheadLogOption

    privateval writeAheadLogOption = createWriteAheadLog()

    再看createWriteAheadLog()方法

    private def createWriteAheadLog(): Option[WriteAheadLog] = {

    checkpointDirOption.map { checkpointDir =>

    val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)

    WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)

    }

    }

    根据checkpoint的配置,获取checkpoint的目录,这里可以看出,checkpoint可以有多个目录。
    写完WAL才将receivedBlockInfo放到内存队列getReceivedBlockQueue中

    · 第二、看ReceivedBlockTracker的allocateBlocksToBatch方法,代码如下

    def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {

    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {

    val streamIdToBlocks = streamIds.map { streamId =>

    (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))

    }.toMap

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

    timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

    lastAllocatedBatchTime = batchTime

    } else {

    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

    }

    } else {

    // This situation occurs when:

    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,

    // possibly processed batch job or half-processed batch job need to be processed again,

    // so the batchTime will be equal to lastAllocatedBatchTime.

    // 2. Slow checkpointing makes recovered batch time older than WAL recovered

    // lastAllocatedBatchTime.

    // This situation will only occurs in recovery time.

    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

    }

    }

    首先从getReceivedBlockQueue中获取每一个receiver的ReceivedBlockQueue队列赋值给streamIdToBlocks,然后包装一下

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

    allocatedBlocks就是根据时间获取的一批元数据,交给对应batchDuration的job,job在执行的时候就可以使用,在使用前先进行WAL,如果job出错恢复后,可以知道数据计算到什么位置

    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)

    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {

    timeToAllocatedBlocks.put(batchTime, allocatedBlocks)

    lastAllocatedBatchTime = batchTime

    } else {

    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")

    }

    · 第三、看cleanupOldBatches方法,cleanupOldBatches的功能是从内存中清楚不用的batches元数据,再删除WAL的数据,再删除之前把要删除的batches信息也进行WAL

    def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {

    require(cleanupThreshTime.milliseconds < clock.getTimeMillis())

    val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq

    logInfo("Deleting batches " + timesToCleanup)

    if (writeToLog(BatchCleanupEvent(timesToCleanup))) {

    timeToAllocatedBlocks --= timesToCleanup

    writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))

    } else {

    logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")

    }

    }

    · 总结一下上面的三种WAL,对应下面的三种事件,这就是ReceiverTracker的容错

    /** Trait representing any event in the ReceivedBlockTracker that updates its state. */

    private[streaming] sealed trait ReceivedBlockTrackerLogEvent

    private[streaming] case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo)

    extends ReceivedBlockTrackerLogEvent

    private[streaming] case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks)

    extends ReceivedBlockTrackerLogEvent

    private[streaming] case class BatchCleanupEvent(times: Seq[Time]) extends ReceivedBlockTrackerLogEvent

    · 看一下Dstream.graph和JobGenerator的容错,从开始

    private def generateJobs(time: Time) {

    SparkEnv has been removed.

    SparkEnv.set(ssc.env)

    Try {

    // allocate received blocks to batch

    // 分配接收到的数据给batch

    jobScheduler.receiverTracker.allocateBlocksToBatch(time)

    // 使用分配的块生成jobs

    graph.generateJobs(time) // generate jobs using allocated block

    } match {

    case Success(jobs) =>

    // 获取元数据信息

    val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)

    // 提交jobSet

    jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

    case Failure(e) =>

    jobScheduler.reportError("Error generating jobs for time " + time, e)

    }

    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))

    }

    jobs生成完成后发送DoCheckpoint消息,最终调用doCheckpoint方法,代码如下

    private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {

    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {

    logInfo("Checkpointing graph for time " + time)

    ssc.graph.updateCheckpointData(time)

    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)

    }

    }

到此,关于"Driver容错安全性怎么实现"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0