千家信息网

(版本定制)第10课:Spark Streaming源码解读

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,本期内容:1、数据接收架构设计模式2、数据接收源码彻底研究1、Receiver接受数据的过程类似于MVC模式:Receiver,ReceiverSupervisor和Driver的关系相当于Model
千家信息网最后更新 2025年02月04日(版本定制)第10课:Spark Streaming源码解读

本期内容:

1、数据接收架构设计模式

2、数据接收源码彻底研究


1、Receiver接受数据的过程类似于MVC模式:

Receiver,ReceiverSupervisor和Driver的关系相当于Model,Control,View,也就是MVC。

Model就是Receiver,存储数据Control,就是ReceiverSupervisor,Driver是获得元数据,也就是View。

2、数据的位置信息会被封装到RDD里面。

3、Receiver接受数据,交给ReceiverSupervisor去存储数据。

4、ReceiverTracker是通过发送一个又一个的Job,每个Job只有一个Task,每个Task里面就只有一个ReceiverSupervisor,用这个函数启动每一个Receiver。


下面我们简单的看下Receiver启动流程,应用程序首先通过JobScheduler的start方法来启动receiverTracker的start方法:

def start(): Unit = synchronized {if (eventLoop != null) return // scheduler has already been startedlogDebug("Starting JobScheduler")eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)  }eventLoop.start()// attach rate controllers of input streams to receive batch completion updatesfor {    inputDStream <- ssc.graph.getInputStreams    rateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)listenerBus.start(ssc.sparkContext)receiverTracker = new ReceiverTracker(ssc)inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start() //receiver启动jobGenerator.start()  logInfo("Started JobScheduler")}

通过调用receiverTracker.start()方法来进行一系列的操作:

/** Start the endpoint and receiver execution thread. */def start(): Unit = synchronized {if (isTrackerStarted) {throw new SparkException("ReceiverTracker already started")  }if (!receiverInputStreams.isEmpty) {endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //Rpc消息通信,获取receiver的状态if (!skipReceiverLaunch) launchReceivers() //启动receiver    logInfo("ReceiverTracker started")trackerState = Started}}

下面通过画图简单的描述下Receiver启动的内部机制:


参考博客:http://blog.csdn.net/hanburgud/article/details/51471047

http://lqding.blog.51cto.com/9123978/1774426

数据 方法 也就是 只有 就是 模式 存储 源码 位置 信息 内容 函数 博客 应用程序 机制 架构 流程 消息 状态 程序 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发需要下载什么条件 郑州工商网络安全等级测评项目 网络安全手抄报怎么画里面的警察 创造与魔法服务器最高等级 宏名网络技术有限公司怎么样 网络运营者常见的网络安全威胁有 烟台市乾晟网络技术有限公司 国家一级协会网络安全证书 无现金社会网络安全 蓝盾网络技术是啥意思 天然药物数据库 腾讯服务器的质量怎么样 2021年春季高考网络技术 合肥电动汽车热管理软件开发公司 我的世界服务器需要先干嘛 网络安全遵循什么方针 哪个不是网络安全建设的驱动力 华为企业网络技术员待遇 网络技术讲师 招聘 数据库连接字符串加密java udp 服务器 客户端 天河网络安全运维费用 数据库 x轴的类型 中国电信的软件开发都做什么 数据库random 无线局域网网络技术有哪些 怎么写专利申请 网络技术 note3 无服务器 性价比好的即时通讯软件开发 宁夏视觉系统软件开发
0