
Hadoop 2.6.0 Study Notes (4): TextInputFormat and RecordReader Explained


Lu Chunli's work notes: who says programmers can't have a touch of literary flair?


A minimal MapReduce program

package com.lucl.hadoop.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MiniMRDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new MiniMRDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MiniMRDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Inspect the input data for the MapReduce job

[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
视频网站        15      1527
信息安全        20      3156
站点统计        24      6960
搜索引擎        28      3659
站点统计        3       1938
综合门户        15      1938
搜索引擎        21      9531
搜索引擎        63      11058
[hadoop@nnode code]$

Package and run the MapReduce program

[hadoop@nnode code]$ hadoop jar MiniMR.jar /data/HTTP_SITE_FLOW.log /201511302119
15/11/30 21:19:46 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/30 21:19:48 INFO input.FileInputFormat: Total input paths to process : 1
15/11/30 21:19:48 INFO mapreduce.JobSubmitter: number of splits:1
15/11/30 21:19:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448889273221_0001
15/11/30 21:19:50 INFO impl.YarnClientImpl: Submitted application application_1448889273221_0001
15/11/30 21:19:50 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448889273221_0001/
15/11/30 21:19:50 INFO mapreduce.Job: Running job: job_1448889273221_0001
15/11/30 21:20:26 INFO mapreduce.Job: Job job_1448889273221_0001 running in uber mode : false
15/11/30 21:20:26 INFO mapreduce.Job:  map 0% reduce 0%
15/11/30 21:20:59 INFO mapreduce.Job:  map 100% reduce 0%
15/11/30 21:21:30 INFO mapreduce.Job:  map 100% reduce 100%
15/11/30 21:21:31 INFO mapreduce.Job: Job job_1448889273221_0001 completed successfully
15/11/30 21:21:31 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=254
                FILE: Number of bytes written=213863
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=277
                HDFS: Number of bytes written=194
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=30256
                Total time spent by all reduces in occupied slots (ms)=27787
                Total time spent by all map tasks (ms)=30256
                Total time spent by all reduce tasks (ms)=27787
                Total vcore-seconds taken by all map tasks=30256
                Total vcore-seconds taken by all reduce tasks=27787
                Total megabyte-seconds taken by all map tasks=30982144
                Total megabyte-seconds taken by all reduce tasks=28453888
        Map-Reduce Framework
                Map input records=8
                Map output records=8
                Map output bytes=232
                Map output materialized bytes=254
                Input split bytes=103
                Combine input records=0
                Combine output records=0
                Reduce input groups=8
                Reduce shuffle bytes=254
                Reduce input records=8
                Reduce output records=8
                Spilled Records=16
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=182
                CPU time spent (ms)=2000
                Physical memory (bytes) snapshot=305459200
                Virtual memory (bytes) snapshot=1697824768
                Total committed heap usage (bytes)=136450048
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=174
        File Output Format Counters
                Bytes Written=194
[hadoop@nnode code]$

Inspect the output

[hadoop@nnode code]$ hdfs dfs -ls /201511302119
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-30 21:21 /201511302119/_SUCCESS
-rw-r--r--   2 hadoop hadoop        194 2015-11-30 21:21 /201511302119/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511302119/part-r-00000
0       视频网站        15      1527
22      信息安全        20      3156
44      站点统计        24      6960
66      搜索引擎        28      3659
88      站点统计        3       1938
109     综合门户        15      1938
131     搜索引擎        21      9531
153     搜索引擎        63      11058
[hadoop@nnode code]$


No Mapper or Reducer class is specified here; only the input data path and the output path are set through FileInputFormat and FileOutputFormat. After the job runs, each line's byte offset and the line's content are written to the specified output path.
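For reference, the driver above behaves as if the following had been set explicitly. This is only a sketch of the stock Hadoop 2.x defaults (identity Mapper/Reducer, TextInputFormat, TextOutputFormat); none of these calls appear in the original code.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Inside run(), after Job.getInstance(...): the framework's fallbacks, spelled out.
job.setInputFormatClass(TextInputFormat.class);   // default input format
job.setMapperClass(Mapper.class);                 // identity mapper: passes (offset, line) through
job.setReducerClass(Reducer.class);               // identity reducer
job.setOutputFormatClass(TextOutputFormat.class); // writes "key<TAB>value" lines
job.setOutputKeyClass(LongWritable.class);        // default output key class
job.setOutputValueClass(Text.class);              // default output value class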


TextInputFormat is the default implementation of FileInputFormat. It is designed for plain text: a carriage return/line feed marks the end of a record, the key is the line's byte offset within the file, and the value is the content of the line.

The class is defined as follows:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
                                                             TaskAttemptContext context) {
    // ... (omitted)
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // whether the file can be split (body omitted)
  }
}

A job selects the InputFormat it wants through Job's public void setInputFormatClass(Class<? extends InputFormat> cls) method.
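For example (a hedged sketch; KeyValueTextInputFormat is just one possible non-default choice), the run() method could switch input formats like this:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// Treat each line as "key<TAB>value" instead of "(offset, line)".
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputKeyClass(Text.class);    // keys are now Text rather than LongWritable
job.setOutputValueClass(Text.class);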

public abstract class InputFormat<K, V> {

    public abstract List<InputSplit> getSplits(JobContext context
                                               ) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                                                          TaskAttemptContext context
                                                          ) throws IOException, InterruptedException;
}

On HDFS a file is stored as blocks, but a MapReduce computation reads it as logical splits (input splits, sometimes called chunks). Each split feeds exactly one map task, so the number of splits determines the number of map tasks.

Note: a MapReduce job is made up of Mapper and Reducer tasks. The number of map tasks is decided by the splits; what, then, decides the number of reducers? That will be explained step by step with example code later.


List<InputSplit> getSplits(JobContext context) is responsible for logically dividing a large data set into splits. For instance, if a database table holds 100 rows stored in ascending primary-key order and every 20 rows form a split, the returned list has 5 elements, and each InputSplit records two values: the starting ID of the split and the size of the split (20 here). An InputSplit does not store any data itself; it only describes how the data is partitioned (see the toy sketch after the InputSplit definition below).

RecordReader


InputSplit class definition

public abstract class InputSplit {
    // Size of the split, used to sort the input splits by size
    public abstract long getLength() throws IOException, InterruptedException;

    // The list of nodes where the split's data is stored
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
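As a toy illustration of the database example above (a hedged sketch; IdRangeSplit is a made-up class, not part of Hadoop), a split can simply describe an ID range without carrying any rows:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Toy split describing the ID range [startId, startId + length); it holds no data itself.
public class IdRangeSplit extends InputSplit implements Writable {
    private long startId;
    private long length;

    public IdRangeSplit() { }   // no-arg constructor required for deserialization
    public IdRangeSplit(long startId, long length) { this.startId = startId; this.length = length; }

    @Override public long getLength()        { return length; }          // used to sort splits by size
    @Override public String[] getLocations() { return new String[0]; }   // no data locality for a database

    @Override public void write(DataOutput out) throws IOException { out.writeLong(startId); out.writeLong(length); }
    @Override public void readFields(DataInput in) throws IOException { startId = in.readLong(); length = in.readLong(); }

    // 100 rows, 20 rows per split -> a list of 5 splits
    public static List<InputSplit> splitIdRange(long totalRows, long rowsPerSplit) {
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (long start = 1; start <= totalRows; start += rowsPerSplit) {
            splits.add(new IdRangeSplit(start, Math.min(rowsPerSplit, totalRows - start + 1)));
        }
        return splits;
    }
}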


RecordReader class definition

public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public abstract void initialize(InputSplit split, TaskAttemptContext context
                                    ) throws IOException, InterruptedException;
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
    public abstract float getProgress() throws IOException, InterruptedException;
    public abstract void close() throws IOException;
}

An InputSplit describes how the data is carved into pieces; a RecordReader is what actually loads a split's data and converts it into key/value pairs that the Mapper's map() method can process.

The RecordReader instance is determined by the input format. The default input format, TextInputFormat, supplies a LineRecordReader that uses each line's byte offset as the key and the line's content as the value. The RecordReader is called repeatedly on the split until the whole split has been consumed, and every call results in one invocation of the Mapper's map() function.
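To make that calling relationship concrete, the essence of how the framework consumes a RecordReader looks roughly like this (a simplified sketch; inputFormat, split and context stand for objects the framework supplies, and the real loop lives in MapTask and Mapper.run(), where the Context delegates to the RecordReader):

// Simplified view of the read loop driven by the framework.
RecordReader<LongWritable, Text> reader = inputFormat.createRecordReader(split, context);
reader.initialize(split, context);
while (reader.nextKeyValue()) {
    LongWritable key = reader.getCurrentKey();   // line offset within the file
    Text value = reader.getCurrentValue();       // content of the line
    // map(key, value, context) is invoked exactly once per record here
}
reader.close();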


TextInputFormat has no getSplits() implementation of its own; the implementation lives in its parent class, FileInputFormat.

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

    // Generate the list of files and make them into FileSplits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // 1. Obtain the List<FileStatus> from the JobContext;
        // 2. Iterate over the file statuses:
        //    2.1. For an empty file, create a single FileSplit with no host information;
        //    2.2. For a non-empty file, check whether it is splittable (it is by default).
        //         If not splittable, the whole file becomes one FileSplit;
        //         otherwise compute the split size splitSize.

        // getFormatMinSplitSize() returns the constant 1.
        // getMinSplitSize(job) reads the configuration key (mapred-default.xml):
        //   mapreduce.input.fileinputformat.split.minsize, default 0.
        // So minSize is 1.
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

        // Effectively context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
        // configuration key (not present in mapred-default.xml):
        //   mapreduce.input.fileinputformat.split.maxsize
        // When unset, maxSize is Long.MAX_VALUE.
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();     // absolute path on HDFS
          long length = file.getLen();    // actual file size
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              // Block size of the file; in 2.6 the default is 134217728 (128 MB).
              long blockSize = file.getBlockSize();
              // computeSplitSize(blockSize, minSize, maxSize) evaluates:
              //          1                Long.MAX_VALUE   128M
              // Math.max(minSize, Math.min(maxSize,        blockSize));
              // so the split size equals the block size, 128 MB.
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);

              long bytesRemaining = length;   // start from the actual file size
              // Loop while remaining bytes / splitSize > 1.1 (i.e. more than 128 MB * 1.1 remains)
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                // getBlockIndex answers: is the offset inside this block?
                // On the first iteration length - bytesRemaining is 0, i.e. the file's first block.
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;    // subtract the split size and keep splitting the remainder
              }

              /**
               * Suppose the file is 300 MB, so bytesRemaining = length = 300 MB:
               * 1. bytesRemaining / splitSize = 300 / 128 > 1.1
               *    makeSplit --> FileSplit(path, length - bytesRemaining = 0, splitSize = 128 MB)
               *    bytesRemaining -= splitSize => bytesRemaining = 172 MB
               * 2. bytesRemaining / splitSize = 172 / 128 > 1.1
               *    makeSplit --> FileSplit(path, length - bytesRemaining = 128 MB, splitSize = 128 MB)
               *    bytesRemaining -= splitSize => bytesRemaining = 44 MB
               * 3. bytesRemaining / splitSize = 44 / 128 < 1.1
               *    the while loop ends.
               */

              // After the loop, the remaining bytes may be non-zero but less than a full split
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
                // the final 44 MB becomes one more split here:
                // makeSplit --> FileSplit(path, length - bytesRemaining = 256 MB, splitSize = 44 MB)
              }
            } else { // not splittable: the whole file is one split of its actual size
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else {
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

        return splits;
    }
}
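Since the split size is Math.max(minSize, Math.min(maxSize, blockSize)), the number of splits (and hence map tasks) can be tuned through the min/max settings without touching the block size. A small sketch; the 64 MB and 256 MB figures are arbitrary example values:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Cap each split at 64 MB: max(minSize=1, min(maxSize=64MB, blockSize=128MB)) = 64 MB,
// which roughly doubles the number of splits (and map tasks) for a large file.
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

// Or force splits larger than a block, e.g. 256 MB:
// FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

// The equivalent configuration keys are
//   mapreduce.input.fileinputformat.split.maxsize
//   mapreduce.input.fileinputformat.split.minsize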

Note: a FileStatus in the returned List<FileStatus> may be a LocatedFileStatus (a FileStatus that includes a file's block locations).
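If you want to see those block locations yourself, the FileSystem API exposes them directly. A minimal sketch, assuming the sample input file used earlier; error handling is omitted and the snippet throws IOException:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// Print each block's offset, length and the hosts holding replicas.
FileSystem fs = FileSystem.get(new Configuration());
RemoteIterator<LocatedFileStatus> it = fs.listLocatedStatus(new Path("/data/HTTP_SITE_FLOW.log"));
while (it.hasNext()) {
    LocatedFileStatus status = it.next();
    for (BlockLocation blk : status.getBlockLocations()) {
        System.out.println(status.getPath() + " offset=" + blk.getOffset()
                + " length=" + blk.getLength()
                + " hosts=" + Arrays.toString(blk.getHosts()));
    }
}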


LineRecordReader reads and parses the text data and hands each record, in order, to the Mapper's map() function.

My own reading: TextInputFormat partitions the file logically into splits; a new LineRecordReader is created to parse each split, map() is called once for every parsed line, and there is still only one map task per split.

public class LineRecordReader extends RecordReader<LongWritable, Text> {

    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        // 1. Receive the split (a FileSplit) and derive from it:
        //      the start position:  start = split.getStart();
        //      the end position:    end = start + split.getLength();
        //      the file location (absolute HDFS path): final Path file = split.getPath();
        // 2. Open an input stream for the file:
        //      via FileSystem: fileIn = fs.open(file);
        // 3. Check whether the file is compressed and obtain the codec:
        //      CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        // 4. Adjust the starting offset (original comment):
        //      If this is not the first split, we always throw away first record
        //      because we always (except the last split) read one extra line in
        //      next() method.
        if (start != 0) {
          start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) {    // key --> the key handed to the map() function of the map task
          key = new LongWritable();
        }
        key.set(pos);         // the line's byte offset
        if (value == null) {
          value = new Text();
        }
        // If the split has not been fully consumed, read one more line through
        // org.apache.hadoop.util.LineReader's readCustomLine or readDefaultLine:
        //   readCustomLine when a custom record delimiter is configured;
        //   otherwise readDefaultLine, which splits on carriage return / line feed.
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;       // offset plus the length of the line just read = offset of the next line
    }

    /**
     * nextKeyValue() iterates over the split one line at a time,
     * parsing the key and the value from each line and assigning them;
     * these values are what get passed into map()
     * (how map() is actually invoked is analyzed later).
     */
    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() throws IOException {
        if (start == end) {
          return 0.0f;
        } else {
          return Math.min(1.0f, (getFilePosition() - start) / (float)(end - start));
        }
    }

    // Close the input stream opened on HDFS
    public synchronized void close() throws IOException {
        try {
          if (in != null) {
            in.close();
          }
        } finally {
          if (decompressor != null) {
            CodecPool.returnDecompressor(decompressor);
          }
        }
    }
}
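As mentioned in the nextKeyValue() comments, configuring a custom record delimiter switches LineRecordReader to the readCustomLine path. A hedged sketch of how a driver could set one (the blank-line delimiter is only an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Separate records by a blank line instead of a single newline;
// LineRecordReader will then take the readCustomLine branch mentioned above.
Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "\n\n");
Job job = Job.getInstance(conf, "custom-delimiter-example");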


