千家信息网 (Qianjia Information Network)

How the Spark Master Schedules Resources (Source Code Analysis)

Published: 2025-02-02   Author: 千家信息网 editor
Last updated: 2025-02-02

The Spark Master resource allocation algorithm (Spark 1.3)

The Master's resource scheduling is implemented by the schedule() method of the Master class in the org.apache.spark.deploy.master package.

The steps are as follows:

  1. First check whether the Master is in the ALIVE state, and return immediately if it is not. Only the active Master schedules resources; a standby Master never does.

  2. Pass the previously registered workers that are in the ALIVE state to Random.shuffle, which randomizes their order and returns them as a new sequence.

  3. Record how many alive workers the shuffle returned.

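  4. Schedule drivers in a for loop. Only applications submitted to the standalone Master in cluster deploy mode register a driver with the Master and therefore go through driver scheduling. In client mode the driver starts in the submitting client process, and YARN deployments do not use this Master at all, so there is nothing to schedule.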

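  5. The for loop iterates over (a copy of) the waitingDrivers ArrayBuffer. For each driver, a while loop keeps visiting workers as long as some alive worker has not been visited yet and the driver has not been launched.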

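  6. If the current worker's free memory is at least the memory the driver needs and its free cores are at least the cores the driver needs, launch the driver there and remove it from the waitingDrivers queue. In either case the position then moves to the next worker, and it is not reset between drivers, so drivers are placed round-robin.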

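  7. The driver is launched by launchDriver: it adds the driver to the worker's internal cache and subtracts the driver's memory and cores from the worker's free resources, records the worker in the driver's own state, sends a LaunchDriver message through the worker's actor so that the worker starts the driver, and finally sets the driver's state to RUNNING.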

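  8. Once the drivers are scheduled, the applications are scheduled. There are two algorithms, spreadOutApps and non-spreadOutApps, selected by the spark.deploy.spreadOut property in the Master's SparkConf (read as conf.getBoolean("spark.deploy.spreadOut", true)). The default is true, which enables spreadOutApps.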

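  9. A for loop iterates over the applications in waitingApps, with an if guard that keeps only applications that still need cores. For each such application, the alive workers are filtered again down to those the application can use and sorted by free cores in descending order. A worker is usable if its free memory is at least the memory each of the application's executors needs and it does not already host an executor of this application.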

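  10. Create an array with one slot per usable worker to hold the number of cores assigned to that worker, and compute the total number of cores to assign as the smaller of the cores the application still needs and the total free cores of the usable workers.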

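  11. A while loop walks the usable workers round-robin: if the current worker still has a free core that has not been assigned yet, the remaining total is decreased by 1 and that worker's assigned count is increased by 1; the position then moves to the next worker. The loop ends when all cores have been assigned. As a result, the application's cores are spread as evenly as possible across the workers, so the application runs on as many nodes as possible.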

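  12. After the cores have been distributed, iterate over the usable workers. For each worker that received cores, add an executor to the application's internal cache (this creates an ExecutorDesc object recording how many cores the executor gets), launch the executor on that worker, and set the application's state to RUNNING.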

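  13. The non-spreadOutApps algorithm does the opposite: it assigns all of one worker's free cores before moving on to the next worker, packing each application onto as few nodes as possible.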
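
Steps 10 to 13 can be compared with a small standalone sketch. It is a simplified model, not Spark code: each usable worker is reduced to its free core count (assumed to be already filtered by canUse and, for spreadOut, sorted in descending order), and the names `CoreAssignment`, `spreadOut` and `packed` are hypothetical. `spreadOut` follows the round-robin loop of schedule(); `packed` models a single application in the non-spreadOut branch.

```scala
// Simplified, hypothetical model of the two core-assignment strategies in schedule().
// Assumption: each usable worker is reduced to its free core count.
object CoreAssignment {

  // spreadOut = true: give out one core at a time, round-robin over the usable workers
  // (assumed sorted by free cores, descending), until the app is satisfied or all are full.
  def spreadOut(coresFree: Array[Int], coresLeft: Int): Array[Int] = {
    val numUsable = coresFree.length
    val assigned = new Array[Int](numUsable) // cores given to each worker
    var toAssign = math.min(coresLeft, coresFree.sum)
    var pos = 0
    while (toAssign > 0) {
      // Only take a core from a worker that still has an unassigned free core
      if (coresFree(pos) - assigned(pos) > 0) {
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % numUsable
    }
    assigned
  }

  // spreadOut = false (one application): take every free core of a worker
  // before moving on to the next worker.
  def packed(coresFree: Array[Int], coresLeft: Int): Array[Int] = {
    var remaining = coresLeft
    coresFree.map { free =>
      val coresToUse = math.min(free, remaining)
      remaining -= coresToUse
      coresToUse
    }
  }
}
```

With usable workers offering 4, 2 and 1 free cores and an application that still needs 5 cores, `spreadOut` returns (2, 2, 1) while `packed` returns (4, 1, 0). In the real non-spreadOut branch the outer loop runs over workers and the inner loop over waiting applications, in the iteration order of `workers` rather than sorted by free cores.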

The core source code is as follows:


private def schedule() {
  // First check whether the Master is ALIVE; only the active Master schedules
  if (state != RecoveryState.ALIVE) { return }

  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Shuffle the alive (previously registered) workers into a random order
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of alive workers available (after shuffling)
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0

  /*
   * Driver scheduling: iterate over the waitingDrivers ArrayBuffer.
   * Only applications submitted in standalone cluster deploy mode register a driver
   * with the Master and get it scheduled here; in client mode the driver runs in the
   * submitting process and is never registered or scheduled.
   */
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // Keep visiting workers while some alive worker is unvisited and the driver is not launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // The worker needs enough free memory and free cores for the driver
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Launch the driver on this worker
        launchDriver(worker, driver)
        // and remove it from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }
      // Move the cursor to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }

  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  /*
   * Application scheduling: two algorithms, selected by spark.deploy.spreadOut
   * (default true, i.e. the spreadOutApps algorithm)
   */
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores
    // Iterate over waitingApps; the if guard keeps only apps that still need cores
    for (app <- waitingApps if app.coresLeft > 0) {
      // Keep alive workers this app can use, sorted by free cores in descending order.
      // canUse: free memory >= the memory each executor of the app needs, and the worker
      // does not already run an executor for this app
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      // One slot per usable worker: how many cores to assign to it
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // Total cores to assign: min(cores the app still needs, free cores on usable workers)
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      // Round-robin: give one core at a time to each worker that still has a free core
      while (toAssign > 0) {
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      for (pos <- 0 until numUsable) {
        // Only workers that were assigned at least one core
        if (assigned(pos) > 0) {
          // Add an executor to the app's internal cache; this creates an ExecutorDesc
          // recording how many cores the executor gets
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // Launch the executor on that worker
          launchExecutor(usableWorkers(pos), exec)
          // Mark the application as RUNNING
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        if (canUse(app, worker)) {
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
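
The driver loop at the top of schedule() can be exercised outside Spark with a minimal sketch. `Worker` and `Driver` are hypothetical stand-ins for WorkerInfo and DriverInfo, the worker sequence is assumed to be already shuffled and to contain only alive workers, and reserving resources stands in for launchDriver(). The detail it preserves is that curPos is shared across drivers and advances after every visit, including a successful launch.

```scala
// Simplified, hypothetical model of the driver-placement loop in schedule().
// Worker/Driver stand in for WorkerInfo/DriverInfo; memory is in MB.
object DriverPlacement {
  final case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  // `workers` is assumed to be already shuffled and to contain only alive workers.
  // Returns driverId -> workerId for each driver that could be placed.
  def place(workers: IndexedSeq[Worker], drivers: Seq[Driver]): Map[String, String] = {
    val numWorkersAlive = workers.size
    var curPos = 0 // shared by all drivers: placement continues round-robin
    var placed = Map.empty[String, String]
    for (driver <- drivers) {
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = workers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Stand-in for launchDriver(): reserve the driver's memory and cores
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placed += (driver.id -> worker.id)
          launched = true
        }
        // Advance after every visit, including a successful launch
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    placed
  }
}
```

With workers w1 (1024 MB, 1 core), w2 and w3 (4096 MB, 4 cores each) and drivers d1 (2048 MB), d2 (512 MB) and d3 (8192 MB), d1 lands on w2; d2 lands on w3 even though w2 still has room, because the cursor has already moved past w2; and d3 stays waiting because no worker has 8192 MB free.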


// Launching an executor
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // Add the executor to the worker's internal cache (this also reserves its cores and memory)
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  // Send an ExecutorAdded message to the driver of the executor's application
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
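
launchExecutor() notifies two parties: the worker that has to start the executor process, and the driver of the application that receives the executor. The sketch below shows that flow without Akka. The actors are replaced by plain callbacks, the `worker.addExecutor` bookkeeping is left out, and the message classes reuse the names above but are simplified, hypothetical types (for example, the application description is omitted).

```scala
// Simplified, hypothetical model of the notifications sent by launchExecutor().
// Akka actors are replaced by callbacks; message fields are reduced to plain values.
object ExecutorLaunchFlow {
  sealed trait Message
  final case class LaunchExecutor(masterUrl: String, appId: String, execId: Int,
                                  cores: Int, memory: Int) extends Message
  final case class ExecutorAdded(execId: Int, workerId: String, hostPort: String,
                                 cores: Int, memory: Int) extends Message

  final case class Exec(appId: String, id: Int, cores: Int, memory: Int)

  def launchExecutor(masterUrl: String, workerId: String, hostPort: String, exec: Exec,
                     toWorker: Message => Unit, toDriver: Message => Unit): Unit = {
    // 1. Ask the worker to start the executor process
    toWorker(LaunchExecutor(masterUrl, exec.appId, exec.id, exec.cores, exec.memory))
    // 2. Tell the application's driver which executor it was granted, and where
    toDriver(ExecutorAdded(exec.id, workerId, hostPort, exec.cores, exec.memory))
  }
}
```

Collecting the callbacks into two buffers shows that the worker receives exactly one LaunchExecutor message and the driver exactly one ExecutorAdded message per launched executor.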


// Launching a driver
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's internal cache; this also subtracts the
  // driver's memory and cores from the worker's free resources
  worker.addDriver(driver)
  // Record the worker in the driver's own state
  driver.worker = Some(worker)
  // Send a LaunchDriver message through the worker's actor so that it starts the driver
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  // Mark the driver as RUNNING
  driver.state = DriverState.RUNNING
}
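
Both launchDriver() and launchExecutor() rely on the worker's bookkeeping: adding a driver or executor increases the worker's used cores and memory, and the free values are derived as total minus used. The class below is a hypothetical, simplified model of that accounting, not WorkerInfo itself; the single `add`/`remove` pair stands in for the separate driver and executor methods, and the `require` check is an illustrative addition, since schedule() already checks free resources before launching.

```scala
// Simplified, hypothetical model of a worker's resource bookkeeping (not WorkerInfo itself).
object WorkerAccounting {
  final class WorkerResources(val cores: Int, val memory: Int) { // memory in MB
    private var coresUsed = 0
    private var memoryUsed = 0

    // Free resources are derived, never stored: total minus used
    def coresFree: Int = cores - coresUsed
    def memoryFree: Int = memory - memoryUsed

    // Called when a driver or executor is placed here (cf. addDriver/addExecutor).
    // The require check is an illustrative addition; schedule() checks before launching.
    def add(c: Int, mem: Int): Unit = {
      require(c <= coresFree && mem <= memoryFree, "not enough free resources")
      coresUsed += c
      memoryUsed += mem
    }

    // Called when that driver or executor goes away, returning its resources
    def remove(c: Int, mem: Int): Unit = {
      coresUsed -= c
      memoryUsed -= mem
    }
  }
}
```

For example, placing a 1-core, 1024 MB driver and a 4-core, 4096 MB executor on an 8-core, 16384 MB worker leaves 3 cores and 11264 MB free, which is what the next schedule() pass sees through coresFree and memoryFree.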





