千家信息网

九、MapReduce--input源码分析

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,当job提交至yarn之后,就会开始调度运行map任务,这里开始讲解map输入的源码分析。一个map任务的入口就是 MapTask.class 中的run() 方法1、首先看看MapTask.run(
千家信息网最后更新 2025年02月02日九、MapReduce--input源码分析

当job提交至yarn之后,就会开始调度运行map任务,这里开始讲解map输入的源码分析。
一个map任务的入口就是 MapTask.class 中的run() 方法

1、首先看看MapTask.run() 方法

MapTask.class

//---------------------------------MapTask.javapublic void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException {     this.umbilical = umbilical;    if (this.isMapTask()) {        if (this.conf.getNumReduceTasks() == 0) {            this.mapPhase = this.getProgress().addPhase("map", 1.0F);        } else {            this.mapPhase = this.getProgress().addPhase("map", 0.667F);            this.sortPhase = this.getProgress().addPhase("sort", 0.333F);        }    }    TaskReporter reporter = this.startReporter(umbilical);    boolean useNewApi = job.getUseNewMapper();    //进行map任务的初始化    this.initialize(job, this.getJobID(), reporter, useNewApi);    if (this.jobCleanup) {        this.runJobCleanupTask(umbilical, reporter);    } else if (this.jobSetup) {        this.runJobSetupTask(umbilical, reporter);    } else if (this.taskCleanup) {        this.runTaskCleanupTask(umbilical, reporter);    } else {        //启动map任务,判断是使用新的还是旧的api        if (useNewApi) {            this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);        } else {            this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);        }        this.done(umbilical, reporter);    }}

上面重点有两个方法,一个是 this.initialize()以及 this.runNewMapper()。

2、下面看看this.initialize()

//---------------------------------Task.javapublic void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {    //创建task以及job上下文对象    this.jobContext = new JobContextImpl(job, id, reporter);    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);    //将task任务的状态改为正在运行    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);    }    if (useNewApi) {        if (LOG.isDebugEnabled()) {            LOG.debug("using new api for output committer");        }        //获取job中配置的输出格式类,并通过反射获取该类的Class对象        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);        //通过outputformat类获取commiter        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);    } else {        this.committer = this.conf.getOutputCommitter();    }    //从FileOutputFormat获取任务结果输出路径。    /*    可能有的人会奇怪,为啥mapper这里要获取outputformat 的输出路径。    首先我们要知道,一个MapReduce任务可以只有mapper,而没有reducer的,    那么这时候程序的输出是有mapper直接输出的,这时候自然就需要知道输出的路径,这里就派上用场了    */    Path outputPath = FileOutputFormat.getOutputPath(this.conf);    if (outputPath != null) {        if (this.committer instanceof FileOutputCommitter) {            FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));        } else {            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);        }    }    this.committer.setupTask(this.taskContext);    Class clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);    if (this.pTree != null) {        this.pTree.updateProcessTree();        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();    }}

这个方法主要做了一些初始化工作,比如创建上下文对象,获取输出outputFormat类,以及路径等。

3、下面接着看看this.runNewMapper()

//---------------------------------MapTask.javaprivate  void runNewMapper(JobConf job, TaskSplitIndex splitIndex, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException {    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);    //通过反射获取job中配置的mapper实现类    Mapper mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);    //通过反射获取job中配置的输入格式类,默认是TextInputFormat    InputFormat inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);    org.apache.hadoop.mapreduce.InputSplit split = null;    //获取切片详细信息,传入输出路径以及偏移量作为参数.也就是当前mapper处理的某个切片    split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());    LOG.info("Processing split: " + split);    //获取输入的读取数据文件的 RecordReader 的对象,默认inputformat为TextInputFormat,对应默认的RecordReader为LineRecordReader    org.apache.hadoop.mapreduce.RecordReader input = new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());    RecordWriter output = null;    //获取RecordWriter输出对象    if (job.getNumReduceTasks() == 0) {        output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter);    } else {        output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter);    }    MapContext mapContext = new MapContextImpl(job, this.getTaskID(), input, (RecordWriter)output, this.committer, reporter, split);    org.apache.hadoop.mapreduce.Mapper.Context mapperContext = (new WrappedMapper()).getMapContext(mapContext);    try {        //初始化RecordReader中的数据        input.initialize(split, mapperContext);        //运行mapper中的run方法,也就是Mapper类中的run方法,开始运行map任务        mapper.run(mapperContext);        this.mapPhase.complete();        this.setPhase(Phase.SORT);        this.statusUpdate(umbilical);        //map运行完,关闭输入、输出流        input.close();        input = null;        ((RecordWriter)output).close(mapperContext);        output = null;    } finally {        this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input);        this.closeQuietly((RecordWriter)output, mapperContext);    }}

可以看到,这里就是整个map任务的核心流程,做了以下工作:
(1)获取mapper类对象,下面要执行里面的map方法
(2)获取InputFormat对象,默认是默认inputformat为TextInputFormat
(3)通过InputFormat对象获取RecordReader对象,后面用于读取数据文件
(4)获取用于输出map的结果的RecordWriter对象
(5)获取切片信息,比如切片所在文件的路径,起始偏移量等
(6)初始化切片数据
(7)开始运行mapper中的run()方法
(8)运行完毕,关闭输入流,将结果通过RecordWriter刷写。
(9)刷写完毕后,关闭输入流以及输出流
下面看看其中的核心方法

4、this.getSplitDetails() 获取切片信息

//---------------------------------MapTask.javaprivate  T getSplitDetails(Path file, long offset) throws IOException {    //获取文件系统对象,并打开文件输出流    FileSystem fs = file.getFileSystem(this.conf);    FSDataInputStream inFile = fs.open(file);    //跳过指定的偏移量,也就是从指定偏移量的位置开始读取数据,其实就是切片开始的偏移量    inFile.seek(offset);    String className = StringInterner.weakIntern(Text.readString(inFile));    Class cls;    try {        cls = this.conf.getClassByName(className);    } catch (ClassNotFoundException var13) {        IOException wrap = new IOException("Split class " + className + " not found");        wrap.initCause(var13);        throw wrap;    }    SerializationFactory factory = new SerializationFactory(this.conf);    //反序列化方式打开输入流    Deserializer deserializer = factory.getDeserializer(cls);    deserializer.open(inFile);    T split = deserializer.deserialize((Object)null);    long pos = inFile.getPos();    ((Counter)this.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)).increment(pos - offset);    inFile.close();    //返回切片经过反序列化之后的可读取对象    return split;}

可以看到这里主要是返回切片的反序列化之后可以读取的信息对象

5、接着看看 input.initialize()

在看这个方法之前,首先我们看看input这个对象是由哪个类创建的。它是由NewTrackingRecordReader 这个类创建的。这是个静态内部类

//---------------------------------MapTask.javastatic class NewTrackingRecordReader extends org.apache.hadoop.mapreduce.RecordReader {    private final org.apache.hadoop.mapreduce.RecordReader real;    private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;    private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;    private final TaskReporter reporter;    private final List fsStats;    NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split, InputFormat inputFormat, TaskReporter reporter, TaskAttemptContext taskContext) throws InterruptedException, IOException {        this.reporter = reporter;        this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);        this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);        List matchedStats = null;        if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {            matchedStats = Task.getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit)split).getPath(), taskContext.getConfiguration());        }        this.fsStats = matchedStats;        long bytesInPrev = this.getInputBytes(this.fsStats);        //调用job任务中定义的inputformat类中的createRecordReader方法,获取RecordReader对象。返回的是 LineRecordReader        this.real = inputFormat.createRecordReader(split, taskContext);        long bytesInCurr = this.getInputBytes(this.fsStats);        this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);    }    ...........}

我们可以看到构造方法中,是调用 inputFormat对象的createRecordReader() 方法来创建RecordReader对象的,上面也说了默认inputFormat为 TextInputFormat。

//---------------------------TextInputFormat.javapublic class TextInputFormat extends FileInputFormat {    public TextInputFormat() {    }    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {        String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");        byte[] recordDelimiterBytes = null;        if (null != delimiter) {            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);        }        return new LineRecordReader(recordDelimiterBytes);    }

可以清楚看到,返回的就是 LineRecordReader 这个reader类。

接着我们继续看 input.initialize()

static class NewTrackingRecordReader extends org.apache.hadoop.mapreduce.RecordReader {    public void initialize(org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {        long bytesInPrev = this.getInputBytes(this.fsStats);        //调用 RecordReader对象的 initialize方法,初始化输入。上面说到默认的是LineRecordReader        //this.real已经在上面初始化了,就是LineRecordReader        this.real.initialize(split, context);        long bytesInCurr = this.getInputBytes(this.fsStats);        this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);    }}

可以看到,调用 RecordReader中的 initialize 方法,也就是调用LineRecordReader 中的 initialize() 方法,下面看看

//---------------------------------------LineRecordReader.javapublic void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {    FileSplit split = (FileSplit)genericSplit;    Configuration job = context.getConfiguration();    this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);    //获取切片的数据开始位置以及终止位置    this.start = split.getStart();    this.end = this.start + split.getLength();    //获取切片对应的文件的输入流    Path file = split.getPath();    FileSystem fs = file.getFileSystem(job);    this.fileIn = fs.open(file);    //如果文件有压缩,则用压缩类解压    CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);    //以压缩方式读取切片    if (null != codec) {        this.isCompressedInput = true;        this.decompressor = CodecPool.getDecompressor(codec);        if (codec instanceof SplittableCompressionCodec) {            SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, READ_MODE.BYBLOCK);            this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);            this.start = cIn.getAdjustedStart();            this.end = cIn.getAdjustedEnd();            this.filePosition = cIn;        } else {            if (this.start != 0L) {                throw new IOException("Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream");            }            this.in = new SplitLineReader(codec.createInputStream(this.fileIn, this.decompressor), job, this.recordDelimiterBytes);            this.filePosition = this.fileIn;        }    } else {        //无压缩方式读取切片        this.fileIn.seek(this.start);        //这里很重要,是真正用于读取数据的类        this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength());        this.filePosition = this.fileIn;    }    //对起始偏移量进行修正,并赋值给pos这个偏移量    if (this.start != 0L) {        this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));    }    this.pos = this.start;}

这里的工作主要是给 RecordReader对象读取文件做初始化工作。主要就是获取切片的输入流对象。
this.in 这里就用于后面读取数据的对象,这里就是完成了这个输入流对象的初始化。

6、接着我们回到3中,看mapper.run() 方法

这个其实就是写的mapper 的run方法:

//------------------------Mapper.java   mapper.run(mapperContext);public void run(Mapper.Context context) throws IOException, InterruptedException {    this.setup(context);    try {        //这里循环读取key和value,给map方法处理        //关键在于 context这个对象,从上面runNewApi中可以看到,是MapContextImpl类型的        while(context.nextKeyValue()) {            this.map(context.getCurrentKey(), context.getCurrentValue(), context);        }    } finally {        this.cleanup(context);    }}

可以看到,这里是个while循环,通过context上下文对象获取KV,然后传入map方法中处理。

7、下面看看 context.nextKeyValue()

从3中可以看到,这个context是 MapContextImpl类型的,看看这个类

//-----------------------MapContextImpl.java..    public class MapContextImpl extends TaskInputOutputContextImpl implements MapContext {    private RecordReader reader;    private InputSplit split;    //构造方法中包括获取 RecordReader对象,以及split    public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader reader, RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {        super(conf, taskid, writer, committer, reporter);        this.reader = reader;        this.split = split;    }    public InputSplit getInputSplit() {        return this.split;    }    //下面都是调用 RecordReader 中的get方法获取key value    public KEYIN getCurrentKey() throws IOException, InterruptedException {        return this.reader.getCurrentKey();    }    public VALUEIN getCurrentValue() throws IOException, InterruptedException {        return this.reader.getCurrentValue();    }    public boolean nextKeyValue() throws IOException, InterruptedException {       //这里就是调用reader 的方法        return this.reader.nextKeyValue();    }}

在它的构造方法中,主要从3中传入了 split切片,以及 RecordReader对象。下面就是三个获取KV的方法,也就是在 mapper.run() 中调用的方法。

下面看看 this.reader.nextKeyValue()

//----------------------------------LineRecordReader.javapublic boolean nextKeyValue() throws IOException {    if (this.key == null) {        this.key = new LongWritable();    }    //设置key为偏移量    this.key.set(this.pos);    if (this.value == null) {        this.value = new Text();    }    int newSize = 0;    while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {        if (this.pos == 0L) {            newSize = this.skipUtfByteOrderMark();        } else {            /*读取数据到value中。this.in是UncompressedSplitLineReader类型的,在LineRecordReader的initialize方法中初始化了。该类父类为LineReader。*/            //调用 LineRreader 的readline 方法。读一行数据            newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));            this.pos += (long)newSize;        }        if (newSize == 0 || newSize < this.maxLineLength) {            break;        }        LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));    }    if (newSize == 0) {        this.key = null;        this.value = null;        return false;    } else {        return true;    }}

可以看到,这里已经看到key和value的踪影了。key就是数据偏移量,value就是通过readLine读取的数据。如果有数据返回true,mapper.run() 通过getKey和getValue对应的KV。下面看看 this.in.readLine,也就是 LineReader.readLine()。

8、LineReader.readLine() 按行读取的reader

//---------------------------LineReader.javapublic int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {    return this.recordDelimiterBytes != null ? this.readCustomLine(str, maxLineLength, maxBytesToConsume) : this.readDefaultLine(str, maxLineLength, maxBytesToConsume);}private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {    str.clear();    int txtLength = 0;    long bytesConsumed = 0L;    int delPosn = 0;    int ambiguousByteCount = 0;    do {        int startPosn = this.bufferPosn;        if (this.bufferPosn >= this.bufferLength) {            startPosn = this.bufferPosn = 0;            this.bufferLength = this.fillBuffer(this.in, this.buffer, ambiguousByteCount > 0);            if (this.bufferLength <= 0) {                if (ambiguousByteCount > 0) {                    str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);                    bytesConsumed += (long)ambiguousByteCount;                }                break;            }        }        for(; this.bufferPosn < this.bufferLength; ++this.bufferPosn) {            if (this.buffer[this.bufferPosn] == this.recordDelimiterBytes[delPosn]) {                ++delPosn;                if (delPosn >= this.recordDelimiterBytes.length) {                    ++this.bufferPosn;                    break;                }            } else if (delPosn != 0) {                this.bufferPosn -= delPosn;                if (this.bufferPosn < -1) {                    this.bufferPosn = -1;                }                delPosn = 0;            }        }        int readLength = this.bufferPosn - startPosn;        bytesConsumed += (long)readLength;        int appendLength = readLength - delPosn;        if (appendLength > maxLineLength - txtLength) {            appendLength = maxLineLength - txtLength;        }        bytesConsumed += (long)ambiguousByteCount;        if (appendLength >= 0 && ambiguousByteCount > 0) {            //看到这里就很明显了,将数据追加到 value中            str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);            ambiguousByteCount = 0;            this.unsetNeedAdditionalRecordAfterSplit();        }        if (appendLength > 0) {            str.append(this.buffer, startPosn, appendLength);            txtLength += appendLength;        }        if (this.bufferPosn >= this.bufferLength && delPosn > 0 && delPosn < this.recordDelimiterBytes.length) {            ambiguousByteCount = delPosn;            bytesConsumed -= (long)delPosn;        }    } while(delPosn < this.recordDelimiterBytes.length && bytesConsumed < (long)maxBytesToConsume);    if (bytesConsumed > 2147483647L) {        throw new IOException("Too many bytes before delimiter: " + bytesConsumed);    } else {        return (int)bytesConsumed;    }}

上面重要就是读取数据的过程了,过程过于长,抓住关键的看,其实就是将读取的一行数据追加到 this.value中。

9、总结

至此,map的整个输入流程涉及到两个重要的类
InputFormat -- 处理原始数据并切片;创建RecordReader 对象
RecordReader -- 读取切片中的数据,处理成KV,传递KV给map方法处理

这两个都是抽象类:

public abstract class RecordReader implements Closeable {    public RecordReader() {    }    public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;    public abstract boolean nextKeyValue() throws IOException, InterruptedException;    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;    public abstract float getProgress() throws IOException, InterruptedException;    public abstract void close() throws IOException;}
public abstract class InputFormat {    public InputFormat() {    }    public abstract List getSplits(JobContext var1) throws IOException, InterruptedException;    public abstract RecordReader createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;}

当我们想自定义inputformat类和recordreader类时,就需要继承这两个类,并实现其中的方法。

方法 对象 数据 就是 输出 输入 任务 偏移 文件 运行 也就是 路径 处理 两个 信息 重要 上下 上下文 位置 序列 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 上哪租服务器 天涯明月刀手游什么服务器好 数据库操作系统中间件 qq炫舞服务器断开连接 哪个牌子的服务器好 数据库中索引的负面影响 淘宝打开后显示找不到服务器 软件开发风险承担者有哪些 模拟人生4存档可上传到服务器吗 软件开发项目私活在哪接 怎么进台湾的数据库 腾讯云服务器的防护阀值 网络安全遭受的冲击 网络安全事关国家安全国际 kali服务器攻击 数据库逻辑数据模型有哪几种 网络安全日4月 校园网络安全培训学习体会 山西定制化国产服务器厂家 软件开发外包公司的任务 中化能源互联网科技青岛 模拟人生4存档可上传到服务器吗 金山区上门软件开发业务流程 互联网科技企业财务管理 电脑和网络技术利与弊 在线服务器压力测试工具网站 服务器挂着跑程序 web数据库概述外文翻译 数据库查看表名所代表的 网络安全系统漏洞问题
0