(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,本期内容:1、Spark Streaming资源动态分配2、Spark Streaming动态控制消费速率为什么需要动态?a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Stre
千家信息网最后更新 2025年01月25日(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析
本期内容:
1、Spark Streaming资源动态分配
2、Spark Streaming动态控制消费速率
为什么需要动态?
a)Spark默认情况下粗粒度的,先分配好资源再计算。对于Spark Streaming而言有高峰值和低峰值,但是他们需要的资源是不一样的,如果按照高峰值的角度的话,就会有大量的资源浪费。
b) Spark Streaming不断的运行,对资源消耗和管理也是我们要考虑的因素。
Spark Streaming资源动态调整的时候会面临挑战:
Spark Streaming是按照Batch Duration运行的,Batch Duration需要很多资源,下一次Batch Duration就不需要那么多资源了,调整资源的时候还没调整完Batch Duration运行就已经过期了。这个时候调整时间间隔。
Spark Streaming资源动态申请
1. 在SparkContext中默认是不开启动态资源分配的,但是可以通过手动在SparkConf中配置。
// Optionally scale number of executors dynamically based on workload. Exposed for testing.val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) { logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")}_executorAllocationManager =if (dynamicAllocationEnabled) {Some(new ExecutorAllocationManager(this, listenerBus, _conf)) } else { None }_executorAllocationManager.foreach(_.start())
设置spark.dynamicAllocation.enabled参数为true
这里会通过实例化ExecutorAllocationManager对象来动态分配资源,其内部是有定时器会不断的去扫描Executor的情况,通过线程池的方式调用schedule()来完成资源动态分配。
/** * Register for scheduler callbacks to decide when to add and remove executors, and start * the scheduling task. */def start(): Unit = { listenerBus.addListener(listener)val scheduleTask = new Runnable() {override def run(): Unit = {try { schedule() //动态调整Executor分配数量 } catch {case ct: ControlThrowable =>throw ctcase t: Throwable => logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) } } }executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)}
private def schedule(): Unit = synchronized {val now = clock.getTimeMillis updateAndSyncNumExecutorsTarget(now) //更新Executor数量removeTimes.retain { case (executorId, expireTime) =>val expired = now >= expireTimeif (expired) {initializing = falseremoveExecutor(executorId) } !expired }}
/** * Updates our target number of executors and syncs the result with the cluster manager. * * Check to see whether our existing allocation and the requests we've made previously exceed our * current needs. If so, truncate our target and let the cluster manager know so that it can * cancel pending requests that are unneeded. * * If not, and the add time has expired, see if we can request new executors and refresh the add * time. * * @return the delta in the target number of executors. */private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {val maxNeeded = maxNumExecutorsNeededif (initializing) {// Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily0} else if (maxNeeded < numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget numExecutorsTarget = math.max(maxNeeded, minNumExecutors)numExecutorsToAdd = 1// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) { client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually needed") }numExecutorsTarget - oldNumExecutorsTarget } else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded) logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS * 1000delta } else {0}}
动态控制消费速率:
Spark Streaming提供了一种弹性机制,流进来的速度和处理速度的关系,是否来得及处理数据。如果不能来得及的话,他会自动动态控制数据流进来的速度,spark.streaming.backpressure.enabled参数设置。
资源
动态
分配
调整
控制
时候
速度
运行
速率
消费
不断
参数
峰值
情况
数据
数量
处理
内容
可以通过
因素
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
微信提示服务器未检测到
郴州网络安全公司
广联先锋网络技术 济南
舟山手机游戏软件开发公司
开展节前网络安全检查
南山区正规网络技术开发商家
js与html连接数据库
服务器缓存设置时间有什么用
物联网解决网络安全问题
冷服务器启动
服务器时间在哪里更改
云服务器迁移群晖
ei数据库的文章如何下载
数据库迁移需要写端口
公安大学网络安全保卫专业
企业如何落实网络安全责任
合事达网络技术有限公司
小学国家网络安全宣传模板
vscodehtml数据库
软件开发失效模式分析
数据库token实现幂等方案
电脑的数据库在
最常见的数据库是
网络安全保密怎么保证
数据库信息专员
网络安全基地建设工程项目
做三年级网络安全手抄报
网络安全主要是指哪些方面
服务器机柜怎么买
小学国家网络安全宣传模板