Lesson 13: Spark Streaming Source Code Walkthrough: Driver Fault Tolerance
This session covers:
Fault tolerance of the ReceivedBlockTracker
Fault tolerance of DStream and JobGenerator
Driver fault tolerance operates at two levels: 1. the metadata for the data received by Receivers; 2. the state of the components the Driver manages (the scheduling and driving layer).
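For orientation, both levels are switched on from user code: the receiver write-ahead log covers the block metadata, and the checkpoint directory covers the Driver-side state. A minimal, spark-shell style sketch (the app name and checkpoint path below are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-demo")                                        // hypothetical app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // enable the receiver write-ahead log
val ssc = new StreamingContext(conf, Seconds(5))                 // batchInterval = 5 seconds
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")               // hypothetical directory; Driver-side logs live under it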
The metadata is protected with a write-ahead log (WAL) mechanism. When the ReceiverTracker's RPC endpoint receives an AddBlock message from a Receiver, it handles it as follows:
case AddBlock(receivedBlockInfo) =>
  if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        if (active) {
          context.reply(addBlock(receivedBlockInfo))
        } else {
          throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
        }
      }
    })
  } else {
    context.reply(addBlock(receivedBlockInfo))
  }

...

/** Add new blocks for the given stream */
private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  receivedBlockTracker.addBlock(receivedBlockInfo)
}
The metadata is actually managed by the ReceivedBlockTracker:
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
addBlock first calls the writeToLog method:
/** Write an update to the tracker to the write ahead log */
private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
  if (isWriteAheadLogEnabled) {
    logTrace(s"Writing record: $record")
    try {
      writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
        clock.getTimeMillis())
      true
    } catch {
      case NonFatal(e) =>
        logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
        false
    }
  } else {
    true
  }
}
It then appends the block information to the streamIdToUnallocatedBlockQueue queue.
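The key invariant in addBlock is "log first, then mutate memory": the in-memory queue is only touched after the WAL write succeeds, so replaying the log after a crash reproduces the same state. A simplified, illustrative model of that pattern (ToyBlockTracker and its event strings are inventions for this sketch, not Spark classes):

import scala.collection.mutable

// Illustrative only: models the "log first, then mutate memory" discipline of
// ReceivedBlockTracker.addBlock with a pluggable log writer.
class ToyBlockTracker(writeToLog: String => Boolean) {
  private val unallocated = mutable.Map.empty[Int, mutable.Queue[String]]

  def addBlock(streamId: Int, blockId: String): Boolean = {
    val logged = writeToLog(s"BlockAdditionEvent($streamId, $blockId)")
    if (logged) {
      // Only update the in-memory queue once the event is durably logged.
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
    }
    logged // the ReceiverTracker replies this result back to the Receiver
  }

  def unallocatedBlocks(streamId: Int): Seq[String] =
    unallocated.getOrElse(streamId, mutable.Queue.empty[String]).toSeq
}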
Every batchInterval, a Streaming job is triggered. At that point, the blocks sitting in the streamIdToUnallocatedBlockQueue queue must be allocated to a specific batch time:
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    //    possibly processed batch job or half-processed batch job need to be processed again,
    //    so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    //    lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
A WAL record (a BatchAllocationEvent) is also written during this step.
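Logging each state transition is what makes recovery possible: when the Driver restarts, the ReceivedBlockTracker replays the recorded events to rebuild its unallocated queues and its batch-to-blocks map. A rough sketch of that replay idea, using simplified stand-in event types rather than Spark's own:

object WalReplaySketch {
  // Simplified stand-ins for BlockAdditionEvent / BatchAllocationEvent / BatchCleanupEvent.
  sealed trait TrackerEvent
  case class BlockAddition(streamId: Int, blockId: String) extends TrackerEvent
  case class BatchAllocation(batchTimeMs: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent
  case class BatchCleanup(batchTimesMs: Seq[Long]) extends TrackerEvent

  // Rebuild the time -> allocated-blocks map by folding over the recovered events,
  // mirroring in spirit what the real tracker does when it recovers from its log.
  def replayAllocations(events: Seq[TrackerEvent]): Map[Long, Map[Int, Seq[String]]] =
    events.foldLeft(Map.empty[Long, Map[Int, Seq[String]]]) {
      case (acc, BlockAddition(_, _))        => acc                  // re-queued as unallocated
      case (acc, BatchAllocation(t, blocks)) => acc + (t -> blocks)
      case (acc, BatchCleanup(times))        => acc -- times
    }
}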
Every batchInterval, the JobGenerator is triggered to generate jobs:
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
At the end, a DoCheckpoint message is posted onto the event loop.
When the JobGenerator receives the message, it dispatches it in processEvent:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
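Note that the isMultipleOf check means a checkpoint is not necessarily written for every batch. A tiny REPL-style calculation, assuming zeroTime = 0, a 5-second batch interval, and a 10-second checkpointDuration:

// Illustrative arithmetic only (plain Longs instead of Spark's Time/Duration types).
val zeroTimeMs = 0L
val checkpointDurationMs = 10000L
Seq(5000L, 10000L, 15000L, 20000L).foreach { batchTimeMs =>
  val shouldWrite = (batchTimeMs - zeroTimeMs) % checkpointDurationMs == 0
  println(s"batch at $batchTimeMs ms -> checkpoint written: $shouldWrite")
  // prints: 5000 -> false, 10000 -> true, 15000 -> false, 20000 -> true
}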
A Checkpoint object is constructed from ssc and time. Since ssc holds all of the Driver's information, the Driver can be restored from the Checkpoint data after it crashes.
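From the application's point of view, this recovery path is typically reached through StreamingContext.getOrCreate: if Checkpoint data exists under the checkpoint directory, the Driver is rebuilt from it instead of calling the creating function. A minimal sketch (the directory and the body of createContext are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableApp {
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"  // hypothetical path

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("recoverable-app")  // hypothetical app name
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... define the DStream graph here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a clean start this calls createContext(); after a Driver crash it
    // deserializes the Checkpoint and restores the Driver state from it.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}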
Inside the JobGenerator, the recovery code is as follows:
/** Restarts the generator based on the information in checkpoint */
private def restart() {
  // If manual clock is being used for testing, then
  // either set the manual clock to the last checkpointed time,
  // or if the property is defined set it to that time
  if (clock.isInstanceOf[ManualClock]) {
    val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
    val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
    clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
  }

  val batchDuration = ssc.graph.batchDuration

  // Batches when the master was down, that is,
  // between the checkpoint and current restart time
  val checkpointTime = ssc.initialCheckpoint.checkpointTime
  val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
  val downTimes = checkpointTime.until(restartTime, batchDuration)
  logInfo("Batches during down time (" + downTimes.size + " batches): " +
    downTimes.mkString(", "))

  // Batches that were unprocessed before failure
  val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
  logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
    pendingTimes.mkString(", "))

  // Reschedule jobs for these times
  val timesToReschedule = (pendingTimes ++ downTimes).filter { _ < restartTime }
    .distinct.sorted(Time.ordering)
  logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
    timesToReschedule.mkString(", "))
  timesToReschedule.foreach { time =>
    // Allocate the related blocks when recovering from failure, because some blocks that were
    // added but not allocated, are dangling in the queue after recovering, we have to allocate
    // those blocks to the next batch, which is the batch they were supposed to go.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
  }

  // Restart the timer
  timer.start(restartTime.milliseconds)
  logInfo("Restarted JobGenerator at " + restartTime)
}
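To make the rescheduling arithmetic concrete, here is a small worked example with made-up times, assuming zeroTime = 0 and a 5-second batchDuration (milliseconds as plain Longs instead of Spark's Time):

val batchDurationMs = 5000L
val checkpointTimeMs = 10000L                                    // read from the Checkpoint
val restartTimeMs = 25000L                                       // first batch boundary after restart
val downTimes = (checkpointTimeMs until restartTimeMs by batchDurationMs).toSeq
                                                                 // Seq(10000, 15000, 20000)
val pendingTimes = Seq(5000L, 10000L)                            // unprocessed before the failure
val timesToReschedule = (pendingTimes ++ downTimes).filter(_ < restartTimeMs).distinct.sorted
                                                                 // Seq(5000, 10000, 15000, 20000)
// Each of these batches gets its blocks re-allocated and its JobSet re-submitted
// before the timer is restarted at restartTime.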
Notes:
1. WeChat official account of DT大数据梦工厂: DT_Spark
2. IMF 8 p.m. big-data hands-on session, YY live broadcast channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains