千家信息网

ReduceTask流程是怎样的

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要介绍"ReduceTask流程是怎样的",在日常操作中,相信很多人在ReduceTask流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Reduc
千家信息网最后更新 2025年02月03日ReduceTask流程是怎样的

这篇文章主要介绍"ReduceTask流程是怎样的",在日常操作中,相信很多人在ReduceTask流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"ReduceTask流程是怎样的"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

ReduceTask流程源码解读

1、最终的文件就是 file.out 和 file.out.index ,等待reduce的拷贝.

2、在LocalJobRunner$Job中的run方法中:              //LocalJobRunner类中555行if (numReduceTasks > 0) {                      //判断reduceTask的个数   //创建Runnable对象: LocalJobRunner$Job$ReduceTaskRunnable             List reduceRunnables = getReduceTaskRunnables(            jobId, mapOutputFiles);        //创建线程池        ExecutorService reduceService = createReduceExecutor();        //将所有的LocalJobRunner$Job$ReduceTaskRunnable 提交到线程池执行.        runTasks(reduceRunnables, reduceService, "reduce");       }
3、进入runTasks(reduceRunnables, reduceService, "reduce");方法     //559行for (Runnable r : runnables) {          //循环每个Runnable,提交给线程池去执行.    service.submit(r);}

4、线程执行的时候,要运行LocalJobRunner$Job$ReduceTaskRunnable 中run方法

5、创建ReduceTask对象            //LocalJobRunner类~332行ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId,mapIds.size(), 1);6、执行ReduceTask中的run方法//LocalJobRunner类 --> 347行reduce.run(localConf, Job.this); --> //进入run方法7、调到ReduceTask的run方法内                           //ReduceTask类~320行      initialize(job, getJobID(), reporter, useNewApi);        //初始化~333行sortPhase.complete();                                      //排序~382行RawComparator comparator = job.getOutputValueGroupingComparator();       //387行 获取分组比较器
8、进入下列代码(390行)runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);进入runNewReducer方法内              //ReduceTask~577行--获取job的相关信息 org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); --反射的操作创建reduce对象 ,例如: WordCountReducerorg.apache.hadoop.mapreduce.Reducer reducer =      (org.apache.hadoop.mapreduce.Reducer)        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);        --创建RecordWriter对象org.apache.hadoop.mapreduce.RecordWriter trackedRW =       new NewTrackingRecordWriter(this, taskContext);

9、向下走,定位到reducer.run(reducerContext);方法 --> 然后进入(Reducer的run方法)    //~628行setup(context);reduce(context.getCurrentKey(), context.getValues(), context); //执行到WordCountReducer中的reduce方法,是一个循环调用过程.context.write(key,outv);                //数据写出源码流程如下:①:reduceContext.write(key, value);②:output.write(key, value);//进入到ReduceTask的write方法         //557行③:real.write(key,value);           //real :TextOutputFormat$LineRecordWriter进入到real.write()方法                       //TextOutputFormat类~84行writeObject(key);   //写出keywriteObject(value); //写出value写出key的源码~简单看下:                  //TextOutputFormat类~75行private void writeObject(Object o) throws IOException {        if (o instanceof Text) {                Text to = (Text) o;                out.write(to.getBytes(), 0, to.getLength());        } else {                out.write(o.toString().getBytes(StandardCharsets.UTF_8));                //调用对象的toString方法,将返回的字符串转换成字节,通过流写出        }}
10、cleanup(context);                 //清除生相关的文件,生成分区文件

整体MR工作机制源码解读总结

源码总结说明:1. 看源码目的:       熟悉整个MR的流程,能够将我们讲解的知识点对应到源码中具体的位置.       为面试做准备.2. 在整个MR中 ,会有N个MapTask(按照切片数量决定个数)和 N个ReduceTask(自行设置个数)        --在集群中的效果是多个MapTask并行运行, 并行数由集群的资源来决定.         --多个ReduceTask并行运行,并行数由集群的资源来决定. 一般来说,ReduceTask的数量比较少,基本上都能够同时并行.

到此,关于"ReduceTask流程是怎样的"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0