如何进行DAGScheduler源码解读
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。当构建完TaskScheduler之后,我们需
千家信息网最后更新 2025年01月24日如何进行DAGScheduler源码解读
如何进行DAGScheduler源码解读,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
当构建完TaskScheduler之后,我们需要构建DAGScheduler这个核心对象:
进入其构造函数中:
可以看出构建DAGScheduler实例的时候需要把TaskScheduler实例对象作为参数传入。
LiveListenerBus:
BlockManagerMaster:
通过阅读代码,我们可以发现DAGScheduler实例化的时候,调用了initializeEventProcessActor()方法
private def initializeEventProcessActor() { // blocking the thread until supervisor is started, which ensures eventProcessActor is // not null before any job is submitted // 阻塞当前线程,等待supervisor启动,这样可以确保Job提交时,eventProcessActor not null implicit val timeout = Timeout(30 seconds) val initEventActorReply = dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) eventProcessActor = Await.result(initEventActorReply, timeout.duration). asInstanceOf[ActorRef]}initializeEventProcessActor()
DAGSchedulerEventProcessActor:
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) extends Actor with Logging { override def preStart() { // set DAGScheduler for taskScheduler to ensure eventProcessActor is always // valid when the messages arrive // 设置taskScheduler对DAGScheduler的引用句柄。在此处设置保证了Job提交时候 // eventProcessActor已经准备就绪 dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) } /** * The main event loop of the DAG scheduler. */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => dagScheduler.handleTaskSetFailed(taskSet, reason) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } override def postStop() { // Cancel any active jobs in postStop hook dagScheduler.cleanUpAfterSchedulerStop() }}
可以看出核心在于实例化eventProcessActor对象,eventProcessActor会负责接收和发送DAGScheduler的消息,是DAGScheduler的通信载体。
关于如何进行DAGScheduler源码解读问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。
实例
对象
时候
问题
源码
方法
更多
核心
帮助
解答
易行
简单易行
代码
内容
函数
参数
句柄
小伙
小伙伴
消息
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
外文全文数据库有哪些
excel数据库如何查询
战术小队去哪个服务器玩
服务器12800r内存
山东软件开发解决方案推荐
代理服务器代码分享
查找专业图书用哪个数据库
北京移搜网络技术有限公司
dns服务器 全球
朝花夕拾思维导图软件开发
dicex2软件开发者
java创建数据库
602网络安全教育班队
网络安全政策的应对措施
数据库原理oracle
查看全部数据库代码
铁路网络安全智能威胁
计算机网络技术班旗设计
数据库物品名字是乱码
软件开发岗笔试题库
易语言利用网页做激活码服务器
cctv网络安全平均工资
django多个数据库
注册信息是怎么计入数据库的
周村工资管理hr软件开发
成都网络技术培训公司招聘
三明公安局网络安全保卫支队郑
主播的服务器还遭管理吗
车型数据库
全国网络安全教育征文