(版本定制)第10课:Spark Streaming源码解读
发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,本期内容:1、数据接收架构设计模式2、数据接收源码彻底研究1、Receiver接受数据的过程类似于MVC模式:Receiver,ReceiverSupervisor和Driver的关系相当于Model
千家信息网最后更新 2025年02月04日(版本定制)第10课:Spark Streaming源码解读
本期内容:
1、数据接收架构设计模式
2、数据接收源码彻底研究
1、Receiver接受数据的过程类似于MVC模式:
Receiver,ReceiverSupervisor和Driver的关系相当于Model,Control,View,也就是MVC。
Model就是Receiver,存储数据Control,就是ReceiverSupervisor,Driver是获得元数据,也就是View。
2、数据的位置信息会被封装到RDD里面。
3、Receiver接受数据,交给ReceiverSupervisor去存储数据。
4、ReceiverTracker是通过发送一个又一个的Job,每个Job只有一个Task,每个Task里面就只有一个ReceiverSupervisor,用这个函数启动每一个Receiver。
下面我们简单的看下Receiver启动流程,应用程序首先通过JobScheduler的start方法来启动receiverTracker的start方法:
def start(): Unit = synchronized {if (eventLoop != null) return // scheduler has already been startedlogDebug("Starting JobScheduler")eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) }eventLoop.start()// attach rate controllers of input streams to receive batch completion updatesfor { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController} ssc.addStreamingListener(rateController)listenerBus.start(ssc.sparkContext)receiverTracker = new ReceiverTracker(ssc)inputInfoTracker = new InputInfoTracker(ssc)receiverTracker.start() //receiver启动jobGenerator.start() logInfo("Started JobScheduler")}
通过调用receiverTracker.start()方法来进行一系列的操作:
/** Start the endpoint and receiver execution thread. */def start(): Unit = synchronized {if (isTrackerStarted) {throw new SparkException("ReceiverTracker already started") }if (!receiverInputStreams.isEmpty) {endpoint = ssc.env.rpcEnv.setupEndpoint("ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) //Rpc消息通信,获取receiver的状态if (!skipReceiverLaunch) launchReceivers() //启动receiver logInfo("ReceiverTracker started")trackerState = Started}}
下面通过画图简单的描述下Receiver启动的内部机制:
参考博客:http://blog.csdn.net/hanburgud/article/details/51471047
http://lqding.blog.51cto.com/9123978/1774426
数据
方法
也就是
只有
就是
模式
存储
源码
位置
信息
内容
函数
博客
应用程序
机制
架构
流程
消息
状态
程序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发需要下载什么条件
郑州工商网络安全等级测评项目
网络安全手抄报怎么画里面的警察
创造与魔法服务器最高等级
宏名网络技术有限公司怎么样
网络运营者常见的网络安全威胁有
烟台市乾晟网络技术有限公司
国家一级协会网络安全证书
无现金社会网络安全
蓝盾网络技术是啥意思
天然药物数据库
腾讯服务器的质量怎么样
2021年春季高考网络技术
合肥电动汽车热管理软件开发公司
我的世界服务器需要先干嘛
网络安全遵循什么方针
哪个不是网络安全建设的驱动力
华为企业网络技术员待遇
网络技术讲师 招聘
数据库连接字符串加密java
udp 服务器 客户端
天河网络安全运维费用
数据库 x轴的类型
中国电信的软件开发都做什么
数据库random
无线局域网网络技术有哪些
怎么写专利申请 网络技术
note3 无服务器
性价比好的即时通讯软件开发
宁夏视觉系统软件开发