千家信息网

MapReduce的典型编程场景1

发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,接下来通过一个实际的案例,介绍在MR编程中的,partition、sort、combiner。  流量统计项目案例数据样本:1363157984040 13602846565 5C-0E-8B-8B-
千家信息网最后更新 2024年11月23日MapReduce的典型编程场景1

接下来通过一个实际的案例,介绍在MR编程中的,partition、sort、combiner。

  

流量统计项目案例

数据样本
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4
2052.flash3-http.qq.com 综合门户 15 12 1938 2910 200
字段介绍

需求
1、 统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
2、 得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
3、 将流量汇总统计结果按照手机归属地不同省份输出到不同文件中


  
需求一:统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
  通过需求分析 ,我们可以知道这里可以使用combiner这个优化组件,它的作用是在 maptask之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输。
  使用方式:Combiner 和 Reducer 一样,编写一个类,然后继承 Reducer,reduce 方法中写具体的 Combiner逻辑,然后在 job 中设置 Combiner 组件:job.setCombinerClass(FlowSumCombine.class)
代码实现

public class FlowSum {    //job    public static void main(String[] args) {        Configuration conf = new Configuration(true);        conf.set("fs.defaultFS", "hdfs://zzy:9000");        conf.addResource("core-site.xml");        conf.addResource("hdfs-site.xml");        System.setProperty("HADOOP_USER_NAME", "hadoop");        try {            Job job = Job.getInstance(conf);            job.setJobName("FlowSum");            //设置任务类            job.setJarByClass(FlowSum.class);            //设置Mapper  Reducer  Combine            job.setMapperClass(MyMapper.class);            job.setReducerClass(MyReducer.class);            job.setCombinerClass(FlowSumCombine.class);            //设置map 和reduce 的输入输出类型            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);            job.setOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);            // 指定该 mapreduce 程序数据的输入和输出路径            Path input=new Path("/data/input");            Path output =new Path("/data/output");            //一定要保证output不存在            if(output.getFileSystem(conf).exists(output)){                output.getFileSystem(conf).delete(output,true);  //递归删除            }            FileInputFormat.addInputPath(job,input);            FileOutputFormat.setOutputPath(job,output);            // 最后提交任务            boolean success = job.waitForCompletion(true);            System.exit(success?0:-1);        } catch (Exception e) {            e.printStackTrace();        }    }    //Mapper    private class MyMapper extends Mapper {        //统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量        //1363157984040        // 13602846565        // 5C-0E-8B-8B-B6-00:CMCC        // 120.197.40.4        //2052.flash3-http.qq.com        // 综合门户        // 15        // 12        // 1938  上行流量        // 2910  下行流量        // 200        Text mk=new Text();        Text mv=new Text();        @Override        protected void map(LongWritable key, Text value, Context context)                throws IOException, InterruptedException {            String[] fields = value.toString().split("\\s+");            String phone = fields[0];            String upFlow = fields[8];            String downFlow = fields[9];            mk.set(phone);            mv.set(upFlow+"_"+downFlow);            context.write(mk,mv);        }    }    //Combiner    private class FlowSumCombine extends Reducer {        Text rv=new Text();        @Override        protected void reduce(Text key, Iterable values, Context context)                throws IOException, InterruptedException {            int upFlowSum=0;            int downFlowSum=0;            int  upFlow = 0;            int downFlow =0;            for(Text value:values){                String fields[]=value.toString().split("_");                upFlow=Integer.parseInt(fields[0]);                downFlow=Integer.parseInt(fields[1]);                upFlowSum+=upFlow;                downFlowSum+=downFlow;            }            rv.set(upFlowSum+"_"+downFlowSum);            context.write(key,rv);        }    }    //Reducer    private class MyReducer extends Reducer {        Text rv=new Text();        @Override        protected void reduce(Text key, Iterable values, Context context)                throws IOException, InterruptedException {            int upFlowSum=0;            int downFlowSum=0;            int  upFlow = 0;            int downFlow =0;            for(Text value:values){                String fields[]=value.toString().split("_");                upFlow=Integer.parseInt(fields[0]);                downFlow=Integer.parseInt(fields[1]);                upFlowSum+=upFlow;                downFlowSum+=downFlow;            }            rv.set(upFlowSum+"\\t"+downFlowSum);            context.write(key,rv);        }    }}

