千家信息网

十、MapReduce--InputFormat以及RecordReader抽象类

发表于:2024-11-16 作者:千家信息网编辑
千家信息网最后更新 2024年11月16日,一、基本原理​ 在map执行之前,需要将数据进行切片,每个切片对应一个map任务。而每个map任务并不是直接处理这些切片数据的,它是处理KV的。所以问题有两个:数据是如何切片的、切片是如何转为KV给m
千家信息网最后更新 2024年11月16日十、MapReduce--InputFormat以及RecordReader抽象类

一、基本原理

​ 在map执行之前,需要将数据进行切片,每个切片对应一个map任务。而每个map任务并不是直接处理这些切片数据的,它是处理KV的。所以问题有两个:数据是如何切片的、切片是如何转为KV给map处理的。
​ 这就涉及到两个抽象类,InputFormat以及 RecordReader。具体为什么是这两个抽象类,请看之前input的源码分析

1、InputFormat

public abstract class InputFormat {    public InputFormat() {    }    public abstract List getSplits(JobContext var1) throws IOException, InterruptedException;    public abstract RecordReader createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;}

我们看到,这个抽象类就两个方法
getSplits:看名字就知道是用来将数据处理成切片的了
createRecordReader:就是用来创建RecordReader对象的。
所以这就是一个InputFormat基本的功能

2、 RecordReader

public abstract class RecordReader implements Closeable {    public RecordReader() {    }    //初始化,一般就是读取切片的数据    public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;    //检查是否还有下一对KV,并且如果有,实际上会将其处理成KV,并赋值给this.key和this.value    public abstract boolean nextKeyValue() throws IOException, InterruptedException;    //返回一个key    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;    //返回一个value    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;    //返回是否在处理    public abstract float getProgress() throws IOException, InterruptedException;    //关闭reader    public abstract void close() throws IOException;}

这个抽象类就涉及到读取切片的数据,处理成KV结构。而在input源码分析中说到,mapper.run方法中通过 context.getCurrentKey() 类似的方法获取key其实就是调用这个RecordReader中的这些get方法而已。

3、InputFormat以及 RecordReader的关系

从上面的源码可以看到。
InputFormat:负责规划切片信息,以及创建RecordReader对象
RecordReader:负责按照切片规划去读取当前mapper处理的切片数据,并将其处理成KV形式,然后通过context传递给mapper。

二、InputFormat以及 RecordReader常用实现类

常用的有:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat(自定义有另外的文章讲)

1、TextInputFormat

​ 这是默认的InputFormat,切片方式是按数据块的方式切割,默认大小block大小。一个文件至少是一个切片(无论多小)。因为这个类继承FileInputFormat,使用的是其父类定义的getsplit() 方法进行切片。
​ 使用的RecordReader是LineRecordReader。处理切片成KV时,每条记录是一行输入。键K是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不包括任何行终止符(换行符和回车符)。

2、KeyValueTextInputFormat

​ 这个类也是使用父类FileInputFormat的getsplit() 方法进行切片,所以切片方式和上面一致。
​ 使用的RecordReader是KeyValueLineRecordReader。每一行均为一条记录,被分隔符分割为key,value。可以通过在驱动类中设置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");来设定分隔符。默认分隔符是tab(\t)。

3、NLineInputFormat

​ 这个类虽然继承了FileInputFormat,但是自己重写了getSplit方法,使用另外的方式来切片。是按指定的行数来切片,比如5行,那就5行作为一个切片,无论数据大小。通过mapreduce.input.lineinputformat.linespermap 这个参数设置切片行数。
​ 使用的RecordReader是LineRecordReader。和上面类似,不重复说。

4、CombineTextInputFormat

​ 这个类继承于 CombineFileInputFormat,父类继承于FileInputFormat。在CombineFileInputFormat中重写了 getSplits方法。因为FileInputFormat默认无论多小的文件,一个文件至少是一个切片。如果遇到很多小文件,就会导致很多切片。而这里的切片方式就是严格按照大小来切片,会将小文件集合在一起,达到指定大小,才作为一个切片。
​ 使用的RecordReader是CombineFileRecordReader。处理方式和 LineRecordReader类似,只不过切片可能是来自多个文件,读取方式上略显麻烦。

三、设置使用指定的inputformat

job.setInputFormatClass(xxxInputFormat.class);
0