千家信息网

第4课:Spark Streaming的Exactly-One的事务处理

发表于:2025-02-24 作者:千家信息网编辑
千家信息网最后更新 2025年02月24日,Spark Streaming的事务处理和关系型数据库的事务的概念有所不同,关系型数据库事务关注的是语句级别的一致性,例如银行转账。而Spark Streaming的事务关注的是某次job执行的一致性
千家信息网最后更新 2025年02月24日第4课:Spark Streaming的Exactly-One的事务处理

Spark Streaming的事务处理和关系型数据库的事务的概念有所不同,关系型数据库事务关注的是语句级别的一致性,例如银行转账。而Spark Streaming的事务关注的是某次job执行的一致性。也就是如何保证Job在处理数据的过程中做到如下两点:

  • 不丢失数据

  • 不重复处理数据


SparkStreaming程序执行架构大致如下:


一、我们先来说说丢失数据的情况:

  1. Receiver接收到数据后,首先会在Executor级别上保存数据(根据StorageLevel的设置),例如socketTextStream的Receiver。在内存和磁盘上保留2份副本数据

def socketTextStream(    hostname: String,    port: Int,    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)}

如果StorageLevel设置的是只进行内存级别的存储,那么当程序崩溃后,即便对Driver进行了Checkpoint,然后重新启动程序。该部分数据也会丢失。因为Driver的Checkpoint并不对计算数据进行保存。

我们假设StorageLevel设置了磁盘级别的存储,也不能完全保证数据不被丢失,因为Receiver并不是接收一条数据写一次磁盘,而是按照数据块为单位写数据。然后将数据块的元数据信息发送给Driver,Driver的Checkpoint记录的数Block的元数据信息。当数据块写到一半的时候,或者是元数据还没有发送给Driver的时候,Executor崩溃了,数据也就丢失啦。

解决方案:为了减少这种情况的发送,可以在Receiver端引入WAL写机制,因为WAL写的频率要比数据块的频率高的多。这样,当Executor恢复的时候,可以读取WAL日志恢复数据块。

但是通过WAL方式会极大的损伤Spark Streaming中Receivers接受数据的性能;


WAL也不能完全的解决数据丢失的问题,就像Oracle一样,日志文件的写,也是先写到内存中,然后根据一定的触发条件再将数据写到磁盘。如果还没有来的及写WAL日志,此时数据也会有不一致的情况(数据已经接收,但是还没有写到WAL的这部分数据是恢复不出来的。)。


Spark Streaming 1.3的时候为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系统!!!此时兼具有流的优势和文件系统的优势,至此,Spark Streaming+Kafka就构建了完美的流处理世界!!!所有的Executors通过Kafka API直接消费数据,直接管理Offset,所以也不会重复消费数据;事务实现啦!!!



2. Driver崩溃,此时Job正在处理的数据,包括Receiver已经接收到还未被处理的数据将全部丢失。

解决方案:对Driver进行Checkpoint,此处的Checkpoint和RDD的Checkpoint并不一样。

我们看看Checkpoint都包含哪些属性:

private[streaming]class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)  extends Logging with Serializable {  val master = ssc.sc.master  val framework = ssc.sc.appNameval jars = ssc.sc.jars  val graph = ssc.graph  val checkpointDir = ssc.checkpointDir  val checkpointDuration = ssc.checkpointDuration  val pendingTimes = ssc.scheduler.getPendingTimes().toArray  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)  val sparkConfPairs = ssc.conf.getAll

其中graph是DStreamGraph的实例化,它里面包含了InputDStream

privateval inputStreams = new ArrayBuffer[InputDStream[_]]()

我们以DirectKafkaInputDStream为例,其中包含了checkpointData

protected[streaming] overrideval checkpointData =  new DirectKafkaInputDStreamCheckpointData

其中只是包含:

class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {  def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {    data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]  }

就是每个batch 的唯一标识 time 对象,以及每个KafkaRDD对应的的Kafka偏移信息。

所以:

checkpoint 是非常高效的。没有涉及到实际数据的存储。一般大小只有几十K,因为只存了Kafka的偏移量等信息。

checkpoint 采用的是序列化机制,尤其是DStreamGraph的引入,里面包含了可能如ForeachRDD等,而ForeachRDD里面的函数应该也会被序列化。如果采用了CheckPoint机制,而你的程序包做了做了变更,恢复后可能会有一定的问题。



二、关于数据重复处理涉及两个方面:

  1. 数据被重复读取:在使用Kafka的情况下,Receiver收到数据且保存到了HDFS等持久化引擎但是没有来得及进行updateOffsets,此时Receiver崩溃后重新启动就会通过管理Kafka的ZooKeeper中元数据再次重复读取数据,但是此时SparkStreaming认为是成功的,但是Kafka认为是失败的(因为没有更新offset到ZooKeeper中),此时就会导致数据重新消费的情况。

  2. 数据输出多次重写

    为什么会有这个问题,因为Spark Streaming在计算的时候基于Spark Core,Spark Core天生会做以下事情导致Spark Streaming的部分结果重复输出(例如数据输出后,该Task的后续程序发生错误,而任务发生错误,Spark Core会进入如下程序):

    Task重试;慢任务推测(两个相同任务可能会同时执行),Stage重复;Job重试;

具体解决方案:

设置spark.task.maxFailures次数为1;

设置spark.speculation为关闭状态(因为慢任务推测其实非常消耗性能,所以关闭后可以显著提高Spark Streaming处理性能)

Spark Streaming on Kafka的话,Job失败后可以设置auto.offset.reset为"largest"的方式;


Exactly Once的事务处理必须满足:

  1. Receiver数据零丢失:必须有可靠的数据来源和可靠的Receiver,且通过WAL来保证数据安全。

  2. 整个应用程序的metadata必须进行checkpoint;


最后再次强调可以通过transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复!这两个方式类似于Spark Streaming的后门,可以做任意想象的控制操作!


备注:

1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains




数据 处理 程序 事务 情况 时候 任务 信息 性能 磁盘 级别 存储 消费 输出 一致 两个 内存 文件 方式 方案 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全d模块 redis读取数据库数据并缓存 五华区软件开发价格信息 无线传感器节点网络技术 网络安全法手抄报图片 唯一艺术服务器内部错误 网络安全日志管理服务器 网络技术员面试注意什么 成人大专计算机网络技术专业 茂名通信软件开发市价 数据库基本表实验报告总结 数据库范式主码是什么 海曙手机软件开发企业 计算机网络技术工程师中级证 松江区创新数据库服务内容 全球网络安全公司有哪些 北京芒果城宽网络技术有限公司 常州数据库审计服务 汉中智微网络技术有限公司 重复数据库损坏怎么修复 组织参观网络安全教育基地 项目管理 数据库表单 徐汇区进口网络技术卖价 网络安全机构包括访问控制机构 数据库表的分类 软件开发委外开发合同备案 服务器手机端口 无法与连接服务器安全连接 鄞州应用软件开发系统 山东时代网络技术五星服务
0