  使用 Combiner 注意事项
   - Combiner 的输出 kv 类型应该跟 Reducer 的输入 kv 类型对应起来
   - Combiner 的输入 kv 类型应该跟 Mapper 的输出 kv 类型对应起来
   - Combiner 的使用要非常谨慎,因为 Combiner 在MapReduce 过程中可能调用也可能不调用,可能调一次也可能调多次
   - Combiner 使用的原则是:有或没有都不能影响业务逻辑,都不能影响最终结果

需求二:得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
   分析:如果是在得出上行流量和下行流量之后,实现倒叙排序呢,之前在java中,如果想让对象按照自定义的规则排序,那么就需要自定义对象并且实现它的比较器。MR也可以,在MR运行过程中,如果有Reducer阶段的话,那么是一定会排序的,根据对象的比较器,进行排序,将排序结果相同的key分到一个reduceTask中。
  实现:这里我们可以使用hadoop自定义WritableComparable来自定义对象,并且实现它的比较器。
代码实现(注意:这里的是对需求一实现之后的结果进行处理的):
自定义对象,实现比较器

public class FlowBean implements WritableComparable {    private String phone;    private long upFlow;    private long downFlow;    private long sumFlow;    // 序列化框架在反序列化操作创建对象实例时会调用无参构造    public FlowBean(){    }    public void set(String phone, long upFlow, long downFlow){        this.phone=phone;        this.upFlow=upFlow;        this.downFlow=downFlow;        this.sumFlow=upFlow+downFlow;    }    public String getPhone() {        return phone;    }    public void setPhone(String phone) {        this.phone = phone;    }    public long getUpFlow() {        return upFlow;    }    public void setUpFlow(long upFlow) {        this.upFlow = upFlow;    }    public long getDownFlow() {        return downFlow;    }    public void setDownFlow(long downFlow) {        this.downFlow = downFlow;    }    public long getSumFlow() {        return sumFlow;    }    public void setSumFlow(long sumFlow) {        this.sumFlow = sumFlow;    }    // 序列化方法    @Override    public void write(DataOutput out) throws IOException {        out.writeUTF(this.phone);        out.writeLong(this.upFlow);        out.writeLong(this.downFlow);        out.writeLong(this.sumFlow);    }    /*        反序列化方法,这里注意,上面的write中是什么顺写,这里就要什么顺序读取        要保证字段数据类型一一对应     */    @Override    public void readFields(DataInput in) throws IOException {        this.phone=in.readUTF();        this.upFlow=in.readLong();        this.downFlow=in.readLong();        this.sumFlow=in.readLong();    }    //这里就是要实现的比较方法    // 0 表示相等, 1表示大于  负数表示小于    @Override    public int compareTo(FlowBean o) {        //倒叙输出的话就是,参数的属性-类中的属性        return (int)(o.sumFlow-this.sumFlow);    }}

MR程序

public class FlowSumSort {    //job    public static void main(String[] args) {        Configuration conf=new Configuration(true);        conf.set("fs.defaultFS","hdfs://zzy:9000");        conf.set("fs.defaultFS", "hdfs://zzy:9000");        conf.addResource("core-site.xml");        conf.addResource("hdfs-site.xml");        System.setProperty("HADOOP_USER_NAME", "hadoop");        try {            Job job= Job.getInstance(conf);            job.setJarByClass(FlowSumSort.class);            job.setJobName("FlowSumSort");            job.setMapperClass(Mapper.class);            job.setReducerClass(Reducer.class);            job.setOutputKeyClass(FlowBean.class);            job.setOutputValueClass(NullWritable.class);            // 指定该 mapreduce 程序数据的输入和输出路径            Path input=new Path("//data/output");            Path output =new Path("/data/output1");            //一定要保证output不存在            if(output.getFileSystem(conf).exists(output)){                output.getFileSystem(conf).delete(output,true);  //递归删除            }            FileInputFormat.addInputPath(job,input);            FileOutputFormat.setOutputPath(job,output);            boolean success=job.waitForCompletion(true);            System.exit(success?0:1);        } catch (Exception e) {            e.printStackTrace();        }    }    //Mapper    private class MyMapper extends Mapper {        FlowBean bean=new FlowBean();        NullWritable mv=NullWritable.get();        @Override        protected void map(LongWritable key, Text value, Context context)                throws IOException, InterruptedException {             String[] fields = value.toString().split("\\s+");             String phone=fields[0];             long upflow= Long.parseLong(fields[1]);             long downflow= Long.parseLong(fields[2]);             bean.set(phone,upflow,downflow);            context.write(bean,mv);        }    }    //Reducer    private class MyReducer extends Reducer{        @Override        protected void reduce(FlowBean key, Iterable values, Context context)                throws IOException, InterruptedException {            for(NullWritable value:values){                context.write(key,value);            }        }    }}

注意:虽然这里reduce阶段只是做了一个输出,没有任何业务操作,但是不能不使用reduce,因为MR的排序就是在MapTask运行完成之后,向reduceTask端输出数据的时候,才会进行排序,如果没有Reduce阶段就不会有排序。

  
需求三:根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到省级范围进行
分析:默认的在运行MR程序的时候,只运行一个ReducerTask,默认额的一个ReducerTask会有一个输出文件,那么只要自定义分区规则,并且设置好ReducerTask的个数,就可以完成以上需求,默认的MR的分区规则是,key的hashcode%分区数
代码实现
自定义分区

    //这里的两个泛型,是Mpapper端输出的key和value的类型    private static class MyPartitioner extends Partitioner {        //自定义的分区规则        private static HashMap provincMap = new HashMap();        static {            provincMap.put("138", 0);            provincMap.put("139", 1);            provincMap.put("136", 2);            provincMap.put("137", 3);            provincMap.put("135", 4);        }        @Override        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {            //千万注意,这里的返回值,一定不能大于numPartitions,否则会报错            Integer code = provincMap.get(text.toString().substring(0, 3));            if(code!=null){                return code;            }            return 5;        }    }

job

            //指定分区规则,和分区个数            job.setPartitionerClass(MyPartitioner.class);            job.setNumReduceTasks(5);

注意

  • 在使用分区时,一定注意,分区的返回值一定不能大于设置的reduceTask个数
  • 虽然设置多个ReduceTask可以增加并行度,但是也不需要设置太多,如果某个ReduceTask中没有数据,那么这个ReduceTask就是空跑,浪费资源
  • 尽量的在设置分区时是从0开始的连续的整数
0