千家信息网

Yarn中如何实现ScheduleBackend

发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,这篇文章将为大家详细讲解有关Yarn中如何实现ScheduleBackend,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Yarn方式下的Schedu
千家信息网最后更新 2025年02月05日Yarn中如何实现ScheduleBackend

这篇文章将为大家详细讲解有关Yarn中如何实现ScheduleBackend,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

Yarn方式下的ScheduleBackend是用的啥?

在SparkContext中创建ScheduleBackend时,会根据指定的"master"参数的前缀决定创建哪种ScheduleBackend,对于"yarn://host:port"这样的URL来说,如果是cluster模式,就是创建YarnClusterSchedulerBackend,如果是client模式,就是创建YarnClientSchedulerBackend。

我们还是先看看YarnClusterSchedulerBackend的代码结构把。

YarnClusterSchedulerBackend继承了YarnSchedulerBackend,没有太多的发挥代码,我们直接看YarnSchedulerBackend把。估计client模式下也差不多。

YarnSchedulerBackend又继承了CoarseGrainedSchedulerBackend,我们看看不同点在哪里。

覆写了doRequestTotalExecutors和doKillExecutors方法,一个申请Executor,一个杀死Executor。

override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))  }    override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {    yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))  }

yarnSchedulerEndpointRef就是同一个文件里的endpoint端,看看具体的执行代码是什么:

      case r: RequestExecutors =>        amEndpoint match {          case Some(am) =>            am.ask[Boolean](r).andThen {              case Success(b) => context.reply(b)              case Failure(NonFatal(e)) =>                logError(s"Sending $r to AM was unsuccessful", e)                context.sendFailure(e)            }(ThreadUtils.sameThread)                 }      case k: KillExecutors =>        amEndpoint match {          case Some(am) =>            am.ask[Boolean](k).andThen {              case Success(b) => context.reply(b)              case Failure(NonFatal(e)) =>                logError(s"Sending $k to AM was unsuccessful", e)                context.sendFailure(e)            }(ThreadUtils.sameThread)                  }

我们看到它又将消息转给了amEndpoint,就是转给了yarn工程里的ApplicationManager。又要跳到ApplicationManager去看看里面的实现逻辑了,真是一波三折啊。

ApplicationManager里是怎么处理RequestExecutors和KillExecutors两个消息的呢?

      case r: RequestExecutors =>        Option(allocator) match {          case Some(a) =>            if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,              r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {              resetAllocatorInterval()            }            context.reply(true)        }      case KillExecutors(executorIds) =>        Option(allocator) match {          case Some(a) => executorIds.foreach(a.killExecutor)        }        context.reply(true)

调用allocator的killExecutor和requestTotalExecutorsWithPreferredLocalities方法。allocator又是啥?这里是不是类有的太多了啊。。

allocator = client.createAllocator(      yarnConf,      _sparkConf,      appAttemptId,      driverUrl,      driverRef,      securityMgr,      localResources)

是client的createAllocator方法创建出来的,client是啥?是YarnRMClient,我们就要先看看YarnRMClient了,看名字就大概能猜到,YarnRMClient就是来向Yarn机器申请Executor和杀死Executor的。

createAllocator方法返回下面的YarnAllocator:

return new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId, securityMgr,
localResources, SparkRackResolver.get(conf))

来到YarnAllocator。

YarnAllocator的killExecutor方法很好理解,就是释放Yarn中的Container:

 def killExecutor(executorId: String): Unit = synchronized {    executorIdToContainer.get(executorId) match {      case Some(container) if !releasedContainers.contains(container.getId) =>        internalReleaseContainer(container)        runningExecutors.remove(executorId)      case _ => logWarning(s"Attempted to kill unknown executor $executorId!")    }  }

申请Executor其实最终是在runAllocatedContainers方法中实现的。

核心代码看一下把,完整的可以看源码:

    if (runningExecutors.size() < targetNumExecutors) {        numExecutorsStarting.incrementAndGet()        if (launchContainers) {          launcherPool.execute(() => {            try {              new ExecutorRunnable(                Some(container),                conf,                sparkConf,                driverUrl,                executorId,                executorHostname,                executorMemory,                executorCores,                appAttemptId.getApplicationId.toString,                securityMgr,                localResources              ).run()              updateInternalState()            } catch {                          }          })        }

申请targetNumExecutors个ExecutorRunner,这样就和Standalone的申请Executor对应起来了。好了,整个过程就是这样了。

最终就会在Yarn集群中申请了所需数目的Container,并且在Container中启动ExecutorRunner,来向Driver汇报成绩。

这里的ExecutorRunner就是YarnCoarseGrainedExecutorBackend线程,在ExecutorRunner类中可以看到。

关于Yarn中如何实现ScheduleBackend就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

就是 方法 代码 模式 内容 文章 更多 消息 知识 篇文章 不同 不错 一波三折 差不多 不同点 两个 前缀 参数 又是 名字 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 哪些数据库不包含期刊论文全文 华为手机安全定位服务器 雷昶卓景网络技术 达梦数据库公司上市 广西网络安全特训营白盒测试 富凌服务器 电子类软件开发专业 网络安全因素有哪4种 计算机网络技术大专招 如何关闭手机网络安全检测 崩坏三两个服务器可以在一起玩吗 小米手机软件开发者 玉树州网络安全活动 无线电视网络安全性选哪个 sqoop自动创建数据库 idea数据库生成对象 二道区网络技术咨询诚信合作 怎么设定代理服务器 数据库查找固定车牌指令 与服务器建立连接时出来问题 网络安全面对的危险有哪几类 MR260服务器管理网口 网易邮箱服务器配置怎么填 当前软件开发用的是哪种语言 火柴盒软件开发服务工作室 哪个公司的软件开发公司电话 核酸检测系统数据库在哪里 python快速读取数据库 java直连数据库 电脑加载管理器服务器
0