
VIII. MapReduce -- Source Code Analysis of Job Submission

Published: 2025-01-23 Author: 千家信息网 editors

I. Source Code Analysis

1. Entry point for job submission

A job is submitted and run by calling job.waitForCompletion(true), so the analysis starts from this method.

//----------------- Job.java
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
    // Submit the job if it has not been submitted yet (state is still DEFINE)
    if (this.state == Job.JobState.DEFINE) {
        this.submit();
    }
    if (verbose) {
        // Monitor the job and print progress information
        this.monitorAndPrintJob();
    } else {
        // Otherwise poll quietly until the job completes
        int completionPollIntervalMillis = getCompletionPollInterval(this.cluster.getConf());
        while (!this.isComplete()) {
            try {
                Thread.sleep((long) completionPollIntervalMillis);
            } catch (InterruptedException var4) {
                // Ignored: keep polling
            }
        }
    }
    return this.isSuccessful();
}
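The control flow above (submit once while the state is DEFINE, then poll until completion) can be illustrated without Hadoop. The following is a minimal sketch, not Hadoop code: the class WaitForCompletionSketch, its fields, and the simulated completion counter are all hypothetical stand-ins for Job.

```java
/** Hypothetical stand-in for Job; only the control flow mirrors waitForCompletion(). */
class WaitForCompletionSketch {
    enum JobState { DEFINE, RUNNING }

    private JobState state = JobState.DEFINE;
    private int submitCalls = 0;   // how many times submit() actually ran
    private int pollsUntilDone;    // simulated work: polls left before "completion"

    WaitForCompletionSketch(int pollsUntilDone) {
        this.pollsUntilDone = pollsUntilDone;
    }

    /** Mirrors ensureState(DEFINE) followed by the switch to RUNNING. */
    void submit() {
        if (state != JobState.DEFINE) {
            throw new IllegalStateException("Job in state " + state + " instead of DEFINE");
        }
        submitCalls++;
        state = JobState.RUNNING;
    }

    /** Simulated completion check: reports true once the counter is used up. */
    boolean isComplete() {
        return pollsUntilDone-- <= 0;
    }

    /** Submits only if still in DEFINE, then polls until the job reports completion. */
    boolean waitForCompletion(long pollIntervalMillis) {
        if (state == JobState.DEFINE) {
            submit();
        }
        while (!isComplete()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // Unlike the decompiled code, restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true; // the sketch always "succeeds"
    }

    int getSubmitCalls() { return submitCalls; }

    JobState getState() { return state; }
}
```

Calling waitForCompletion() a second time on the same object does not resubmit, because the state is no longer DEFINE. The sketch deliberately differs from the decompiled code in one respect: it restores the interrupt flag instead of swallowing the InterruptedException.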

2. this.submit(): submitting the job

//----------------- Job.java
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    // Make sure the job has not been submitted yet
    this.ensureState(Job.JobState.DEFINE);
    // Decide whether the new API is used
    this.setUseNewAPI();
    // Mainly initializes the client inside the Cluster object, which is used to talk to the cluster.
    // The client is either a YARN client or a local client.
    this.connect();
    // Create the job submitter from the Cluster object, passing in the file system
    // that stores the job's files and the client
    final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
    // Submit the job and start it
    this.status = (JobStatus) this.ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
            // Submit the job, run it, and return its status
            return submitter.submitJobInternal(Job.this, Job.this.cluster);
        }
    });
    this.state = Job.JobState.RUNNING;
    LOG.info("The url to track the job: " + this.getTrackingURL());
}

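The code above involves three important methods:
this.connect() initializes the client used to submit the job
this.getJobSubmitter() creates a JobSubmitter that wraps the file system and the client
submitter.submitJobInternal(Job.this, Job.this.cluster) submits the job and starts it
Each of the three is examined in detail below.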

3. this.connect(): initializing the client

//----------------- Job.java
private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException {
    // Create the Cluster object, which connects to the cluster and exposes many APIs
    if (this.cluster == null) {
        this.cluster = (Cluster) this.ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
            public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
                return new Cluster(Job.this.getConfiguration());
            }
        });
    }
}

The key step here is the creation of a Cluster object, so let's look at that class's constructors.

//---------------------------- Cluster.java
public Cluster(Configuration conf) throws IOException {
    this((InetSocketAddress) null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.fs = null;
    this.sysDir = null;
    // Job staging directory
    this.stagingAreaDir = null;
    this.jobHistoryDir = null;
    // Providers of the client/server communication protocol
    this.providerList = null;
    // Keep the job configuration
    this.conf = conf;
    // Get the current user
    this.ugi = UserGroupInformation.getCurrentUser();
    // Initialize the client used to submit the job
    this.initialize(jobTrackAddr, conf);
}

// This is where the client is initialized; the main result is this.client
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
    this.initProviderList();
    Iterator i$ = this.providerList.iterator();
    while (i$.hasNext()) {
        /*
         There are two providers, one for local and one for YARN execution:
         YarnClientProtocolProvider and LocalClientProtocolProvider
        */
        ClientProtocolProvider provider = (ClientProtocolProvider) i$.next();
        LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
        ClientProtocol clientProtocol = null;
        try {
            /*
             A null jobTrackAddr only selects which create() overload is called.
             Whether the job runs on a remote cluster or locally is decided by each provider,
             based on the value of mapreduce.framework.name in conf:
               yarn  -> YarnClientProtocolProvider creates a YARNRunner
               local -> LocalClientProtocolProvider creates a LocalJobRunner
            */
            if (jobTrackAddr == null) {
                clientProtocol = provider.create(conf);
            } else {
                clientProtocol = provider.create(jobTrackAddr, conf);
            }
            if (clientProtocol != null) {
                this.clientProtocolProvider = provider;
                // The client is the object just created by the provider
                this.client = clientProtocol;
                LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
                // Stop as soon as a provider has produced a client
                break;
            }
            LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol");
        } catch (Exception var7) {
            LOG.info("Failed to use " + provider.getClass().getName() + " due to error: ", var7);
        }
    }
    if (null == this.clientProtocolProvider || null == this.client) {
        throw new IOException("Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.");
    }
}

As shown, the Cluster constructor mainly initializes two fields: clientProtocolProvider and client.
That is, the provider and the client, where the client is created by provider.create().

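Next, look at ClientProtocolProvider and ClientProtocol. ClientProtocolProvider is an abstract class and ClientProtocol is an interface, so the question is which concrete implementations they have.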

ClientProtocolProvider:
    YarnClientProtocolProvider
    LocalClientProtocolProvider
ClientProtocol:
    YARNRunner
    LocalJobRunner

Here are the create() methods of YarnClientProtocolProvider and LocalClientProtocolProvider:

public class LocalClientProtocolProvider extends ClientProtocolProvider {
    .........
    public ClientProtocol create(Configuration conf) throws IOException {
        String framework = conf.get("mapreduce.framework.name", "local");
        if (!"local".equals(framework)) {
            return null;
        } else {
            conf.setInt("mapreduce.job.maps", 1);
            // Create a LocalJobRunner
            return new LocalJobRunner(conf);
        }
    }
    .....................
}

//===============================================================
public class YarnClientProtocolProvider extends ClientProtocolProvider {
    ...................................
    public ClientProtocol create(Configuration conf) throws IOException {
        // Create a YARNRunner
        return "yarn".equals(conf.get("mapreduce.framework.name")) ? new YARNRunner(conf) : null;
    }
    ...........................
}

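In short, there are two providers, YarnClientProtocolProvider and LocalClientProtocolProvider, which create the YARNRunner and LocalJobRunner clients respectively. A job can therefore run in one of two modes: locally or on YARN.

At this point the two fields of the Cluster object, this.client and this.clientProtocolProvider, are initialized.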
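The selection logic in Cluster.initialize() boils down to "ask each provider in turn and keep the first non-null client". The sketch below reproduces that idea with plain Java collections. It is an illustration under assumptions: the Provider interface, the use of a Map as configuration, and the client names returned as strings are hypothetical, and it throws an unchecked exception where the real code throws IOException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class ProviderSelectionSketch {
    static final String FRAMEWORK_KEY = "mapreduce.framework.name";

    /** Hypothetical stand-in for ClientProtocolProvider: returns a client name, or null. */
    interface Provider {
        String create(Map<String, String> conf);
    }

    /** Two providers that mirror the checks in the create() methods shown above. */
    static List<Provider> providers() {
        List<Provider> list = new ArrayList<>();
        // Local provider: matches when the framework is "local" or not set at all
        list.add(conf -> "local".equals(conf.getOrDefault(FRAMEWORK_KEY, "local")) ? "LocalJobRunner" : null);
        // YARN provider: matches only when the framework is explicitly "yarn"
        list.add(conf -> "yarn".equals(conf.get(FRAMEWORK_KEY)) ? "YARNRunner" : null);
        return list;
    }

    /** Returns the first non-null client; fails if no provider accepts the configuration. */
    static String pickClient(Map<String, String> conf) {
        for (Provider provider : providers()) {
            String client = provider.create(conf);
            if (client != null) {
                return client;
            }
        }
        // The real code throws IOException; an unchecked exception keeps the sketch short
        throw new IllegalStateException("Cannot initialize Cluster. Check " + FRAMEWORK_KEY);
    }
}
```

With an empty configuration the sketch picks LocalJobRunner, with the value yarn it picks YARNRunner, and with any other value no provider matches, which corresponds to the "Cannot initialize Cluster" error.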

4. this.getJobSubmitter(): creating the submitter

//------------------- Job.java
public JobSubmitter getJobSubmitter(FileSystem fs, ClientProtocol submitClient) throws IOException {
    return new JobSubmitter(fs, submitClient);
}

This creates a JobSubmitter object. Its constructor:

//------------------ JobSubmitter.java
JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
}

Nothing special at first glance: the constructor only stores the file system and the client that was initialized in Cluster. The class does, however, contain many methods that are called later in the submission flow; they are covered below.

5. submitter.submitJobInternal(): submitting the job

This method is the core of the whole job submission process and deserves a careful read.

//------------------ JobSubmitter.java
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
    // Validate the output specification; an exception is thrown if the configured output already exists
    this.checkSpecs(job);
    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);
    // Get the staging area, the parent directory of all job submit directories
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // Look up the address of the local host
    InetAddress ip = InetAddress.getLocalHost();
    // Record the host name and IP address of the submitting machine
    if (ip != null) {
        this.submitHostAddress = ip.getHostAddress();
        this.submitHostName = ip.getHostName();
        conf.set("mapreduce.job.submithostname", this.submitHostName);
        conf.set("mapreduce.job.submithostaddress", this.submitHostAddress);
    }
    // Ask the cluster for a new job ID through the client that Cluster initialized earlier
    JobID jobId = this.submitClient.getNewJobID();
    job.setJobID(jobId);
    // Directory that holds this job's resources: configuration file, split files, job jar, etc.
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    JobStatus var24;
    try {
        conf.set("mapreduce.job.user.name", UserGroupInformation.getCurrentUser().getShortUserName());
        conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
        conf.set("mapreduce.job.dir", submitJobDir.toString());
        LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
        // Obtain delegation tokens for the NameNode that hosts submitJobDir
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{submitJobDir}, conf);
        this.populateTokenCache(conf, job.getCredentials());
        // Generate a shuffle secret key if the credentials do not contain one yet
        if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
            KeyGenerator keyGen;
            try {
                keyGen = KeyGenerator.getInstance("HmacSHA1");
                keyGen.init(64);
            } catch (NoSuchAlgorithmException var19) {
                throw new IOException("Error generating shuffle secret key", var19);
            }
            SecretKey shuffleKey = keyGen.generateKey();
            TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
        }
        if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
            conf.setInt("mapreduce.am.max-attempts", 1);
            LOG.warn("Max job attempts set to 1 since encrypted intermediatedata spill is enabled");
        }
        // Copy the job's temporary files and the job jar into submitJobDir
        this.copyAndConfigureFiles(job, submitJobDir);
        // Path of the job configuration file: submitJobDir/job.xml
        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
        LOG.debug("Creating splits at " + this.jtFs.makeQualified(submitJobDir));
        // Write the split information under submitJobDir and return the number of splits.
        // InputFormat.getSplits() is called to compute the planned splits.
        // The splits go to submitJobDir/job.split, their meta-info to submitJobDir/job.splitmetainfo.
        int maps = this.writeSplits(job, submitJobDir);
        conf.setInt("mapreduce.job.maps", maps);
        LOG.info("number of splits:" + maps);
        // Name of the queue the job is submitted to
        String queue = conf.get("mapreduce.job.queuename", "default");
        // submitClient is the client held by the Cluster object
        AccessControlList acl = this.submitClient.getQueueAdmins(queue);
        conf.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
        TokenCache.cleanUpTokenReferral(conf);
        if (conf.getBoolean("mapreduce.job.token.tracking.ids.enabled", false)) {
            ArrayList<String> trackingIds = new ArrayList<String>();
            for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) {
                trackingIds.add(t.decodeIdentifier().getTrackingId());
            }
            conf.setStrings("mapreduce.job.token.tracking.ids", trackingIds.toArray(new String[trackingIds.size()]));
        }
        ReservationId reservationId = job.getReservationId();
        if (reservationId != null) {
            conf.set("mapreduce.job.reservation.id", reservationId.toString());
        }
        // Write the job's Configuration to submitJobDir/job.xml
        this.writeConf(conf, submitJobFile);
        this.printTokens(jobId, job.getCredentials());
        // Submit the job through the client, passing the job resource directory and the credentials.
        // Which code runs here depends on whether the client is a YARNRunner or a LocalJobRunner.
        // The status of the submitted job is returned.
        status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
        if (status == null) {
            throw new IOException("Could not launch job");
        }
        var24 = status;
    } finally {
        // If the submission failed, delete the job's submit directory
        if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (this.jtFs != null && submitJobDir != null) {
                this.jtFs.delete(submitJobDir, true);
            }
        }
    }
    return var24;
}
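One detail of submitJobInternal() that is easy to try in isolation is the shuffle secret: a key is generated only when the credentials do not already hold one. The sketch below uses the same JDK calls as the code above (KeyGenerator with "HmacSHA1" and a 64-bit key size). The Map standing in for Credentials and the alias "shuffle.secret" are hypothetical, chosen only for illustration.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

class ShuffleKeySketch {
    // Hypothetical alias; the real code stores the key through TokenCache
    static final String SHUFFLE_KEY_ALIAS = "shuffle.secret";

    /** Returns the existing key, or generates and stores a new one if none is present. */
    static byte[] ensureShuffleKey(Map<String, byte[]> credentials) {
        byte[] existing = credentials.get(SHUFFLE_KEY_ALIAS);
        if (existing != null) {
            return existing;
        }
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA1");
            keyGen.init(64); // key size in bits
            SecretKey shuffleKey = keyGen.generateKey();
            byte[] encoded = shuffleKey.getEncoded();
            credentials.put(SHUFFLE_KEY_ALIAS, encoded);
            return encoded;
        } catch (NoSuchAlgorithmException e) {
            // The real code wraps this in an IOException
            throw new IllegalStateException("Error generating shuffle secret key", e);
        }
    }
}
```

A second call with the same map returns the stored key instead of generating a new one, which matches the null check in the original code.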

To summarize the main steps above:
(1)Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Get the staging area, the parent directory for all jobs.

(2)JobID jobId = this.submitClient.getNewJobID();
job.setJobID(jobId);
Request a job ID from the cluster through the client and save it in the job.

(3)Path submitJobDir = new Path(jobStagingArea, jobId.toString());
Build the working directory of the current job, which is named after the job ID.

(4)this.copyAndConfigureFiles(job, submitJobDir);
Copy the job's temporary files and the job jar into submitJobDir.

(5)Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
Get the path of the job configuration file, which is submitJobDir/job.xml.

(6)int maps = this.writeSplits(job, submitJobDir);
Write the split information under submitJobDir and return the number of splits. InputFormat.getSplits() is called to compute the planned splits. The splits are written to submitJobDir/job.split, and the meta-info of each split entry is written to submitJobDir/job.splitmetainfo.

(7)this.writeConf(conf, submitJobFile);
Write the job configuration to submitJobDir/job.xml.

(8)status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Actually submit the job and get its submission status.
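The steps above produce a small, predictable set of files under the job's submit directory. As a rough sketch, the layout can be expressed with plain string paths. The staging area and job ID used in main() are made-up example values; only the file names job.xml, job.split, and job.splitmetainfo come from the source code discussed here.

```java
class StagingLayoutSketch {
    /** Step (3): the submit directory is the staging area plus the job ID. */
    static String submitJobDir(String stagingArea, String jobId) {
        return stagingArea + "/" + jobId;
    }

    /** Steps (5) and (7): the job configuration file. */
    static String jobConf(String submitJobDir) {
        return submitJobDir + "/job.xml";
    }

    /** Step (6): the serialized splits. */
    static String jobSplit(String submitJobDir) {
        return submitJobDir + "/job.split";
    }

    /** Step (6): the index over the serialized splits. */
    static String jobSplitMetaInfo(String submitJobDir) {
        return submitJobDir + "/job.splitmetainfo";
    }

    public static void main(String[] args) {
        // Hypothetical example values, not taken from a real cluster
        String dir = submitJobDir("/tmp/staging/alice/.staging", "job_1700000000000_0001");
        System.out.println(jobConf(dir));
        System.out.println(jobSplit(dir));
        System.out.println(jobSplitMetaInfo(dir));
    }
}
```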

The more complex of these steps are examined below.

The focus is on how the job's resources are generated, for example the split files.

=================================================================

(1)this.copyAndConfigureFiles(job, submitJobDir);

Copy the job's temporary files and the job jar into submitJobDir.

//------------------ JobSubmitter.java
private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException {
    JobResourceUploader rUploader = new JobResourceUploader(this.jtFs);
    rUploader.uploadFiles(job, jobSubmitDir);
    job.getWorkingDirectory();
}

//---------------------- JobResourceUploader.java
public void uploadFiles(Job job, Path submitJobDir) throws IOException {
    ......................
    String files = conf.get("tmpfiles");
    String libjars = conf.get("tmpjars");
    String archives = conf.get("tmparchives");
    String jobJar = job.getJar();
    .................. // The method is long, so only an excerpt is shown; these are the kinds of files copied to the job directory
}

As shown, this step mainly copies the job jar and related files (tmpfiles, tmpjars, tmparchives) into the job's working directory.
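The values of tmpfiles, tmpjars, and tmparchives are comma-separated lists of paths. The sketch below shows one way such a list could be turned into copy targets under the submit directory. It is an illustration, not the uploader's actual implementation: the method planCopies is hypothetical, and the subdirectory name passed in (for example "files" or "libjars") is an assumption about the layout rather than something shown in the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

class ResourceUploadSketch {
    /**
     * Maps each entry of a comma-separated resource list to a target path
     * of the form submitJobDir/subDir/fileName.
     */
    static List<String> planCopies(String commaSeparated, String submitJobDir, String subDir) {
        List<String> targets = new ArrayList<>();
        if (commaSeparated == null || commaSeparated.isEmpty()) {
            return targets; // nothing configured for this resource type
        }
        for (String source : commaSeparated.split(",")) {
            // Keep only the last path component as the file name
            String name = source.substring(source.lastIndexOf('/') + 1);
            targets.add(submitJobDir + "/" + subDir + "/" + name);
        }
        return targets;
    }
}
```

For example, planCopies("file:///a/x.txt,file:///b/y.txt", "/staging/job_1", "files") yields two targets under /staging/job_1/files.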

(2)Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

Get the path of the job configuration file, which is submitJobDir/job.xml.

//----------------------------- JobSubmissionFiles.java
public static Path getJobConfPath(Path jobSubmitDir) {
    return new Path(jobSubmitDir, "job.xml");
}

(3)int maps = this.writeSplits(job, submitJobDir);

Write the split information under submitJobDir and return the number of splits. InputFormat.getSplits() is called to compute the planned splits. The splits are written to submitJobDir/job.split, and the meta-info of each split entry is written to submitJobDir/job.splitmetainfo.

//------------------------ JobSubmitter.java
private int writeSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf) job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
        maps = this.writeNewSplits(job, jobSubmitDir);
    } else {
        maps = this.writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
}

Nothing special here: the method only distinguishes between the old and new APIs. We follow this.writeNewSplits.

//------------------------ JobSubmitter.java
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    // Instantiate the configured InputFormat via reflection (TextInputFormat by default)
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // Ask the InputFormat to compute the planned splits
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    // Sort the splits by size so that the largest come first
    Arrays.sort(array, new JobSubmitter.SplitComparator());
    // Create the split data file and the split meta-info file
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);
    return array.length;
}

The method obtains the InputFormat object, calls its getSplits() to get the planned splits, and then calls JobSplitWriter.createSplitFiles() to create the split files. That last method is shown next.
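Before the splits are written, writeNewSplits() sorts them with SplitComparator so that the largest splits come first. The sketch below shows the same ordering with plain Java. The Split class is a hypothetical stand-in for InputSplit that carries only a path and a length.

```java
import java.util.Arrays;
import java.util.Comparator;

class SplitSortSketch {
    /** Hypothetical stand-in for InputSplit: just a path and a length in bytes. */
    static final class Split {
        final String path;
        final long length;

        Split(String path, long length) {
            this.path = path;
            this.length = length;
        }
    }

    /** Returns a copy of the input sorted by length, largest first. */
    static Split[] sortLargestFirst(Split[] splits) {
        Split[] sorted = splits.clone();
        Arrays.sort(sorted, Comparator.comparingLong((Split s) -> s.length).reversed());
        return sorted;
    }
}
```

Sorting largest-first means the most expensive map tasks are listed first, so they can be scheduled early instead of becoming stragglers at the end of the job.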

//------------------ JobSplitWriter.createSplitFiles
public static <T extends InputSplit> void createSplitFiles(Path jobSubmitDir, Configuration conf, FileSystem fs, T[] splits) throws IOException, InterruptedException {
    // Open the output stream for the split file, named jobSubmitDir/job.split
    FSDataOutputStream out = createFile(fs, JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
    // Serialize every split in the array and write it to jobSubmitDir/job.split.
    // The return value is the meta-info of each split entry,
    // for example the entry's start offset in job.split and the split's length.
    SplitMetaInfo[] info = writeNewSplits(conf, splits, out);
    out.close();
    // Write the meta-info of the split file to jobSubmitDir/job.splitmetainfo
    writeJobSplitMetaInfo(fs, JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir), new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), 1, info);
}

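Two files are generated here:
jobSubmitDir/job.split: the split data file. Each record is the split's class name followed by the serialized split (for a file-based split, for example, the file path, start offset, and length).
jobSubmitDir/job.splitmetainfo: the index over job.split. Each entry holds a split's host locations, the start offset of its record in job.split, and the length of the split's input data.

Let's see how the two files are generated.
First, jobSubmitDir/job.split: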

private static <T extends InputSplit> SplitMetaInfo[] writeNewSplits(Configuration conf, T[] array, FSDataOutputStream out) throws IOException, InterruptedException {
    SplitMetaInfo[] info = new SplitMetaInfo[array.length];
    if (array.length != 0) {
        SerializationFactory factory = new SerializationFactory(conf);
        int i = 0;
        int maxBlockLocations = conf.getInt("mapreduce.job.max.split.locations", 10);
        long offset = out.getPos();
        // Write every split to the file and build the meta-info entry for each one
        for (T split : array) {
            long prevCount = out.getPos();
            Text.writeString(out, split.getClass().getName());
            Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());
            serializer.open(out);
            // Serialize the split object into the file
            serializer.serialize(split);
            long currCount = out.getPos();
            String[] locations = split.getLocations();
            if (locations.length > maxBlockLocations) {
                LOG.warn("Max block location exceeded for split: " + split + " splitsize: " + locations.length + " maxsize: " + maxBlockLocations);
                locations = Arrays.copyOf(locations, maxBlockLocations);
            }
            // Build the meta-info entry: locations, start offset in job.split, split length
            info[i++] = new SplitMetaInfo(locations, offset, split.getLength());
            offset += currCount - prevCount;
        }
    }
    return info;
}

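This method serializes each split object into job.split and, along the way, builds the entries that will be written to jobSubmitDir/job.splitmetainfo, which is the index over the split file.
Next, writeJobSplitMetaInfo():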

private static void writeJobSplitMetaInfo(FileSystem fs, Path filename, FsPermission p, int splitMetaInfoVersion, SplitMetaInfo[] allSplitMetaInfo) throws IOException {
    // Open an output stream for the meta-info of the split entries
    FSDataOutputStream out = FileSystem.create(fs, filename, p);
    out.write(JobSplit.META_SPLIT_FILE_HEADER);
    WritableUtils.writeVInt(out, splitMetaInfoVersion);
    WritableUtils.writeVInt(out, allSplitMetaInfo.length);
    SplitMetaInfo[] arr$ = allSplitMetaInfo;
    int len$ = allSplitMetaInfo.length;
    // Write the entries one by one
    for (int i$ = 0; i$ < len$; ++i$) {
        SplitMetaInfo splitMetaInfo = arr$[i$];
        splitMetaInfo.write(out);
    }
    out.close();
}

The purpose is clear: the index entries for the split file are written to jobSubmitDir/job.splitmetainfo.
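The relationship between the two files is that of a data file and its index: records are appended to one stream while the start offset of each record is remembered, and a reader can later jump straight to any record. The sketch below demonstrates this with in-memory streams from the JDK. The record layout (a UTF string followed by two longs) and all class and method names are hypothetical; Hadoop's real on-disk format differs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class SplitIndexSketch {
    /** Serialized records plus the byte offset at which each record starts. */
    static final class Written {
        final byte[] data;
        final long[] offsets;

        Written(byte[] data, long[] offsets) {
            this.data = data;
            this.offsets = offsets;
        }
    }

    /** Writes one record per split and remembers where each record begins. */
    static Written write(String[] paths, long[] starts, long[] lengths) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            long[] offsets = new long[paths.length];
            for (int i = 0; i < paths.length; i++) {
                offsets[i] = out.size();   // position before the record, like out.getPos()
                out.writeUTF(paths[i]);    // 2-byte length prefix + encoded characters
                out.writeLong(starts[i]);  // 8 bytes
                out.writeLong(lengths[i]); // 8 bytes
            }
            out.flush();
            return new Written(bytes.toByteArray(), offsets);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Uses an offset from the index to read the path of a single record. */
    static String readPathAt(byte[] data, long offset) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.skipBytes((int) offset);
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the index separate lets a reader that needs only one split skip directly to its record instead of deserializing every split before it.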

(4)status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

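Actually submit the job and get its submission status. The implementation shown is the one in YARNRunner.

//------------------ YARNRunner.java
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {
    this.addHistoryToken(ts);
    // Build the submission context from the job configuration and the HDFS path of the job resource directory
    ApplicationSubmissionContext appContext = this.createApplicationSubmissionContext(this.conf, jobSubmitDir, ts);
    try {
        // Submit the application to the ResourceManager; an application ID is returned
        ApplicationId applicationId = this.resMgrDelegate.submitApplication(appContext);
        // Fetch the application report for that ID (the variable name refers to the ApplicationMaster)
        ApplicationReport appMaster = this.resMgrDelegate.getApplicationReport(applicationId);
        String diagnostics = appMaster == null ? "application report is null" : appMaster.getDiagnostics();
        if (appMaster != null && appMaster.getYarnApplicationState() != YarnApplicationState.FAILED && appMaster.getYarnApplicationState() != YarnApplicationState.KILLED) {
            return this.clientCache.getClient(jobId).getJobStatus(jobId);
        } else {
            throw new IOException("Failed to run job : " + diagnostics);
        }
    } catch (YarnException var8) {
        throw new IOException(var8);
    }
}

This method submits the application to the ResourceManager, which is responsible for launching the ApplicationMaster, then fetches the application report. If the report is missing or the state is FAILED or KILLED, an exception is thrown; otherwise the job status is returned.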
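The acceptance check at the end of submitJob() is a small predicate over the reported application state. A minimal sketch of it follows. The AppState enum is a hypothetical, reduced stand-in for YarnApplicationState, and a null argument plays the role of a missing application report.

```java
class SubmitCheckSketch {
    /** Hypothetical, reduced stand-in for YarnApplicationState. */
    enum AppState { SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    /** True when the submission counts as successful: a report exists and is neither FAILED nor KILLED. */
    static boolean submissionAccepted(AppState reportedState) {
        return reportedState != null
                && reportedState != AppState.FAILED
                && reportedState != AppState.KILLED;
    }
}
```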

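II. Summary

The main flow of a job submission is as follows:
1. Connect to the MapReduce cluster: this.connect()
The most important part is the creation of the client, which is either a YARNRunner or a LocalJobRunner. It is used afterwards to communicate with the server side, submit the job, and so on.

2. Actually submit the job: submitter.submitJobInternal(Job.this, cluster)
(1)Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
Get the staging area, the parent directory for all jobs.

(2)JobID jobId = this.submitClient.getNewJobID();
job.setJobID(jobId);
Request a job ID from the cluster through the client and save it in the job.

(3)Path submitJobDir = new Path(jobStagingArea, jobId.toString());
Build the working directory of the current job, which is named after the job ID.

(4)this.copyAndConfigureFiles(job, submitJobDir);
Copy the job's temporary files and the job jar into submitJobDir.

(5)Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
Get the path of the job configuration file, which is submitJobDir/job.xml.

(6)int maps = this.writeSplits(job, submitJobDir);
Write the split information under submitJobDir and return the number of splits. InputFormat.getSplits() is called to compute the planned splits. The splits are written to submitJobDir/job.split, and the meta-info of each split entry is written to submitJobDir/job.splitmetainfo.

(7)this.writeConf(conf, submitJobFile);
Write the job configuration to submitJobDir/job.xml.

(8)status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
Actually submit the job and get its submission status.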
