
How Hadoop Submits a Job to the Cluster


This article explains how Hadoop submits a job to the cluster. Many people run into questions about this part of everyday Hadoop work, so this walkthrough pulls the relevant source code together into a simple, practical explanation. Hopefully it clears up how job submission actually works. Follow along!

1: Flowchart of the MapReduce Job Submission Process

As the flowchart shows, three components are involved:
1) JobClient: the job client.
2) JobTracker: the job tracker.
3) TaskTracker: the task tracker.

MapReduce hands the job to the JobClient, the JobClient interacts with the JobTracker, and the JobTracker monitors and assigns TaskTrackers, which carry out the actual work of the job.
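To ground the walkthrough, here is a minimal, hypothetical driver (the class name and paths are placeholders, and no mapper/reducer is set so Hadoop's identity implementations run) showing where the client side of this whole process begins:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Hypothetical driver class; everything analyzed in this article
    // happens inside job.waitForCompletion(true).
    public class SubmitDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "submit-demo");
        job.setJarByClass(SubmitDemo.class);
        // No mapper/reducer set: the default identity Mapper/Reducer run.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Entry point of the submission path traced below:
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }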

The analysis below is based on the Hadoop 2.6.4 source code. Note that this source differs slightly from earlier Hadoop versions, so some concepts deviate a little from the diagram above.

2: How MapReduce Submits a Job

2.1 The actual submission of the job, i.e.:

**job.waitForCompletion(true)**

Tracing into waitForCompletion, note the call to submit():

    /**
     * Submit the job to the cluster and wait for it to finish.
     */
    public boolean waitForCompletion(boolean verbose
                                     ) throws IOException, InterruptedException,
                                              ClassNotFoundException {
      if (state == JobState.DEFINE) {
        submit();
      }
      if (verbose) {
        monitorAndPrintJob();
      } else {
        // get the completion poll interval from the client.
        int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
        while (!isComplete()) {
          try {
            Thread.sleep(completionPollIntervalMillis);
          } catch (InterruptedException ie) {
          }
        }
      }
      return isSuccessful();
    }

If the verbose parameter is set to true, the current job progress is printed to the console.


2.2 submit()

In the submit() method, the job is handed off to the corresponding Cluster, and the method returns immediately instead of waiting for the job to finish.

It also sets the Job instance's state to JobState.RUNNING, indicating that the job is in progress.

While the job is running, getJobState() can be called to query its state.

    /**
     * Submit the job to the cluster and return immediately.
     */
    public void submit()
           throws IOException, InterruptedException, ClassNotFoundException {
      ensureState(JobState.DEFINE);
      setUseNewAPI();
      connect();
      final JobSubmitter submitter =
          getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
      status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
          return submitter.submitJobInternal(Job.this, cluster);
        }
      });
      state = JobState.RUNNING;
      LOG.info("The url to track the job: " + getTrackingURL());
    }
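Because submit() returns immediately, a caller that does not want waitForCompletion()'s blocking behavior can poll the job itself. A small hypothetical sketch (not from the Hadoop source):

    job.submit();  // returns right away; the Job's state is now JobState.RUNNING
    while (!job.isComplete()) {          // isComplete() asks the cluster via RPC
      System.out.println("state: " + job.getJobState());  // e.g. PREP, RUNNING
      Thread.sleep(5000);                // poll every 5 seconds
    }
    System.out.println("succeeded: " + job.isSuccessful());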

Before the job is actually submitted, the connect() method is called to connect to the cluster (Cluster):

    private synchronized void connect()
            throws IOException, InterruptedException, ClassNotFoundException {
      if (cluster == null) {
        cluster =
          ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                     public Cluster run()
                            throws IOException, InterruptedException,
                                   ClassNotFoundException {
                       return new Cluster(getConfiguration());
                     }
                   });
      }
    }

This is a synchronized method, so it is thread-safe. It initializes a Cluster object from the configuration; the Cluster object represents the cluster.

    public Cluster(Configuration conf) throws IOException {
      this(null, conf);
    }

    public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
        throws IOException {
      this.conf = conf;
      this.ugi = UserGroupInformation.getCurrentUser();
      initialize(jobTrackAddr, conf);
    }

    private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
        throws IOException {
      synchronized (frameworkLoader) {
        for (ClientProtocolProvider provider : frameworkLoader) {
          LOG.debug("Trying ClientProtocolProvider : "
              + provider.getClass().getName());
          ClientProtocol clientProtocol = null;
          try {
            if (jobTrackAddr == null) {
              clientProtocol = provider.create(conf);
            } else {
              clientProtocol = provider.create(jobTrackAddr, conf);
            }
            if (clientProtocol != null) {
              clientProtocolProvider = provider;
              client = clientProtocol;
              LOG.debug("Picked " + provider.getClass().getName()
                  + " as the ClientProtocolProvider");
              break;
            } else {
              LOG.debug("Cannot pick " + provider.getClass().getName()
                  + " as the ClientProtocolProvider - returned null protocol");
            }
          } catch (Exception e) {
            LOG.info("Failed to use " + provider.getClass().getName()
                + " due to error: " + e.getMessage());
          }
        }
      }
      if (null == clientProtocolProvider || null == client) {
        throw new IOException(
            "Cannot initialize Cluster. Please check your configuration for "
                + MRConfig.FRAMEWORK_NAME
                + " and the correspond server addresses.");
      }
    }
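For illustration, a Cluster can also be constructed directly, taking the same configuration-driven path that connect() does (a sketch, assuming a valid Hadoop configuration on the classpath):

    Configuration conf = new Configuration();
    Cluster cluster = new Cluster(conf);   // invokes initialize(null, conf)
    // The JobSubmitter later uses these handles via getFileSystem()/getClient():
    System.out.println(cluster.getFileSystem().getUri());
    cluster.close();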

In the Cluster class, just above initialize(), the frameworkLoader field is declared:

    private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
        ServiceLoader.load(ClientProtocolProvider.class);

As this shows, client-protocol providers are discovered with java.util.ServiceLoader. Hadoop ships two of them: LocalClientProtocolProvider (local jobs) and YarnClientProtocolProvider (YARN jobs). The configuration property mapreduce.framework.name controls which framework is used; in MRv2 it takes one of two values, local or yarn, and the corresponding client is created here based on that setting.
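The ServiceLoader pattern itself is easy to demonstrate in isolation. The sketch below uses a hypothetical MyProvider interface; Hadoop's real providers are listed in META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider files on the classpath:

    import java.util.ServiceLoader;

    // Hypothetical provider interface, standing in for ClientProtocolProvider.
    interface MyProvider {
      String name();
    }

    public class LoaderDemo {
      public static void main(String[] args) {
        // ServiceLoader scans META-INF/services/<interface FQN> files on the
        // classpath and lazily instantiates each implementation listed there.
        ServiceLoader<MyProvider> loader = ServiceLoader.load(MyProvider.class);
        for (MyProvider p : loader) {
          // Cluster.initialize() iterates the same way, keeping the first
          // provider whose create(conf) returns a non-null ClientProtocol.
          System.out.println("Found provider: " + p.name());
        }
      }
    }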

mapred-site.xml:

    <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
    </property>
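The same property can also be set programmatically on the job's Configuration before submission (a sketch; the XML file is the usual place):

    Configuration conf = new Configuration();
    // MRConfig.FRAMEWORK_NAME is the constant "mapreduce.framework.name";
    // the value "local" would select the LocalJobRunner instead of YARN.
    conf.set(MRConfig.FRAMEWORK_NAME, "yarn");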

2.3 Once the Cluster is instantiated, the real job submission begins

In outline, submitJobInternal() validates the job's output specification, sets up the staging directory, obtains a new job ID and the required security tokens, computes the input splits, writes the job configuration to the submit directory, and finally calls submitClient.submitJob().

submitter.submitJobInternal(Job.this, cluster);
    JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

      //validate the jobs output specs
      checkSpecs(job);

      Configuration conf = job.getConfiguration();
      addMRFrameworkToDistributedCache(conf);

      Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
      //configure the command line options correctly on the submitting dfs
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        submitHostAddress = ip.getHostAddress();
        submitHostName = ip.getHostName();
        conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
        conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
      }
      JobID jobId = submitClient.getNewJobID();
      job.setJobID(jobId);
      Path submitJobDir = new Path(jobStagingArea, jobId.toString());
      JobStatus status = null;
      try {
        conf.set(MRJobConfig.USER_NAME,
            UserGroupInformation.getCurrentUser().getShortUserName());
        conf.set("hadoop.http.filter.initializers",
            "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir
            + " as the submit dir");
        // get delegation token for the dir
        TokenCache.obtainTokensForNamenodes(job.getCredentials(),
            new Path[] { submitJobDir }, conf);

        populateTokenCache(conf, job.getCredentials());

        // generate a secret to authenticate shuffle transfers
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
          KeyGenerator keyGen;
          try {
            int keyLen = CryptoUtils.isShuffleEncrypted(conf)
                ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                    MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
                : SHUFFLE_KEY_LENGTH;
            keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
            keyGen.init(keyLen);
          } catch (NoSuchAlgorithmException e) {
            throw new IOException("Error generating shuffle secret key", e);
          }
          SecretKey shuffleKey = keyGen.generateKey();
          TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
              job.getCredentials());
        }

        copyAndConfigureFiles(job, submitJobDir);

        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

        // Create the splits for the job
        LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
        int maps = writeSplits(job, submitJobDir);
        conf.setInt(MRJobConfig.NUM_MAPS, maps);
        LOG.info("number of splits:" + maps);

        // write "queue admins of the queue to which job is being submitted"
        // to job file.
        String queue = conf.get(MRJobConfig.QUEUE_NAME,
            JobConf.DEFAULT_QUEUE_NAME);
        AccessControlList acl = submitClient.getQueueAdmins(queue);
        conf.set(toFullPropertyName(queue,
            QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

        // removing jobtoken referrals before copying the jobconf to HDFS
        // as the tasks don't need this setting, actually they may break
        // because of it if present as the referral will point to a
        // different job.
        TokenCache.cleanUpTokenReferral(conf);

        if (conf.getBoolean(
            MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
            MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
          // Add HDFS tracking ids
          ArrayList<String> trackingIds = new ArrayList<String>();
          for (Token<? extends TokenIdentifier> t :
              job.getCredentials().getAllTokens()) {
            trackingIds.add(t.decodeIdentifier().getTrackingId());
          }
          conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
              trackingIds.toArray(new String[trackingIds.size()]));
        }

        // Set reservation info if it exists
        ReservationId reservationId = job.getReservationId();
        if (reservationId != null) {
          conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
        }

        // Write job file to submit dir
        writeConf(conf, submitJobFile);

        //
        // Now, actually submit the job (using the submit name)
        //
        printTokens(jobId, job.getCredentials());
        status = submitClient.submitJob(
            jobId, submitJobDir.toString(), job.getCredentials());
        if (status != null) {
          return status;
        } else {
          throw new IOException("Could not launch job");
        }
      } finally {
        if (status == null) {
          LOG.info("Cleaning up the staging area " + submitJobDir);
          if (jtFs != null && submitJobDir != null)
            jtFs.delete(submitJobDir, true);
        }
      }
    }
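The staging area used above can also be inspected from client code. A small sketch (assuming the cluster and conf objects from the earlier steps):

    // Staging root, e.g. /tmp/hadoop-yarn/staging/<user>/.staging by default.
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    System.out.println("staging dir: " + jobStagingArea);
    // The per-job submit dir is <staging>/<job-id> and holds job.xml, job.jar
    // and the split files written by writeSplits() above.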

The job is formally submitted to YARN by the following call:

    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());

In the end, the RPC call returns a JobStatus object; its toString() method can be used on the JobClient side to print the job's runtime information.

    if (status != null) {
      return status;
    }
    public String toString() {
      StringBuffer buffer = new StringBuffer();
      buffer.append("job-id : " + jobid);
      buffer.append("uber-mode : " + isUber);
      buffer.append("map-progress : " + mapProgress);
      buffer.append("reduce-progress : " + reduceProgress);
      buffer.append("cleanup-progress : " + cleanupProgress);
      buffer.append("setup-progress : " + setupProgress);
      buffer.append("runstate : " + runState);
      buffer.append("start-time : " + startTime);
      buffer.append("user-name : " + user);
      buffer.append("priority : " + priority);
      buffer.append("scheduling-info : " + schedulingInfo);
      buffer.append("num-used-slots" + numUsedSlots);
      buffer.append("num-reserved-slots" + numReservedSlots);
      buffer.append("used-mem" + usedMem);
      buffer.append("reserved-mem" + reservedMem);
      buffer.append("needed-mem" + neededMem);
      return buffer.toString();
    }
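On the client side, the returned status (or a fresh one fetched later) can be inspected directly; a brief usage sketch:

    JobStatus status = job.getStatus();  // fetches the latest status via RPC
    System.out.println(status);          // prints the fields shown above
    System.out.println("map progress: " + status.getMapProgress());
    System.out.println("state: " + status.getState());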

(At this point the job has been handed off to YARN; all that is left on the client is monitoring, if verbose was set to true):

    if (verbose) {
      monitorAndPrintJob();
    }

At this point, though, only the submission of the job is complete.

That wraps up this look at how Hadoop submits a job to the cluster; hopefully it has cleared up the process. Theory works best when paired with practice, so go try it yourself! For more articles like this one, stay tuned to the site.
