
What Is the MapReduce Output Process?


This article explains what the MapReduce output process is by walking through the relevant Hadoop source code, from ReduceTask.run() all the way down to the RecordWriter that writes the final key/value pairs. The walk-through is straightforward and practical, so let's step through it.

1. Start with ReduceTask.run(), the execution entry point

//-------------------------- ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
        throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    if (this.isMapOrReduce()) {
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }
    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewReducer();
    // ReduceTask initialization
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class combinerClass = this.conf.getCombinerClass();
        CombineOutputCollector combineCollector = null != combinerClass
                ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf)
                : null;
        Class clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class",
                Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = (ShuffleConsumerPlugin) ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job),
                umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector,
                this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter,
                this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter,
                this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile,
                this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        // Start running the reduce task
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }
        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }
}

As in MapTask, the two calls that matter are this.initialize() and this.runNewReducer(): the former performs the initialization work, the latter actually starts running the task.

2. this.initialize()

//---------------------------------------- ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi)
        throws IOException, ClassNotFoundException, InterruptedException {
    // Create the context objects
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    // Move the ReduceTask state to RUNNING
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }
    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }
        // Obtain the OutputFormat class object by reflection. getOutputFormatClass()
        // lives in JobContextImpl and defaults to TextOutputFormat.class.
        this.outputFormat = (OutputFormat) ReflectionUtils.newInstance(
                this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }
    // Get the output path
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf,
                    ((FileOutputCommitter) this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }
    this.committer.setupTask(this.taskContext);
    Class clazz = this.conf.getClass("mapreduce.job.process-tree.class",
            (Class) null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
            (String) System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }
}

This method mainly creates the context objects and obtains the OutputFormat object (TextOutputFormat by default). A driver-side sketch of where that class and the output path come from is shown below.
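For context, here is a minimal driver sketch, not part of the framework source: the class name, job name, and path argument are hypothetical, and the standard org.apache.hadoop.mapreduce API is assumed. It shows where the class that taskContext.getOutputFormatClass() later resolves, and the path that FileOutputFormat.getOutputPath() reads, get configured.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputFormatDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "output-demo");
        job.setJarByClass(OutputFormatDriver.class);
        // If this line is omitted, getOutputFormatClass() falls back to
        // TextOutputFormat.class -- exactly what initialize() sees by default.
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // FileOutputFormat.getOutputPath() in initialize() reads this setting.
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        // Input-side and mapper/reducer setup omitted; this sketch focuses
        // on the output configuration.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}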

3. this.runNewReducer()

//----------------------------------------------- ReduceTask.java
private void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical,
        final TaskReporter reporter, final RawKeyValueIterator rIter,
        RawComparator comparator, Class keyClass, Class valueClass)
        throws IOException, InterruptedException, ClassNotFoundException {
    // Anonymous inner class that wraps the key/value iterator
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rIter.close();
        }
        public DataInputBuffer getKey() throws IOException {
            return rIter.getKey();
        }
        public Progress getProgress() {
            return rIter.getProgress();
        }
        public DataInputBuffer getValue() throws IOException {
            return rIter.getValue();
        }
        public boolean next() throws IOException {
            boolean ret = rIter.next();
            reporter.setProgress(rIter.getProgress().getProgress());
            return ret;
        }
    };
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    // Obtain the Reducer object by reflection
    org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer)
            ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    // Obtain the RecordWriter object used to write results to the output file
    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
            new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    // Create the reduceContext object that drives the reduce task
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(
            reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter,
            this.reduceInputValueCounter, trackedRW, this.committer, reporter,
            comparator, keyClass, valueClass);
    // Start running reduce
    try {
        reducer.run(reducerContext);
    } finally {
        // Close the output stream
        trackedRW.close(reducerContext);
    }
}

As the code shows, runNewReducer() mainly does the following:
1) obtains the Reducer object whose run() method, i.e. the reduce logic, will be executed;
2) creates the RecordWriter object;
3) creates the reduceContext;
4) runs the Reducer's run() method.
A sketch of the kind of user-side Reducer instantiated in step 1) follows.
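The sketch below is a hypothetical word-count Reducer, not part of the framework source; it illustrates the kind of object that ReflectionUtils.newInstance(taskContext.getReducerClass(), job) returns.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all counts for this key
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        // context.write() ultimately lands in the RecordWriter created in
        // step 2) -- by default LineRecordWriter, as shown in step 5 below.
        context.write(key, result);
    }
}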

4. ReduceTask.NewTrackingRecordWriter()

//-------------------------------------- NewTrackingRecordWriter (ReduceTask.java)
static class NewTrackingRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter {
    private final org.apache.hadoop.mapreduce.RecordWriter real;
    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
    private final List fsStats;

    NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext)
            throws InterruptedException, IOException {
        this.outputRecordCounter = reduce.reduceOutputCounter;
        this.fileOutputByteCounter = reduce.fileOutputByteCounter;
        List matchedStats = null;
        if (reduce.outputFormat instanceof FileOutputFormat) {
            matchedStats = Task.getFsStatistics(
                    FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration());
        }
        this.fsStats = matchedStats;
        long bytesOutPrev = this.getOutputBytes(this.fsStats);
        // Create the real RecordWriter through the OutputFormat
        this.real = reduce.outputFormat.getRecordWriter(taskContext);
        long bytesOutCurr = this.getOutputBytes(this.fsStats);
        this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
    // ... remaining methods elided ...
}

The key point is that the RecordWriter is created through outputFormat.getRecordWriter(). As noted above, the default OutputFormat is TextOutputFormat, so let's look at TextOutputFormat.getRecordWriter() next.

5. TextOutputFormat.getRecordWriter()

public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public TextOutputFormat() {
    }

    // Note that this returns the static inner class TextOutputFormat.LineRecordWriter
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
            Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        // Separator between key and value; defaults to "\t"
        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
        // Two branches: uncompressed and compressed output
        if (!isCompressed) {
            // Get the output path
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            // Create the output stream
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
        } else {
            Class codecClass = getOutputCompressorClass(job, GzipCodec.class);
            CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            // Return the LineRecordWriter wrapped around the compressed stream
            return new TextOutputFormat.LineRecordWriter(
                    new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }

    // The LineRecordWriter class itself
    protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
        private static final byte[] NEWLINE;
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text) o;
                this.out.write(to.getBytes(), 0, to.getLength());
            } else {
                this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
            }
        }

        // Write out one key/value pair
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullKey || !nullValue) {
                // First write the key
                if (!nullKey) {
                    this.writeObject(key);
                }
                // Then the separator between key and value
                if (!nullKey && !nullValue) {
                    this.out.write(this.keyValueSeparator);
                }
                // Then the value
                if (!nullValue) {
                    this.writeObject(value);
                }
                // Finally a newline
                this.out.write(NEWLINE);
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            this.out.close();
        }

        static {
            NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        }
    }
}

As the code shows, the RecordWriter that is ultimately returned is of type LineRecordWriter. Both branches above are driven by job configuration; a driver-side sketch of the relevant knobs follows.
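The sketch below shows the two configuration knobs that getRecordWriter() reads: the separator property visible in the code above, and the compression settings that make getCompressOutput(job) return true. The class and job names are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputConfigSketch {
    public static Job configure() throws Exception {
        Configuration conf = new Configuration();
        // Replaces the default "\t" separator that LineRecordWriter
        // writes between key and value.
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        Job job = Job.getInstance(conf, "custom-output");
        // Makes getCompressOutput(job) return true, so getRecordWriter()
        // takes the compressed branch and wraps the stream in a codec
        // (GzipCodec is also the default codec, as seen above).
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        return job;
    }
}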
With that, back to step 3: let's look at the class behind the reduceContext object.

6. reduceContext = ReduceTask.createReduceContext()

protected static Reducer.Context createReduceContext(Reducer reducer, Configuration job,
        org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter,
        org.apache.hadoop.mapreduce.Counter inputKeyCounter,
        org.apache.hadoop.mapreduce.Counter inputValueCounter,
        RecordWriter output, OutputCommitter committer, StatusReporter reporter,
        RawComparator comparator, Class keyClass, Class valueClass)
        throws IOException, InterruptedException {
    ReduceContext reduceContext = new ReduceContextImpl(job, taskId, rIter,
            inputKeyCounter, inputValueCounter, output, committer, reporter,
            comparator, keyClass, valueClass);
    Reducer.Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext);
    return reducerContext;
}

As the code shows, the underlying reduceContext is a ReduceContextImpl object, which WrappedReducer then wraps into the Reducer.Context handed to user code.
Next, look at the constructor of ReduceContextImpl.

//--------------------------------- ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input,
        Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output,
        OutputCommitter committer, StatusReporter reporter, RawComparator comparator,
        Class keyClass, Class valueClass) throws InterruptedException, IOException {
    // The parent class is TaskInputOutputContextImpl; the RecordWriter (output)
    // is passed up to it
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(this.buffer);
    this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(this.buffer);
    this.hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}

Here it calls the parent-class constructor, passing the RecordWriter along as the output parameter.
Let's continue with the parent class, TaskInputOutputContextImpl.

public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
        RecordWriter output, OutputCommitter committer, StatusReporter reporter) {
    super(conf, taskid, reporter);
    // Note that output here is the RecordWriter object
    this.output = output;
    this.committer = committer;
}

// These three read the next KV pair into this.key / this.value,
// returning false when there are no more pairs
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

// Calls the RecordWriter's write method to emit the KV pair;
// by default that RecordWriter is a LineRecordWriter
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    this.output.write(key, value);
}

As shown, there are three abstract methods (implemented in the subclass ReduceContextImpl, independent of the RecordWriter) plus the concrete write() method. The abstract methods fetch the current key/value pair; write() emits a result pair by delegating to the RecordWriter's write() method, i.e. by default the write() of the LineRecordWriter created in step 5. A sketch of that RecordWriter contract follows.
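To make the contract concrete, here is a sketch of a custom RecordWriter; the class name and the uppercasing behavior are hypothetical. LineRecordWriter from step 5 is one real implementation of this same contract that TaskInputOutputContextImpl.write() delegates to.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class UpperCaseRecordWriter extends RecordWriter<Text, Text> {
    private final FSDataOutputStream out;

    public UpperCaseRecordWriter(FSDataOutputStream out) {
        this.out = out;
    }

    @Override
    public void write(Text key, Text value) throws IOException {
        // Every context.write(key, value) in a Reducer ends up here.
        out.writeBytes(key.toString().toUpperCase() + "\t" + value + "\n");
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}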

7. reducer.run()

public void run(Reducer.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while (context.nextKey()) {
            this.reduce(context.getCurrentKey(), context.getValues(), context);
            Iterator iter = context.getValues().iterator();
            if (iter instanceof ValueIterator) {
                ((ValueIterator) iter).resetBackupStore();
            }
        }
    } finally {
        this.cleanup(context);
    }
}

As shown, run() uses the methods of the reduceContext created in step 6 to fetch keys and values. Inside our reduce() method, we emit result pairs with context.write(key, value), which ultimately calls the write() method of the LineRecordWriter object. One practical consequence of LineRecordWriter's null checks from step 5 is sketched below.
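Because LineRecordWriter.write() skips a null or NullWritable key together with the separator, emitting NullWritable as the key writes only the value plus a newline. A hypothetical reducer exploiting that (assuming the driver sets the output key class to NullWritable):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ValueOnlyReducer extends Reducer<Text, IntWritable, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // LineRecordWriter sees a NullWritable key, so it skips the key and
        // the "\t" separator, writing only the value and the newline.
        context.write(NullWritable.get(), key);
    }
}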

That completes the walk-through of the MapReduce output process, from ReduceTask.run() through runNewReducer() down to LineRecordWriter.write(). Now that you have a clearer picture of it, try tracing the source code yourself.
