(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,本期内容:1、Executor的WAL容错机制2、消息重放Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?原因是计算的时候Spark Streaming是借助于Sp
千家信息网最后更新 2025年02月23日(版本定制)第12课:Spark Streaming源码解读之Executor容错安全性
本期内容:
1、Executor的WAL容错机制
2、消息重放
Executor的安全容错主要是数据的安全容错,那为什么不考虑数据计算的安全容错呢?
原因是计算的时候Spark Streaming是借助于Spark Core上RDD的安全容错的,所以天然的安全可靠的。
Executor的安全容错主要有:
1、数据副本:
有两种方式:a.借助底层的BlockManager,BlockManager做备份,通过传入的StorageLevel进行备份。
b. WAL方式进行容错。
2、接受到数据之后,不做副本,但是数据源支持存放,所谓存放就是可以反复的读取源数据。
容错的弊端:耗时间、耗空间。
简单的看下源代码:
/** Store block and report it to driver */def pushAndReportBlock( receivedBlock: ReceivedBlock,metadataOption: Option[Any],blockIdOption: Option[StreamBlockId] ) {val blockId = blockIdOption.getOrElse(nextBlockId)val time = System.currentTimeMillisval blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")val numRecords = blockStoreResult.numRecordsval blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId")}
privateval receivedBlockHandler: ReceivedBlockHandler = {if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {if (checkpointDirOption.isEmpty) {throw new SparkException("Cannot enable receiver write-ahead log without checkpoint directory set. " +"Please use streamingContext.checkpoint() to set the checkpoint directory. " +"See documentation for more details.") }new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) //通过WAL容错 } else {new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) //通过BlockManager进行容错 }}
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {var numRecords = None: Option[Long]val putResult: Seq[(BlockId, BlockStatus)] = block match {case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,tellMaster = true)case IteratorBlock(iterator) =>val countIterator = new CountingIterator(iterator)val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,tellMaster = true) numRecords = countIterator.count putResultcase ByteBufferBlock(byteBuffer) => blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)case o =>throw new SparkException(s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") }if (!putResult.map { _._1 }.contains(blockId)) {throw new SparkException(s"Could not store $blockId to block manager with storage level $storageLevel") }BlockManagerBasedStoreResult(blockId, numRecords)}
简单流程图:
参考博客:http://blog.csdn.net/hanburgud/article/details/51471089
容错
安全
数据
副本
备份
方式
内容
博客
原因
天然
就是
底层
弊端
数据源
时候
机制
流程
流程图
消息
源代码
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络技术及网络安全法 论文
重庆日语软件开发
辽宁工程项目管控软件开发平台
黄石通天晓网络技术有限公司
网络安全常见的身份证技术
网络安全感最新报告
上海网络技术开发服务
资料库数据库
全国版工商数据库
php切换数据库
服务器主板怎么修复
美国服务器租用光算云在哪儿
火芽网络技术有限公司
数据库 对象
医院网络安全建设管理
数据库题库 系统概念
计算机网络技术男
云管理服务器搭建
中鸣机器人软件开发
性能测试与网络安全好找工作不
微服务数据库高并发解决方案
网站服务器数据会备份吗
日本最权威的经济数据库
软件开发服务公司怎么运作
小学生全网络安全管理制度
pi软件开发语言
自己家没有安全性的网络安全不
数据库的表的字段默认值怎么定义
数据库操作题设计视图例题
网络安全是哪本书