千家信息网

十一、MapReduce--自定义Input输入

发表于:2025-01-29 作者:千家信息网编辑
千家信息网最后更新 2025年01月29日,在"MapReduce--input之输入原理"中说到实现定义输入的方法,其实就是继承InputFormat以及 RecordReader实现其中的方法。下面例子讲解操作。1、需求将多个文件合并成一个
千家信息网最后更新 2025年01月29日十一、MapReduce--自定义Input输入

在"MapReduce--input之输入原理"中说到,实现自定义输入的方法,其实就是分别继承 InputFormat 和 RecordReader,并实现其中的抽象方法。下面通过一个例子讲解具体操作。

1、需求

将多个小文件合并成一个大文件(有点类似于 CombineFileInputFormat),并输出。大文件中包括每个小文件所在的路径,以及该小文件的内容。

2、源码

InputFormat:

/**
 * Input format that presents every input file as one unsplit record.
 *
 * <p>Keys are {@link NullWritable} and values are {@link BytesWritable}, matching
 * what {@link SRecordReader} produces.
 */
public class SFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    /**
     * Disables splitting so that a file is always handled as a single split.
     *
     * @param context  the job context (not consulted)
     * @param filename the file being considered (not consulted)
     * @return always {@code false}
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Creates the reader that loads the content of the split's file.
     *
     * @param inputSplit         the split to read
     * @param taskAttemptContext the context of the running task attempt
     * @return an initialized {@link SRecordReader}
     * @throws IOException          if initializing the reader fails
     * @throws InterruptedException if initialization is interrupted
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        SRecordReader reader = new SRecordReader();
        // NOTE(review): the Hadoop documentation states that the framework calls
        // initialize() itself before the split is used, so this eager call is
        // presumably redundant. It is kept to preserve the existing behavior for
        // any caller that uses the reader directly.
        reader.initialize(inputSplit, taskAttemptContext);
        return reader;
    }
}

RecordReader:

public class SRecordReader extends RecordReader {    private Configuration conf;    private FileSplit split;    //当前分片是否已读取的标志位    private boolean process = false;    private BytesWritable value = new BytesWritable();    /**     * 初始化     * @param inputSplit     * @param taskAttemptContext     * @throws IOException     * @throws InterruptedException     */    @Override    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {        split = (FileSplit)inputSplit;        conf = taskAttemptContext.getConfiguration();    }    /**     * 从分片中读取下一个KV     * @return     * @throws IOException     * @throws InterruptedException     */    @Override    public boolean nextKeyValue() throws IOException, InterruptedException {        if (!process) {            byte[] buffer = new byte[(int) split.getLength()];            //获取文件系统            Path path = split.getPath();            FileSystem fs = path.getFileSystem(conf);            //创建输入流            FSDataInputStream fis = fs.open(path);            //流对接,将数据读取缓冲区            IOUtils.readFully(fis, buffer, 0, buffer.length);            //将数据装载入value            value.set(buffer, 0, buffer.length);            //关闭流            IOUtils.closeStream(fis);            //读完就标志位设置为true,表示已读            process = true;            return true;        }        return false;    }    @Override    public NullWritable getCurrentKey() throws IOException, InterruptedException {        return NullWritable.get();    }    @Override    public BytesWritable getCurrentValue() throws IOException, InterruptedException {        return this.value;    }    @Override    public float getProgress() throws IOException, InterruptedException {        return process? 1 : 0;    }    @Override    public void close() throws IOException {    }}

mapper:

/**
 * Mapper that pairs the bytes of each input record with the path of the file
 * the current split belongs to.
 */
public class SFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    /** Output key, set once per task to the path of the split's file. */
    Text k = new Text();

    /**
     * Resolves the path of the file behind the current split and stores it as the output key.
     *
     * @param context the task context; its input split must be a {@link FileSplit}
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().toString();
        k.set(name);
    }

    /**
     * Emits the record's bytes under the file path computed in {@link #setup(Context)}.
     *
     * @param key     ignored
     * @param value   the bytes of the record
     * @param context the task context used to write the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(k, value);
    }
}

reducer:

/**
 * Reducer that forwards one value per key to the job output.
 */
public class SFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    /**
     * Writes the key together with the first value of its group.
     *
     * <p>Any further values for the same key are not written.
     * NOTE(review): presumably every key (a file path) carries exactly one value,
     * since each file is read as a single record - confirm against the input format.
     *
     * @param key     the group key
     * @param values  the values grouped under {@code key}
     * @param context the task context used to write the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if writing the output is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, values.iterator().next());
    }
}

driver:

public class SFileDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"};        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(SFileDriver.class);        job.setMapperClass(SFileMapper.class);        job.setReducerClass(SFileReducer.class);        //设置输入和输出类,默认是 TextInputFormat        job.setInputFormatClass(SFileInputFormat.class);        job.setOutputFormatClass(SequenceFileOutputFormat.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(BytesWritable.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(BytesWritable.class);        FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        job.waitForCompletion(true);    }}

自定义的 InputFormat 需要在 Driver 中通过 job.setInputFormatClass() 来指定,否则会使用默认的 TextInputFormat。

0