java中整体MR工作机制是怎样的
本篇内容主要讲解"java中整体MR工作机制是怎样的",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"java中整体MR工作机制是怎样的"吧!
1、整体MR工作机制源码解读(job提交流程)
1.1、job提交流程
--以wordCount案例为例,进行断点调试1、在WordCountDriver类中的job.waitForCompletion(true);处打上断点(入口),以debug模式运行 a.在Configuration conf = new Configuration();conf中做的操作是读取所有相关的配置文件 b.并将job对象创建出来,通过--Job job = Job.getInstance(conf);完成
2、进入waitForCompletion()方法if (state == JobState.DEFINE) { // --确定job的当前状态,如果是state,则进行提交 submit();}3、waitForCompletion()方法中的参数boolean verbose ~ verbose:true(默认值)if (verbose) { monitorAndPrintJob(); --对当前的job进行监控,并打印job的信息} 4、进入submit()方法 ~位置为Job.java~1562行 --ensureState(JobState.DEFINE); 再次确认Job的状态 --setUseNewAPI(); 设置使用新的API --hadoop中提供了2套API --connect(); 明确当前提交的Job运行的环境是本地还是集群4.1、进入connect()方法 --Job.java~1534行 --cluster理解为当前job运行所需的一个环境对象,开始cluster为null,通过匿名内部类进行对象的创建4.2 进入return new Cluster(getConfiguration())方法 --Job.java~1540行4.3 进入Cluster.java类,查看Cluster的有参构造 --Cluster.java~105行4.4 进入initialize(jobTrackAddr, conf);方法,定位到initProviderList();//获取Job运行的环境列表4.5 进入initProviderList()方法 //获取job运行的环境列表 --Cluster.java~75行4.5 查看Cluster.java类中的124行,查看遍历providerList有2种运行环境 YarnClientProtocolProvider ==>集群环境 LocalClientProtocolProvider==>本地环境4.6 进入Cluster.java类130行,clientProtocol = provider.create(conf)方法,进入可以看到4.7 YarnClientProtocolProvider.class 类19行
4.7 clientProtocol = null,继续向下走,可以看到下面的操作是对当前运行环境的判断 根据Provider结合当前的conf判断是哪个环境 YarnClientProtocolProvider ==> YarnRunner --yarn的运行对象 LocalClientProtocolProvider==> LocalJobRunner --本地的运行对象 5、connect()执行完毕,继续向下执行,看Job.java 1565行, //构造job的提交器对象 final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient()); --1565行,使用的是当前构造器的文件系统对象及客户端对象 6、继续向下走,到Job.java的1570行,该行代码才是真正进行job的提交return submitter.submitJobInternal(Job.this, cluster); 通过JobSubmitter提交Job7、job的状态转变为可执行,this.state = Job.JobState.RUNNING; --Job.java类的1573行8、从1570行打断点进入方法,进入JobSubmitter.java类中,定位到该类139行,submitJobInternal()方法,向下走断点9、定位到 checkSpecs(job); 方法,用于校验输出路径进入该方法
10、进入到output.checkOutputSpeces(job),查看源码 --进入到FileOutPutFormat.java类中,定位到151行。由此可以得到的一个结果是:输出路径的校验是在job提交之前完成的
11、跳出checkSpecs(job);方法,继续向下走 --JobSubPath jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);//获取Job临时工作目录 --D:/tmp/hadoop/mapred/staging/Administrator1590679188/.staging12、继续向下走,定位到代码157行,submitClient.getNewJobId(); //获取提交的job的jobId JobID jobId = submitClient.getNewJobID(); //jobId=job_local11590679188_001本地模式下,我们知道每个job都有对应一个jobId,不管程序在本地还是yarn 13、Path submitJobDir = new Path(jobStagingArea, jobId.toString()); //生成Job提交路径--D:/tmp/hadoop/mapred/staging/Administrator1590679188/.staging/job_local11590679188_001 job14、copyAndConfigureFiles(job, submitJobDir);//拷贝Job相关的配置信息,并将job的提交路径在磁盘中创建出来
15、进入uploadResourcesInternal(job,submitJobDir);方法 从JobSubmitter类99行进入
16、进入uploadResourcesInternal(job,submitJobDir)方法,读取配置项
17、进入writeSplits(job,submitJobDir);方法 writeSplits(job, submitJobDir); //生成切片信息
18、定位到 maps = writeNewSplits(job, jobSubmitDir); ,进入该方法 //生成切片进入
切片对象splits内容为: file:///D:/input/inputWord/JaneEyre.txt:0+36306679 (文件,读取位置从0到36306679) 切片是逻辑上的说法,记录的就是读取文件从什么位置到什么位置
19、切片对象splits中记录的内容是:读取的是那个文件,从文件的0位置开始读取到那个位置
20、return array.length; //返回切片的个数回到200行位置,将writeSplits(job, submitJobDir)返回的数值赋给maps
21、conf.setInt(MRJobConfig.NUM_MAPS, maps); //根据切片的个数设置启动多少个MapTask 并最终在job的提交路径中有两个文件:
--job.split 切片具体信息
--job.splitmetainfo 切片描述信息
22、writeConf(conf, submitJobFile); //把job的所有配置信息写到job的提交路径下 最终在job的提交路径下生成一个文件:job.xml。该文件记录所有的xml配置信息(包括自己设置的)
23、根据切片信息(确定启动mapTask的个数)和配置信息,真正开始执行job的任务
24、status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); // 真正将job提交进行执行
到此,相信大家对"java中整体MR工作机制是怎样的"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!