
An Analysis of CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend


This article walks through how CoarseGrainedSchedulerBackend and CoarseGrainedExecutorBackend work together in Spark.

CoarseGrainedSchedulerBackend is used on the Driver side, and CoarseGrainedExecutorBackend is used on the Executor side. Both are "backends". What is a backend? A backend is the component responsible for communication between endpoints; these two coarse-grained backends handle the communication between the Driver and the Executors.

What is the Driver?

The Driver is the Spark program we write: the main function is the code the Driver runs.
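
For example, a minimal driver program might look like the sketch below (the object name, application name, and master URL are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// The main method below is exactly what "the Driver" runs.
object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("my-app").setMaster("spark://host:7077")
    val sc = new SparkContext(conf)
    // The filter/count below is split into Tasks that run on Executors,
    // while the surrounding main method keeps running in the Driver process.
    val evens = sc.parallelize(1 to 100).filter(_ % 2 == 0).count()
    println(s"even numbers: $evens")
    sc.stop()
  }
}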

What is an Executor?

An Executor is where Spark's Tasks actually run. When the executor-side backend receives a LaunchTask message from the Driver, it calls the launchTask method of the Executor class to execute the task.
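
To make that message flow concrete, here is a heavily simplified, self-contained sketch of the idea; LaunchTask, ToyExecutor, and ToyExecutorBackend below are stand-ins for illustration, not Spark's real classes:

import java.util.concurrent.Executors

// Stand-in for the task description carried by the real LaunchTask message.
case class LaunchTask(taskId: Long, body: () => Unit)

// Stand-in for the Executor class: runs tasks on a thread pool.
class ToyExecutor {
  private val pool = Executors.newFixedThreadPool(4)
  def launchTask(taskId: Long, body: () => Unit): Unit =
    pool.submit(new Runnable {
      def run(): Unit = { body(); println(s"task $taskId finished") }
    })
  def stop(): Unit = pool.shutdown()
}

// Stand-in for the executor-side backend's message handler.
class ToyExecutorBackend(executor: ToyExecutor) {
  def receive(msg: Any): Unit = msg match {
    case LaunchTask(id, body) => executor.launchTask(id, body) // hand the task to the Executor
    case other                => println(s"unhandled message: $other")
  }
}

object LaunchTaskDemo extends App {
  val executor = new ToyExecutor
  val backend  = new ToyExecutorBackend(executor)
  backend.receive(LaunchTask(1L, () => println("running task 1")))
  Thread.sleep(500)
  executor.stop()
}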

The Driver starts a CoarseGrainedSchedulerBackend and uses it to ask the cluster for machines on which to start Executors. For each machine obtained, it sends a command telling that machine to start an ExecutorRunner; the ExecutorRunner starts a CoarseGrainedExecutorBackend, which registers with the Driver and creates an Executor to handle the requests the backend receives. That is the flow under a Standalone deployment. Under YARN it is largely the same; the only real difference is how machines are requested from the cluster to start Executors, which is worth a brief look.
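
A rough, self-contained sketch of that registration handshake is shown below; the message and class names are simplified stand-ins, not Spark's actual RPC types:

// Simplified stand-ins for the driver-side and executor-side backends.
case class RegisterExecutor(executorId: String, cores: Int)
case object RegisteredExecutor

class ToyDriverEndpoint {                          // plays the scheduler backend on the Driver
  private var executors = Map.empty[String, Int]
  def receive(msg: Any): Any = msg match {
    case RegisterExecutor(id, cores) =>
      executors += id -> cores                     // remember the executor so tasks can later be offered to it
      println(s"driver: registered executor $id with $cores cores")
      RegisteredExecutor
  }
}

class ToyExecutorEndpoint(driver: ToyDriverEndpoint, id: String, cores: Int) {
  def start(): Unit =                              // plays the executor backend registering on startup
    driver.receive(RegisterExecutor(id, cores)) match {
      case RegisteredExecutor =>
        println(s"executor $id: registration acknowledged, creating Executor")
      case other => sys.error(s"unexpected reply: $other")
    }
}

object RegistrationDemo extends App {
  val driver = new ToyDriverEndpoint
  new ToyExecutorEndpoint(driver, "exec-1", cores = 4).start()
}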

Under YARN, machine allocation and task distribution are handled by a few classes in the spark-yarn project together with YARN's own facilities.

spark-yarn contains two files: Client.java and ApplicationMaster.java.

Client.java is responsible for requesting resources from YARN to run the code in ApplicationMaster.java, so the interesting part is what ApplicationMaster.java does.

ApplicationMaster starts by doing two things: launching one "/bin/mesos-master" and several "/bin/mesos-slave" processes. Both are deployed and run on resources requested from YARN, using YARN's own machinery; "/bin/mesos-master" and "/bin/mesos-slave" are two binaries that ship with this environment and can be thought of as the counterparts of the Master and Worker in a Standalone deployment.

The launchContainer method starts YARN containers; as mentioned above, "/bin/mesos-slave" is launched inside each container, and each mesos-slave registers itself with the mesos-master. Once all the required slave resources have been requested and started, startApplication() is called to launch the Driver.

The startApplication() method:

// Start the user's application
  private void startApplication() throws IOException {
    try {
      // Build the java command line for the user's driver program.
      String sparkClasspath = getSparkClasspath();
      String jobJar = new File("job.jar").getAbsolutePath();
      String javaArgs = "-Xms" + (masterMem - 128) + "m -Xmx" + (masterMem - 128) + "m";
      javaArgs += " -Djava.library.path=" + mesosHome + "/lib/java";
      String substitutedArgs = programArgs.replaceAll("\\[MASTER\\]", masterUrl);
      if (mainClass.equals("")) {
        javaArgs += " -cp " + sparkClasspath + " -jar " + jobJar + " " + substitutedArgs;
      } else {
        javaArgs += " -cp " + sparkClasspath + ":" + jobJar + " " + mainClass + " " + substitutedArgs;
      }
      String java = "java";
      if (System.getenv("JAVA_HOME") != null) {
        java = System.getenv("JAVA_HOME") + "/bin/java";
      }
      // Wrap the command in bash so stdout/stderr can be redirected to log files.
      String bashCommand = java + " " + javaArgs +
          " 1>" + logDirectory + "/application.stdout" +
          " 2>" + logDirectory + "/application.stderr";
      LOG.info("Command: " + bashCommand);
      String[] command = new String[] {"bash", "-c", bashCommand};
      String[] env = new String[] {"SPARK_HOME=" + sparkHome, "MASTER=" + masterUrl,
          "SPARK_MEM=" + (slaveMem - 128) + "m"};
      // Launch the driver as a child process, then wait for it in a background thread.
      application = Runtime.getRuntime().exec(command, env);

      new Thread("wait for user application") {
        public void run() {
          try {
            appExitCode = application.waitFor();
            appExited = true;
            LOG.info("User application exited with code " + appExitCode);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }.start();
    } catch (SparkClasspathException e) {
      unregister(false);
      System.exit(1);
      return;
    }
  }

This is what launches the Driver. masterUrl is the address of "bin/mesos-master" and is passed to the Driver through the MASTER environment variable. Under this YARN setup the master address has the form "mesos://host:port", while under Standalone it is "spark://host:port".

SparkContext then dispatches on the format of the master address. The relevant code looks like this:

master match {
      case "local" =>
        checkResourcesPerTask(clusterMode = false, Some(1))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        if (threadCount <= 0) {
          throw new SparkException(s"Asked to run locally with $threadCount threads")
        }
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        checkResourcesPerTask(clusterMode = false, Some(threadCount))
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
        scheduler.initialize(backend)
        (backend, scheduler)

      case SPARK_REGEX(sparkUrl) =>
        checkResourcesPerTask(clusterMode = true, None)
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        checkResourcesPerTask(clusterMode = true, Some(coresPerSlave.toInt))
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MiB RAM / worker but requested %d MiB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }
        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
        val masterUrls = localCluster.start()
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
          localCluster.stop()
        }
        (backend, scheduler)

      case masterUrl =>
        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }

With YARN, execution falls through to the last case:

case masterUrl =>
        checkResourcesPerTask(clusterMode = true, None)
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }

This relies on a ClusterManager class. What is that? This is part of what makes Spark hard to learn: there are a lot of concepts involved.

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoaders =
      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
    if (serviceLoaders.size > 1) {
      throw new SparkException(
        s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
    }
    serviceLoaders.headOption
  }

getClusterManager uses Java's ServiceLoader to find every registered ExternalClusterManager implementation and keeps the one whose canCreate method returns true for the master URL; in our case, that means finding the manager that accepts "mesos://host:port".
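
The selection logic itself is easy to reproduce. The sketch below mimics it with a hard-coded candidate list instead of ServiceLoader's META-INF/services discovery; the trait and the two manager objects are made up for illustration:

// Minimal stand-in for ExternalClusterManager: each manager says which master URLs it can handle.
trait ToyClusterManager {
  def canCreate(masterUrl: String): Boolean
  def name: String
}

object MesosManager extends ToyClusterManager {
  def canCreate(masterUrl: String) = masterUrl.startsWith("mesos://")
  def name = "mesos"
}

object YarnManager extends ToyClusterManager {
  def canCreate(masterUrl: String) = masterUrl == "yarn"
  def name = "yarn"
}

object ClusterManagerLookup extends App {
  // In Spark the candidates come from ServiceLoader.load(classOf[ExternalClusterManager], loader);
  // here we simply hard-code them.
  val candidates = Seq(MesosManager, YarnManager)

  def getClusterManager(url: String): Option[ToyClusterManager] = {
    val matching = candidates.filter(_.canCreate(url))
    if (matching.size > 1) sys.error(s"Multiple cluster managers registered for $url")
    matching.headOption
  }

  println(getClusterManager("mesos://host:5050").map(_.name)) // Some(mesos)
  println(getClusterManager("spark://host:7077").map(_.name)) // None
}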

