千家信息网

MapReduce的典型编程场景3

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,1. 自定义InputFormat -数据分类输出  需求:小文件的合并  分析:     - 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS     - 在业务处理之前,在 HDF
千家信息网最后更新 2024年12月12日MapReduce的典型编程场景3

1. 自定义InputFormat -数据分类输出

  需求:小文件的合并

  分析:

     - 在数据采集的时候,就将小文件或小批数据合成大文件再上传 HDFS
     - 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
     - 在 MapReduce 处理时,可采用 CombineFileInputFormat 提高效率

  实现思路:

     - 编写自定义的InoputFormat
     - 改写 RecordReader,实现一次 maptask 读取一个小文件的完整内容封装到一个 KV 对
     - 在Driver 类中一定要设置使用自定义的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)


代码实现

public class MergeDriver {    //job    public static void main(String[] args) {        Configuration conf = new Configuration();        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");        Job job = null;        try {            job = Job.getInstance(conf, "combine small files to bigfile");            job.setJarByClass(MergeDriver.class);            job.setMapperClass(MyMapper.class);            job.setReducerClass(MyReducer.class);            job.setMapOutputKeyClass(NullWritable.class);            job.setMapOutputValueClass(Text.class);            job.setOutputKeyClass(NullWritable.class);            job.setOutputValueClass(Text.class);            //设置自定义输入的类            job.setInputFormatClass(MyMyFileInputForamt.class);            Path input = new Path("/hadoop/input/num_add");            Path output = new Path("/hadoop/output/merge_output1");            //这里使用自定义得我FileInputForamt去格式化input            MyMyFileInputForamt.addInputPath(job,input);            FileSystem fs = FileSystem.get(conf);            if (fs.exists(output)) {                fs.delete(output, true);            }            FileOutputFormat.setOutputPath(job, output);            int status = job.waitForCompletion(true) ? 0 : 1;            System.exit(status);        } catch (Exception e) {            e.printStackTrace();        }    }    //Mapper    static private class MyMapper extends Mapper {        /*            这里的map方法就是每读取一个文件调用一次         */        @Override        protected void map(NullWritable key, Text value, Mapper.Context context)                throws IOException, InterruptedException {            context.write(key, value);        }    }    //Reducer    private static class MyReducer extends Reducer {        @Override        protected void reduce(NullWritable key, Iterable values,                              Reducer.Context context) throws IOException, InterruptedException {            for (Text v : values) {                context.write(key, v);            }        }    }    //RecordReader ,这种这个两个泛型,是map端输入的key和value的类型    private static class MyRecordReader extends RecordReader {        // 输出的value对象        Text map_value = new Text();        // 文件系统对象,用于获取文件的输入流        FileSystem fs;        // 判断当前文件是否已经读完        Boolean isReader = false;        //文件的切片信息        FileSplit fileSplit;        //初始化方法,类似于Mapper中的setup,整个类最开始运行        @Override        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {            //初始化文件系统对象            fs = FileSystem.get(context.getConfiguration());            //获取文件路径            fileSplit = (FileSplit) split;        }        //这个方法,在每次调用map中传入的K-V中,就是在这个方法中给K-V赋值的        @Override        public boolean nextKeyValue() throws IOException, InterruptedException {            //先读取一次            if (!isReader) {                FSDataInputStream input = fs.open(fileSplit.getPath());                //一次性将整个小文件内容都读取出来                byte flush[] = new byte[(int) fileSplit.getLength()];                //将文件内容读取到这个byte数组中                /**                 * 参数一:读取的字节数组                 * 参数二:开始读取的偏移量                 * 参数三:读取的长度                 */                input.readFully(flush, 0, (int) fileSplit.getLength());                isReader = true;                map_value.set(flush);  //将读取的内容,放置在map的value中                //保证能正好读一次,nextKeyValue()第一次返回true正好可以调用一次map,第二次返回false                return isReader;            }            return false;        }        @Override        public NullWritable getCurrentKey() throws IOException, InterruptedException {            return NullWritable.get();        }        @Override        public Text getCurrentValue() throws IOException, InterruptedException {            return map_value;        }        @Override        public float getProgress() throws IOException, InterruptedException {            return 0;        }        @Override        public void close() throws IOException {            fs.close();        }    }    //FileInputFormat    private static class MyMyFileInputForamt extends FileInputFormat {        @Override        public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {            MyRecordReader mr = new MyRecordReader();            //先调用初始化方法            mr.initialize(split, context);            return mr;        }    }}

2. 自定义OutputFormat

