
Lesson 12 (Custom Edition): Spark Streaming Source Code Walkthrough, Executor Fault Tolerance


Topics for this session:

1. The Executor's WAL fault-tolerance mechanism

2. Message replay


Executor fault tolerance is mainly about keeping the received data safe. So why not also worry about fault tolerance for the computation over that data?

The reason is that, for the computation itself, Spark Streaming rides on the fault tolerance of RDDs in Spark Core, so the computation is naturally safe and reliable (see the sketch below).
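
In plain Spark Core, an RDD is defined by its lineage: a lost partition is rebuilt by recomputing it, not by restoring a stored copy, and Spark Streaming inherits this for every batch. A minimal, hypothetical illustration (local master and toy data are arbitrary choices):

import org.apache.spark.{SparkConf, SparkContext}

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("LineageDemo"))
    // rdd2 is defined by its lineage, "map over rdd1". If an executor holding
    // some of rdd2's partitions dies, Spark Core recomputes just those
    // partitions from the lineage; no compute-side replication is needed.
    val rdd1 = sc.parallelize(1 to 1000000)
    val rdd2 = rdd1.map(_ * 2)
    println(rdd2.count())
    sc.stop()
  }
}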

Executor data fault tolerance takes two main forms:

1. Data replication, done in one of two ways (a configuration sketch follows below):

a. Rely on the underlying BlockManager, which makes replicas of each block according to the StorageLevel that is passed in.

b. Fault tolerance via a write-ahead log (WAL).

2. Keep no replica after the data is received, and instead rely on a data source that supports storage, where "storage" means the source data can be read over and over again (illustrated with Kafka near the end of this post).

The drawbacks of fault tolerance: it costs time and it costs space.
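
Both replication options are driven by configuration. A minimal sketch of approach 1 (the host, port, and checkpoint path are placeholders; note that MEMORY_AND_DISK_SER_2 happens to be the default storage level for socketTextStream anyway):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReplicatedReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("ReplicatedReceiverDemo")
      // Option b: enable the receiver write-ahead log (requires a checkpoint directory)
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/streaming-checkpoint")  // placeholder path; WAL files are written under here
    // Option a: the "_2" suffix asks the BlockManager to keep two replicas of each block
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}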

A quick look at the source. ReceiverSupervisorImpl.pushAndReportBlock first stores a received block through the pluggable receivedBlockHandler, then reports the block's metadata to the driver's ReceiverTracker via an AddBlock message:

/** Store block and report it to driver */
def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
  ) {
  val blockId = blockIdOption.getOrElse(nextBlockId)
  val time = System.currentTimeMillis
  val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
  logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
  val numRecords = blockStoreResult.numRecords
  val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
  trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
  logDebug(s"Reported block $blockId")
}
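
Which handler actually stores the block is decided once, when the receiver starts, based on whether the receiver write-ahead log is enabled: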


private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
        "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
        "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)  // fault tolerance via the WAL
  } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)  // fault tolerance via the BlockManager
  }
}
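
The WAL-based handler writes each received block both to the BlockManager and to a write-ahead log file under the checkpoint directory, so the block can be re-read even if the executor that received it is lost; that extra disk write is exactly the time and space cost noted above. In the default path, BlockManagerBasedBlockHandler.storeBlock simply hands the block to the BlockManager, counting records along the way: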
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
  var numRecords = None: Option[Long]
  val putResult: Seq[(BlockId, BlockStatus)] = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
      numRecords = countIterator.count
      putResult
    case ByteBufferBlock(byteBuffer) =>
      blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
    case o =>
      throw new SparkException(
        s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
  }
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
  BlockManagerBasedStoreResult(blockId, numRecords)
}
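
The second agenda item, message replay, is approach 2 in action: keep no extra copy on the executor and instead re-read the data from a source that can replay it. Kafka is the canonical example. A minimal sketch using the Spark 1.x direct API (it needs the spark-streaming-kafka artifact; the broker address and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaReplayDemo {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("DirectKafkaReplayDemo"), Seconds(5))
    // No receiver and no executor-side replica: each batch only records Kafka
    // offset ranges, and a failed task re-reads exactly those offsets from Kafka.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("myTopic"))  // placeholder topic
    messages.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}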

Simple flow diagram: (the original post's diagram image is not reproduced here)


Reference blog: http://blog.csdn.net/hanburgud/article/details/51471089
