MapReduce的典型编程场景1
接下来通过一个实际的案例,介绍在MR编程中的,partition、sort、combiner。
流量统计项目案例
数据样本:
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4
2052.flash3-http.qq.com 综合门户 15 12 1938 2910 200
字段介绍:
需求:
1、 统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
2、 得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
3、 将流量汇总统计结果按照手机归属地不同省份输出到不同文件中
需求一:统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量
通过需求分析 ,我们可以知道这里可以使用combiner这个优化组件,它的作用是在 maptask之后给 maptask 的结果进行局部汇总,以减轻 reducetask 的计算负载,减少网络传输。
使用方式:Combiner 和 Reducer 一样,编写一个类,然后继承 Reducer,reduce 方法中写具体的 Combiner逻辑,然后在 job 中设置 Combiner 组件:job.setCombinerClass(FlowSumCombine.class)
代码实现:
public class FlowSum { //job public static void main(String[] args) { Configuration conf = new Configuration(true); conf.set("fs.defaultFS", "hdfs://zzy:9000"); conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { Job job = Job.getInstance(conf); job.setJobName("FlowSum"); //设置任务类 job.setJarByClass(FlowSum.class); //设置Mapper Reducer Combine job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setCombinerClass(FlowSumCombine.class); //设置map 和reduce 的输入输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 指定该 mapreduce 程序数据的输入和输出路径 Path input=new Path("/data/input"); Path output =new Path("/data/output"); //一定要保证output不存在 if(output.getFileSystem(conf).exists(output)){ output.getFileSystem(conf).delete(output,true); //递归删除 } FileInputFormat.addInputPath(job,input); FileOutputFormat.setOutputPath(job,output); // 最后提交任务 boolean success = job.waitForCompletion(true); System.exit(success?0:-1); } catch (Exception e) { e.printStackTrace(); } } //Mapper private class MyMapper extends Mapper { //统计每一个用户(手机号)所耗费的总上行流量、总下行流量,总流量 //1363157984040 // 13602846565 // 5C-0E-8B-8B-B6-00:CMCC // 120.197.40.4 //2052.flash3-http.qq.com // 综合门户 // 15 // 12 // 1938 上行流量 // 2910 下行流量 // 200 Text mk=new Text(); Text mv=new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); String phone = fields[0]; String upFlow = fields[8]; String downFlow = fields[9]; mk.set(phone); mv.set(upFlow+"_"+downFlow); context.write(mk,mv); } } //Combiner private class FlowSumCombine extends Reducer { Text rv=new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int upFlowSum=0; int downFlowSum=0; int upFlow = 0; int downFlow =0; for(Text value:values){ String fields[]=value.toString().split("_"); upFlow=Integer.parseInt(fields[0]); downFlow=Integer.parseInt(fields[1]); upFlowSum+=upFlow; downFlowSum+=downFlow; } rv.set(upFlowSum+"_"+downFlowSum); context.write(key,rv); } } //Reducer private class MyReducer extends Reducer { Text rv=new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int upFlowSum=0; int downFlowSum=0; int upFlow = 0; int downFlow =0; for(Text value:values){ String fields[]=value.toString().split("_"); upFlow=Integer.parseInt(fields[0]); downFlow=Integer.parseInt(fields[1]); upFlowSum+=upFlow; downFlowSum+=downFlow; } rv.set(upFlowSum+"\\t"+downFlowSum); context.write(key,rv); } }}
使用 Combiner 注意事项
- Combiner 的输出 kv 类型应该跟 Reducer 的输入 kv 类型对应起来
- Combiner 的输入 kv 类型应该跟 Mapper 的输出 kv 类型对应起来
- Combiner 的使用要非常谨慎,因为 Combiner 在MapReduce 过程中可能调用也可能不调用,可能调一次也可能调多次
- Combiner 使用的原则是:有或没有都不能影响业务逻辑,都不能影响最终结果
需求二:得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
分析:如果是在得出上行流量和下行流量之后,实现倒叙排序呢,之前在java中,如果想让对象按照自定义的规则排序,那么就需要自定义对象并且实现它的比较器。MR也可以,在MR运行过程中,如果有Reducer阶段的话,那么是一定会排序的,根据对象的比较器,进行排序,将排序结果相同的key分到一个reduceTask中。
实现:这里我们可以使用hadoop自定义WritableComparable来自定义对象,并且实现它的比较器。
代码实现(注意:这里的是对需求一实现之后的结果进行处理的):
自定义对象,实现比较器:
public class FlowBean implements WritableComparable { private String phone; private long upFlow; private long downFlow; private long sumFlow; // 序列化框架在反序列化操作创建对象实例时会调用无参构造 public FlowBean(){ } public void set(String phone, long upFlow, long downFlow){ this.phone=phone; this.upFlow=upFlow; this.downFlow=downFlow; this.sumFlow=upFlow+downFlow; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } // 序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.phone); out.writeLong(this.upFlow); out.writeLong(this.downFlow); out.writeLong(this.sumFlow); } /* 反序列化方法,这里注意,上面的write中是什么顺写,这里就要什么顺序读取 要保证字段数据类型一一对应 */ @Override public void readFields(DataInput in) throws IOException { this.phone=in.readUTF(); this.upFlow=in.readLong(); this.downFlow=in.readLong(); this.sumFlow=in.readLong(); } //这里就是要实现的比较方法 // 0 表示相等, 1表示大于 负数表示小于 @Override public int compareTo(FlowBean o) { //倒叙输出的话就是,参数的属性-类中的属性 return (int)(o.sumFlow-this.sumFlow); }}
MR程序:
public class FlowSumSort { //job public static void main(String[] args) { Configuration conf=new Configuration(true); conf.set("fs.defaultFS","hdfs://zzy:9000"); conf.set("fs.defaultFS", "hdfs://zzy:9000"); conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { Job job= Job.getInstance(conf); job.setJarByClass(FlowSumSort.class); job.setJobName("FlowSumSort"); job.setMapperClass(Mapper.class); job.setReducerClass(Reducer.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(NullWritable.class); // 指定该 mapreduce 程序数据的输入和输出路径 Path input=new Path("//data/output"); Path output =new Path("/data/output1"); //一定要保证output不存在 if(output.getFileSystem(conf).exists(output)){ output.getFileSystem(conf).delete(output,true); //递归删除 } FileInputFormat.addInputPath(job,input); FileOutputFormat.setOutputPath(job,output); boolean success=job.waitForCompletion(true); System.exit(success?0:1); } catch (Exception e) { e.printStackTrace(); } } //Mapper private class MyMapper extends Mapper { FlowBean bean=new FlowBean(); NullWritable mv=NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); String phone=fields[0]; long upflow= Long.parseLong(fields[1]); long downflow= Long.parseLong(fields[2]); bean.set(phone,upflow,downflow); context.write(bean,mv); } } //Reducer private class MyReducer extends Reducer{ @Override protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException { for(NullWritable value:values){ context.write(key,value); } } }}
注意:虽然这里reduce阶段只是做了一个输出,没有任何业务操作,但是不能不使用reduce,因为MR的排序就是在MapTask运行完成之后,向reduceTask端输出数据的时候,才会进行排序,如果没有Reduce阶段就不会有排序。
需求三:根据归属地输出流量统计数据结果到不同文件,以便于在查询统计结果时可以定位到省级范围进行
分析:默认的在运行MR程序的时候,只运行一个ReducerTask,默认额的一个ReducerTask会有一个输出文件,那么只要自定义分区规则,并且设置好ReducerTask的个数,就可以完成以上需求,默认的MR的分区规则是,key的hashcode%分区数。
代码实现:
自定义分区:
//这里的两个泛型,是Mpapper端输出的key和value的类型 private static class MyPartitioner extends Partitioner { //自定义的分区规则 private static HashMap provincMap = new HashMap(); static { provincMap.put("138", 0); provincMap.put("139", 1); provincMap.put("136", 2); provincMap.put("137", 3); provincMap.put("135", 4); } @Override public int getPartition(Text text, FlowBean flowBean, int numPartitions) { //千万注意,这里的返回值,一定不能大于numPartitions,否则会报错 Integer code = provincMap.get(text.toString().substring(0, 3)); if(code!=null){ return code; } return 5; } }
job:
//指定分区规则,和分区个数 job.setPartitionerClass(MyPartitioner.class); job.setNumReduceTasks(5);
注意:
- 在使用分区时,一定注意,分区的返回值一定不能大于设置的reduceTask个数
- 虽然设置多个ReduceTask可以增加并行度,但是也不需要设置太多,如果某个ReduceTask中没有数据,那么这个ReduceTask就是空跑,浪费资源
- 尽量的在设置分区时是从0开始的连续的整数