(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,本期内容:1、Executor的WAL容错机制2、消息重放Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?原因是计算的时候Spark Streaming是借助于Sp
千家信息网最后更新 2025年01月22日(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
本期内容:
1、Executor的WAL容错机制
2、消息重放
Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?
原因是计算的时候Spark Streaming是借助于Spark Core上RDD的安全容错的,所以天然的安全可靠的。
Executor的安全容错主要有:
1、数据副本:
有两种方式:a.借助底层的BlockManager,BlockManager做备份,通过传入的StorageLevel进行备份。
b. WAL方式进行容错。
2、接受到数据之后,不做副本,但是数据源支持存放,所谓存放就是可以反复的读取源数据。
容错的弊端:耗时间、耗空间。
简单的看下源代码:
/** Store block and report it to driver */def pushAndReportBlock( receivedBlock: ReceivedBlock,metadataOption: Option[Any],blockIdOption: Option[StreamBlockId] ) {val blockId = blockIdOption.getOrElse(nextBlockId)val time = System.currentTimeMillisval blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")val numRecords = blockStoreResult.numRecordsval blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId")}
privateval receivedBlockHandler: ReceivedBlockHandler = {if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {if (checkpointDirOption.isEmpty) {throw new SparkException("Cannot enable receiver write-ahead log without checkpoint directory set. " +"Please use streamingContext.checkpoint() to set the checkpoint directory. " +"See documentation for more details.") }new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通过WAL容错 } else {new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通过BlockManager进行容错 }}
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {var numRecords = None: Option[Long]val putResult: Seq[(BlockId, BlockStatus)] = block match {case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)case IteratorBlock(iterator) =>val countIterator = new CountingIterator(iterator)val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true) numRecords = countIterator.count putResultcase ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)case o =>throw new SparkException(s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") }if (!putResult.map { _._1 }.contains(blockId)) {throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel") }BlockManagerBasedStoreResult(blockId, numRecords)}
简单流程图:
参考博客:http://blog.csdn.net/hanburgud/article/details/51471089
容错
安全
数据
副本
备份
方式
内容
博客
原因
天然
就是
底层
弊端
数据源
时候
机制
流程
流程图
消息
源代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全国防新闻
sunlight 数据库
DNS服务器设置为什么
35岁软件开发
农产品追溯平台软件开发公司
select语句选数据库
桌面版本和服务器版本什么区别
贵州微丰软件开发公司电话
4卡服务器
桂林发展高科技互联网
svn远程服务器管理
电脑怎么设置局域网服务器
软件开发开发什么东西
维护网络安全刻不容缓
数据库建的表修改后不允许保存
服务器安全狗 云锁
苏州戴尔服务器排行榜
房屋销售管理系统数据库单表查询
召开网络安全警示教育大会
苏州喔噻互联网科技薪金
张家口软件开发
腾讯云服务器设置tcp
foxpro虚拟数据库
信息技术网络技术会考
计算机网络技术最前沿的
海关数据库架构
北京北天信通网络技术
5G网络技术考证
vf中删除指定数据库
网络安全域指