(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
发表于:2024-11-30 作者:千家信息网编辑
千家信息网最后更新 2024年11月30日,本期内容:1、Spark Streaming资源动态分配2、Spark Streaming动态控制消费速率为什么需要动态?a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Stre
千家信息网最后更新 2024年11月30日(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
本期内容:
1、Spark Streaming资源动态分配
2、Spark Streaming动态控制消费速率
为什么需要动态?
a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。
b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。
Spark Streaming资源动态调整的时候会面临挑战:
Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。
Spark Streaming资源动态申请
1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。
// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) { logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")}_executorAllocationManager =if (dynamicAllocationEnabled) {Some(new ExecutorAllocationManager(this, listenerBus, _conf)) } else { None }_executorAllocationManager.foreach(_.start())
设置spark.dynamicAllocation.enabled参数为true
这里会通过实例化ExecutorAllocationManager对象来动态分配资源,其内部是有定时器会不断的去扫描Executor的情况,通过线程池的方式调用schedule()来完成资源动态分配。
/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = { listenerBus.addListener(listener)val scheduleTask = new Runnable() {override def run(): Unit = {try { schedule() //动态调整Executor分配数量 } catch {case ct: ControlThrowable =>throw ctcase t: Throwable => logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) } } }executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)}
private def schedule(): Unit = synchronized {val now = clock.getTimeMillis updateAndSyncNumExecutorsTarget(now) //更新Executor数量removeTimes.retain { case (executorId, expireTime) =>val expired = now >= expireTimeif (expired) {initializing = falseremoveExecutor(executorId) } !expired }}
/** * Updates our target number of executors and syncs the result with the cluster manager. * * Check to see whether our existing allocation and the requests we've made previously exceed our * current needs. If so, truncate our target and let the cluster manager know so that it can * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @return the delta in the target number of executors. */private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {val maxNeeded = maxNumExecutorsNeededif (initializing) {// Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily0} else if (maxNeeded < numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget numExecutorsTarget = math.max(maxNeeded, minNumExecutors)numExecutorsToAdd = 1// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) { client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually needed") }numExecutorsTarget - oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS * 1000delta } else {0}}
动态控制消费速率:
Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,spark.streaming.backpressure.enabled参数设置。
资源
动态
分配
调整
控制
时候
速度
运行
速率
消费
不断
参数
峰值
情况
数据
数量
处理
内容
可以通过
因素
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库新建一个库
初级软件开发培训学校
数据库flage什么意思
科技创新大赛互联网
广西icp许可证本地服务器
有网络软件显示无法连接服务器
计算机信息网络技术基础课程
keil5软件开发平台
服务器凭证管理员默认密码
数据库 多项
服务器域限制
软件开发c和c哪个好
什么软件开发培训机构好
中学生怎样做到网络安全
网络安全教育主题班会简介
EI数据库叙词检索可以提高
云服务器基本防护
国内 dns服务器
天津潮流软件开发过程
关于学习数据库java的建议
局域网和dns服务器
黄冈邮储银行网络安全宣传
一个网页服务器运算有多快
如何开展服务器可靠性设计
全区网络安全普查
吴碧刚软件开发
软件开发笔试怎么看
mc如何用手机开服务器
cs怎么改成国服服务器
腾讯软件开发技术待遇