IX. MapReduce -- Input Source Code Analysis
Once a job has been submitted to YARN, its map tasks are scheduled and started. This section walks through the source code of the map input path.
The entry point of a map task is the run() method in MapTask.class.
1. First, the MapTask.run() method
MapTask.class
//--------------------------------- MapTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;
  if (this.isMapTask()) {
    if (this.conf.getNumReduceTasks() == 0) {
      this.mapPhase = this.getProgress().addPhase("map", 1.0F);
    } else {
      this.mapPhase = this.getProgress().addPhase("map", 0.667F);
      this.sortPhase = this.getProgress().addPhase("sort", 0.333F);
    }
  }
  TaskReporter reporter = this.startReporter(umbilical);
  boolean useNewApi = job.getUseNewMapper();
  // Initialize the map task
  this.initialize(job, this.getJobID(), reporter, useNewApi);
  if (this.jobCleanup) {
    this.runJobCleanupTask(umbilical, reporter);
  } else if (this.jobSetup) {
    this.runJobSetupTask(umbilical, reporter);
  } else if (this.taskCleanup) {
    this.runTaskCleanupTask(umbilical, reporter);
  } else {
    // Start the map task, choosing between the new and the old API
    if (useNewApi) {
      this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
    } else {
      this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
    }
    this.done(umbilical, reporter);
  }
}
There are two important calls above: this.initialize() and this.runNewMapper().
2. Next, this.initialize()
//--------------------------------- Task.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi)
    throws IOException, ClassNotFoundException, InterruptedException {
  // Create the task and job context objects
  this.jobContext = new JobContextImpl(job, id, reporter);
  this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
  // Move the task state to RUNNING
  if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
    this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
  }
  if (useNewApi) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("using new api for output committer");
    }
    // Look up the output format class configured on the job and instantiate it via reflection
    this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
    // Obtain the committer from the output format
    this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
  } else {
    this.committer = this.conf.getOutputCommitter();
  }
  // Get the job output path from FileOutputFormat.
  /* You may wonder why the mapper side needs the OutputFormat's output path.
     Remember that a MapReduce job can consist of mappers only, with no reducers;
     in that case the mappers write the final output themselves, so they need to
     know the output path, and this is where it is resolved. */
  Path outputPath = FileOutputFormat.getOutputPath(this.conf);
  if (outputPath != null) {
    if (this.committer instanceof FileOutputCommitter) {
      FileOutputFormat.setWorkOutputPath(this.conf,
          ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
    } else {
      FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
    }
  }
  this.committer.setupTask(this.taskContext);
  Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass(
      "mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
  this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
      (String)System.getenv().get("JVM_PID"), clazz, this.conf);
  LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
  if (this.pTree != null) {
    this.pTree.updateProcessTree();
    this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
  }
}
This method does the initialization work: it creates the context objects, resolves the configured OutputFormat class, the output path, and so on.
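The comment inside initialize() about map-only jobs is easier to see with a concrete driver. Below is a minimal, hedged sketch of such a job; it is not part of the source being analyzed, MapOnlyDriver is a made-up class name, and the input/output paths are assumed to arrive as command-line arguments. It uses the base Mapper class (the identity mapper) so that nothing outside Hadoop is required.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Map-only job sketch: with zero reduce tasks there is no sort phase and each mapper's
// output is written directly to the job output path, which is why Task.initialize()
// resolves the output path on the map side as well.
public class MapOnlyDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "map-only example");
        job.setJarByClass(MapOnlyDriver.class);
        job.setMapperClass(Mapper.class);          // identity mapper, just to keep the sketch self-contained
        job.setNumReduceTasks(0);                  // map-only: mapper output is the final output
        job.setOutputKeyClass(LongWritable.class); // with TextInputFormat the key is the byte offset
        job.setOutputValueClass(Text.class);       // and the value is the line itself
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}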
3. Next, this.runNewMapper()
//--------------------------------- MapTask.java
private void runNewMapper(JobConf job, TaskSplitIndex splitIndex,
    TaskUmbilicalProtocol umbilical, TaskReporter reporter)
    throws IOException, ClassNotFoundException, InterruptedException {
  TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
  // Instantiate the Mapper implementation configured on the job via reflection
  Mapper mapper = (Mapper)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // Instantiate the input format class configured on the job via reflection; the default is TextInputFormat
  InputFormat inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  org.apache.hadoop.mapreduce.InputSplit split = null;
  // Load the details of the split this mapper processes, given the split file location and start offset
  split = (org.apache.hadoop.mapreduce.InputSplit)this.getSplitDetails(
      new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);
  // Build the RecordReader that reads the input data file; with the default TextInputFormat
  // the corresponding default RecordReader is LineRecordReader
  org.apache.hadoop.mapreduce.RecordReader input =
      new MapTask.NewTrackingRecordReader(split, inputFormat, reporter, taskContext);
  job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
  RecordWriter output = null;
  // Build the RecordWriter for the map output
  if (job.getNumReduceTasks() == 0) {
    output = new MapTask.NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new MapTask.NewOutputCollector(taskContext, job, umbilical, reporter);
  }
  MapContext mapContext = new MapContextImpl(job, this.getTaskID(), input,
      (RecordWriter)output, this.committer, reporter, split);
  org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
      (new WrappedMapper()).getMapContext(mapContext);
  try {
    // Initialize the RecordReader for this split
    input.initialize(split, mapperContext);
    // Run the Mapper's run() method, i.e. the run() in the Mapper class; the map work starts here
    mapper.run(mapperContext);
    this.mapPhase.complete();
    this.setPhase(Phase.SORT);
    this.statusUpdate(umbilical);
    // The map has finished; close the input and output streams
    input.close();
    input = null;
    ((RecordWriter)output).close(mapperContext);
    output = null;
  } finally {
    this.closeQuietly((org.apache.hadoop.mapreduce.RecordReader)input);
    this.closeQuietly((RecordWriter)output, mapperContext);
  }
}
As you can see, this is the core flow of the whole map task. It does the following:
(1) Obtain the Mapper object whose map() method will be executed later.
(2) Obtain the InputFormat object; the default is TextInputFormat. Both are looked up from the job configuration via reflection; see the driver fragment after this list.
(3) Obtain the RecordReader object from the InputFormat; it is used later to read the data file.
(4) Obtain the RecordWriter object used to write the map output.
(5) Obtain the split information, such as the path of the file the split belongs to and its start offset.
(6) Initialize the reader with the split data.
(7) Run the Mapper's run() method.
(8) When it finishes, close the input and flush the results through the RecordWriter.
(9) After the flush, close both the input and the output streams.
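As an aside, steps (1) and (2) simply read back what the driver configured. The fragment below is an illustration, not Hadoop source; WordCountMapper is a made-up class name, and the property names in the comments reflect what setMapperClass()/setInputFormatClass() store in the job configuration, as far as I can tell from MRJobConfig.

// Driver fragment (illustrative): these are the classes that runNewMapper() later
// retrieves via taskContext.getMapperClass() / taskContext.getInputFormatClass().
job.setMapperClass(WordCountMapper.class);       // stored as "mapreduce.job.map.class"
job.setInputFormatClass(TextInputFormat.class);  // stored as "mapreduce.job.inputformat.class";
                                                 // TextInputFormat is already the default if unset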
Now let's look at the key methods one by one.
4. this.getSplitDetails(): obtaining the split information
//--------------------------------- MapTask.java
private <T> T getSplitDetails(Path file, long offset) throws IOException {
  // Get the file system object and open an input stream on the split file
  FileSystem fs = file.getFileSystem(this.conf);
  FSDataInputStream inFile = fs.open(file);
  // Seek to the given offset, i.e. start reading from the position where this split's entry begins
  inFile.seek(offset);
  String className = StringInterner.weakIntern(Text.readString(inFile));
  Class cls;
  try {
    cls = this.conf.getClassByName(className);
  } catch (ClassNotFoundException var13) {
    IOException wrap = new IOException("Split class " + className + " not found");
    wrap.initCause(var13);
    throw wrap;
  }
  SerializationFactory factory = new SerializationFactory(this.conf);
  // Open the input stream through a deserializer
  Deserializer deserializer = factory.getDeserializer(cls);
  deserializer.open(inFile);
  T split = deserializer.deserialize((Object)null);
  long pos = inFile.getPos();
  ((Counter)this.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)).increment(pos - offset);
  inFile.close();
  // Return the deserialized, readable split object
  return split;
}
As you can see, this mainly returns the split as an object that has been deserialized into readable form.
5. Next, input.initialize()
Before looking at this method, let's first see which class the input object is an instance of. It is created as a NewTrackingRecordReader, a static inner class of MapTask.
//--------------------------------- MapTask.java
static class NewTrackingRecordReader extends org.apache.hadoop.mapreduce.RecordReader {
  private final org.apache.hadoop.mapreduce.RecordReader real;
  private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
  private final org.apache.hadoop.mapreduce.Counter fileInputByteCounter;
  private final TaskReporter reporter;
  private final List fsStats;

  NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split, InputFormat inputFormat,
      TaskReporter reporter, TaskAttemptContext taskContext) throws InterruptedException, IOException {
    this.reporter = reporter;
    this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
    this.fileInputByteCounter = reporter.getCounter(FileInputFormatCounter.BYTES_READ);
    List matchedStats = null;
    if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
      matchedStats = Task.getFsStatistics(
          ((org.apache.hadoop.mapreduce.lib.input.FileSplit)split).getPath(),
          taskContext.getConfiguration());
    }
    this.fsStats = matchedStats;
    long bytesInPrev = this.getInputBytes(this.fsStats);
    // Call createRecordReader() on the InputFormat configured for the job to obtain the
    // RecordReader object; with the default TextInputFormat this returns a LineRecordReader
    this.real = inputFormat.createRecordReader(split, taskContext);
    long bytesInCurr = this.getInputBytes(this.fsStats);
    this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
  }
  ...........
}
As the constructor shows, the RecordReader is created by calling createRecordReader() on the inputFormat object, and as noted above the default InputFormat is TextInputFormat.
//--------------------------- TextInputFormat.java
public class TextInputFormat extends FileInputFormat {
  public TextInputFormat() {
  }

  public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get("textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter) {
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    }
    return new LineRecordReader(recordDelimiterBytes);
  }
It is clear that the reader being returned is a LineRecordReader.
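Because createRecordReader() reads the delimiter from the configuration key textinputformat.record.delimiter, a driver can redefine what counts as one record. The fragment below is a small, hedged sketch; the choice of "\n\n" (blank-line-separated paragraphs) is just an example.

// Driver fragment (illustrative): make LineRecordReader split records on blank lines
// instead of single line terminators.
Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "\n\n");
Job job = Job.getInstance(conf, "paragraph records");
job.setInputFormatClass(TextInputFormat.class);   // optional; TextInputFormat is already the default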
Now let's continue with input.initialize().
static class NewTrackingRecordReader extends org.apache.hadoop.mapreduce.RecordReader {
  public void initialize(org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    long bytesInPrev = this.getInputBytes(this.fsStats);
    // Delegate to the underlying RecordReader's initialize(); as noted above, the default
    // reader is LineRecordReader, and this.real was already created in the constructor
    this.real.initialize(split, context);
    long bytesInCurr = this.getInputBytes(this.fsStats);
    this.fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
  }
}
As you can see, it calls the RecordReader's initialize() method, i.e. LineRecordReader.initialize(), shown below.
//--------------------------------------- LineRecordReader.java
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit)genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
  // Start and end positions of this split's data
  this.start = split.getStart();
  this.end = this.start + split.getLength();
  // Open an input stream on the file that backs this split
  Path file = split.getPath();
  FileSystem fs = file.getFileSystem(job);
  this.fileIn = fs.open(file);
  // If the file is compressed, decompress it with the matching codec
  CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
  // Read the split through the codec
  if (null != codec) {
    this.isCompressedInput = true;
    this.decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      SplitCompressionInputStream cIn = ((SplittableCompressionCodec)codec).createInputStream(
          this.fileIn, this.decompressor, this.start, this.end, READ_MODE.BYBLOCK);
      this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
      this.start = cIn.getAdjustedStart();
      this.end = cIn.getAdjustedEnd();
      this.filePosition = cIn;
    } else {
      if (this.start != 0L) {
        throw new IOException("Cannot seek in " + codec.getClass().getSimpleName() + " compressed stream");
      }
      this.in = new SplitLineReader(codec.createInputStream(this.fileIn, this.decompressor),
          job, this.recordDelimiterBytes);
      this.filePosition = this.fileIn;
    }
  } else {
    // Read the split without compression
    this.fileIn.seek(this.start);
    // This part is important: this is the object that actually reads the data
    this.in = new UncompressedSplitLineReader(this.fileIn, job, this.recordDelimiterBytes, split.getLength());
    this.filePosition = this.fileIn;
  }
  // Adjust the start offset and record it in pos
  if (this.start != 0L) {
    this.start += (long)this.in.readLine(new Text(), 0, this.maxBytesToConsume(this.start));
  }
  this.pos = this.start;
}
The work here prepares the RecordReader for reading the file, mainly by opening an input stream for the split.
The this.in object created here is what actually reads the data later; this method completes its initialization.
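To make the start-offset correction at the end of initialize() concrete, here is a made-up example. Suppose a 200-byte file is cut into split A = [0, 100) and split B = [100, 200), and one line happens to span bytes 90 through 110. Split B's reader has start != 0, so initialize() consumes one readLine() call and moves pos forward past that partial line to the start of the next complete line it owns. Split A's reader, on the other side, keeps calling readLine() until it reaches the end of the record it has started, even though that takes it past byte 100. The net effect is that the boundary-crossing line is read exactly once, by split A.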
6. Back to step 3: the mapper.run() method
This is simply the run() method of the Mapper class, i.e. the class our own mapper extends:
//------------------------ Mapper.java  (called above via mapper.run(mapperContext))
public void run(Mapper.Context context) throws IOException, InterruptedException {
  this.setup(context);
  try {
    // Loop: read the next key and value and hand them to map() for processing.
    // The interesting part is the context object; as seen in runNewMapper() above,
    // it is backed by a MapContextImpl.
    while(context.nextKeyValue()) {
      this.map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    this.cleanup(context);
  }
}
As you can see, this is a while loop: it obtains a key and a value through the context object and passes them to the map() method.
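To connect this loop back to user code, below is a minimal sketch of a user-written Mapper (word-count style; WordCountMapper and its output types are made up for illustration). With the default TextInputFormat/LineRecordReader, the key handed to map() is the line's byte offset and the value is the line itself.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// run() calls map() once per record; here each record is one line of text.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key: byte offset of the line; value: the line content read by LineRecordReader
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE);   // handed to the RecordWriter / output collector
            }
        }
    }
}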
7. Next, context.nextKeyValue()
As seen in step 3, this context is backed by a MapContextImpl; let's look at that class.
//----------------------- MapContextImpl.java
public class MapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskInputOutputContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  private RecordReader reader;
  private InputSplit split;

  // The constructor receives the RecordReader object as well as the split
  public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader reader,
      RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }

  public InputSplit getInputSplit() {
    return this.split;
  }

  // The methods below all delegate to the RecordReader's getters to obtain the key and value
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return this.reader.getCurrentKey();
  }

  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return this.reader.getCurrentValue();
  }

  public boolean nextKeyValue() throws IOException, InterruptedException {
    // Simply forwards to the reader
    return this.reader.nextKeyValue();
  }
}
Its constructor receives the split and the RecordReader object passed in from step 3. Below that are the three KV accessor methods, i.e. the ones called from mapper.run().
Now let's look at this.reader.nextKeyValue().
//---------------------------------- LineRecordReader.java
public boolean nextKeyValue() throws IOException {
  if (this.key == null) {
    this.key = new LongWritable();
  }
  // The key is set to the current byte offset
  this.key.set(this.pos);
  if (this.value == null) {
    this.value = new Text();
  }
  int newSize = 0;
  while(this.getFilePosition() <= this.end || this.in.needAdditionalRecordAfterSplit()) {
    if (this.pos == 0L) {
      newSize = this.skipUtfByteOrderMark();
    } else {
      /* Read data into the value. this.in is an UncompressedSplitLineReader, created in
         LineRecordReader.initialize(); its parent class is LineReader. */
      // Call LineReader.readLine() to read one line of data
      newSize = this.in.readLine(this.value, this.maxLineLength, this.maxBytesToConsume(this.pos));
      this.pos += (long)newSize;
    }
    if (newSize == 0 || newSize < this.maxLineLength) {
      break;
    }
    LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - (long)newSize));
  }
  if (newSize == 0) {
    this.key = null;
    this.value = null;
    return false;
  } else {
    return true;
  }
}
Here the key and value finally appear: the key is the byte offset of the record, and the value is the data read by readLine(). If a record was read, the method returns true, and mapper.run() then fetches the corresponding key and value through getCurrentKey()/getCurrentValue(). Next, let's look at this.in.readLine(), i.e. LineReader.readLine().
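As a concrete, made-up illustration: for a text file whose content is the two lines "hello" and "world", the reader produces the pairs (0, "hello") and (6, "world"). The key is the byte offset at which each line starts ("hello" plus its line terminator occupies bytes 0 through 5), not a line number.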
8. LineReader.readLine(): the line-oriented reader
//--------------------------- LineReader.java
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
  return this.recordDelimiterBytes != null
      ? this.readCustomLine(str, maxLineLength, maxBytesToConsume)
      : this.readDefaultLine(str, maxLineLength, maxBytesToConsume);
}

private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException {
  str.clear();
  int txtLength = 0;
  long bytesConsumed = 0L;
  int delPosn = 0;
  int ambiguousByteCount = 0;

  do {
    int startPosn = this.bufferPosn;
    if (this.bufferPosn >= this.bufferLength) {
      startPosn = this.bufferPosn = 0;
      this.bufferLength = this.fillBuffer(this.in, this.buffer, ambiguousByteCount > 0);
      if (this.bufferLength <= 0) {
        if (ambiguousByteCount > 0) {
          str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);
          bytesConsumed += (long)ambiguousByteCount;
        }
        break;
      }
    }

    for(; this.bufferPosn < this.bufferLength; ++this.bufferPosn) {
      if (this.buffer[this.bufferPosn] == this.recordDelimiterBytes[delPosn]) {
        ++delPosn;
        if (delPosn >= this.recordDelimiterBytes.length) {
          ++this.bufferPosn;
          break;
        }
      } else if (delPosn != 0) {
        this.bufferPosn -= delPosn;
        if (this.bufferPosn < -1) {
          this.bufferPosn = -1;
        }
        delPosn = 0;
      }
    }

    int readLength = this.bufferPosn - startPosn;
    bytesConsumed += (long)readLength;
    int appendLength = readLength - delPosn;
    if (appendLength > maxLineLength - txtLength) {
      appendLength = maxLineLength - txtLength;
    }
    bytesConsumed += (long)ambiguousByteCount;
    if (appendLength >= 0 && ambiguousByteCount > 0) {
      // Here it becomes clear: the bytes that were read are appended to the value
      str.append(this.recordDelimiterBytes, 0, ambiguousByteCount);
      ambiguousByteCount = 0;
      this.unsetNeedAdditionalRecordAfterSplit();
    }
    if (appendLength > 0) {
      str.append(this.buffer, startPosn, appendLength);
      txtLength += appendLength;
    }
    if (this.bufferPosn >= this.bufferLength && delPosn > 0 && delPosn < this.recordDelimiterBytes.length) {
      ambiguousByteCount = delPosn;
      bytesConsumed -= (long)delPosn;
    }
  } while(delPosn < this.recordDelimiterBytes.length && bytesConsumed < (long)maxBytesToConsume);

  if (bytesConsumed > 2147483647L) {
    throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
  } else {
    return (int)bytesConsumed;
  }
}
The above is the actual reading loop. It is long, so focus on the key point: each record that is read is appended to this.value (the str parameter here).
9. Summary
To sum up, the whole map input path involves two important classes:
InputFormat: splits the raw input data and creates the RecordReader object.
RecordReader: reads the data in a split, turns it into key/value pairs, and hands each pair to the map() method.
Both are abstract classes:
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
  public RecordReader() {
  }

  public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;

  public abstract boolean nextKeyValue() throws IOException, InterruptedException;

  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

  public abstract float getProgress() throws IOException, InterruptedException;

  public abstract void close() throws IOException;
}
public abstract class InputFormat<K, V> {
  public InputFormat() {
  }

  public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

  public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2)
      throws IOException, InterruptedException;
}
When we want to customize the input format and record reader, we extend these two classes and implement their methods, for example as in the sketch below.
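Below is one possible skeleton, not part of Hadoop itself: a hedged sketch of a WholeFileInputFormat/WholeFileRecordReader pair (made-up names) that delivers each input file as a single (NullWritable, BytesWritable) record. getSplits() is inherited from FileInputFormat; only the pieces discussed in this section are shown, and error handling is omitted.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Reads each file as a single (NullWritable, BytesWritable) record.
public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;   // one whole file per split, so one record per mapper
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new WholeFileRecordReader();
    }
}

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private TaskAttemptContext context;
    private final BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        this.fileSplit = (FileSplit) split;
        this.context = context;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;                       // only one record per split
        }
        byte[] contents = new byte[(int) fileSplit.getLength()];
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(context.getConfiguration());
        try (FSDataInputStream in = fs.open(file)) {
            IOUtils.readFully(in, contents, 0, contents.length);
        }
        value.set(contents, 0, contents.length);
        processed = true;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() { return NullWritable.get(); }

    @Override
    public BytesWritable getCurrentValue() { return value; }

    @Override
    public float getProgress() { return processed ? 1.0f : 0.0f; }

    @Override
    public void close() { }
}

Note that the division of labor mirrors TextInputFormat/LineRecordReader: the InputFormat only decides how the input is split and which reader to build, while the RecordReader does the actual I/O in initialize() and nextKeyValue().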