  需求:一些原始日志需要做增强解析处理,流程

     - 从原始日志文件中读取数据
     - 根据业务获取业务数据库中的数据
     - 根据某个连接条件获取相应的连接结果

  分析:

     - 在 MapReduce 中访问外部资源
     - 在业务处理之前,在 HDFS 上使用 MapReduce 程序对小文件进行合并
     - 自定义 OutputFormat,改写其中的 RecordWriter,改写具体输出数据的方法 write() CombineFileInputFormat 提高效率


代码实现
//这里以一个简单的案例为例,将文件按照不同的等级输出的不同的文件中

 public class Score_DiffDic {    //job    public static void main(String[] args) {        Configuration conf = new Configuration();        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");        Job job = null;        try {            job = Job.getInstance(conf, "Score_DiffDic");            job.setJarByClass(Score_DiffDic.class);            job.setMapperClass(MyMapper.class);            job.setReducerClass(MyReducer.class);            //设置自定义输出类型            job.setOutputFormatClass(MyOutputFormat.class);            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(DoubleWritable.class);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(DoubleWritable.class);            Path input = new Path("/hadoop/input/num_add");            FileInputFormat.addInputPath(job,input);            Path output = new Path("/hadoop/output/merge_output1");            //这是自定义输出类型            MyOutputFormat.setOutputPath(job,output);            FileSystem fs = FileSystem.get(conf);            if (fs.exists(output)) {                fs.delete(output, true);            }            FileOutputFormat.setOutputPath(job, output);            int status = job.waitForCompletion(true) ? 0 : 1;            System.exit(status);        } catch (Exception e) {            e.printStackTrace();        }    }    //Mapper    private static class MyMapper extends Mapper{        Text mk=new Text();        DoubleWritable mv=new DoubleWritable();        @Override        protected void map(LongWritable key, Text value, Context context)                throws IOException, InterruptedException {             String[] fields = value.toString().split("\\s+");            //computer,huangxiaoming,85            if(fields.length==3){                mk.set(fields[1]);                mv.set(Double.parseDouble(fields[2]));                context.write(mk, mv);            }        }    }    //Reducer    private static class MyReducer extends Reducer{        DoubleWritable mv=new DoubleWritable();        @Override        protected void reduce(Text key, Iterable values, Context context)                throws IOException, InterruptedException {            double  sum=0;            int count=0;            for(DoubleWritable value:values){                sum+=value.get();                count++;            }            mv.set(sum/count);            context.write(key,mv);        }    }    //FileOutputFormat    private static class MyOutputFormat extends FileOutputFormat {        @Override        public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {            FileSystem fs =FileSystem.get(job.getConfiguration());            return new MyRecordWrite(fs);        }    }    //RecordWriter,这里的两个泛型是Reudcer输出K-V的类型    private static class MyRecordWrite extends RecordWriter {        FileSystem fs;        //输出的文件的路径        Path path2 = new Path("/hadoop/output/score_out1");        Path path3 = new Path("/hadoop/output/score_out2");        FSDataOutputStream output1;        FSDataOutputStream output2;        public MyRecordWrite() {        }        //初始化参数        public MyRecordWrite(FileSystem fs) {            this.fs = fs;            try {                output1=fs.create(path2);                output2=fs.create(path3);            } catch (IOException e) {                e.printStackTrace();            }        }        @Override        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {            //业务逻辑操作,平均分数大于80的在path2中,其他的在path3中            if(value.get()>80){                output1.write((key.toString()+":"+value.get()+"\n").getBytes());            }else{                output2.write((key.toString()+":"+value.get()+"\n").getBytes());            }        }        @Override        public void close(TaskAttemptContext context) throws IOException, InterruptedException {            fs.close();            output1.close();            output2.close();        }    }}
0