MapReduce程序之序列化原理与Writable案例







/** * MapReduce的任意的key和value都必须要实现Writable接口 * MapReduce的任意key必须实现WritableComparable接口,WritableComparable是Writable的增强版 * key还需要实现Comparable的原因在于,对key排序是MapReduce模型中的基本功能 */




/** * A serializable object which implements a simple, efficient, serialization  * protocol, based on {@link DataInput} and {@link DataOutput}. * * 

Any key or value type in the Hadoop Map-Reduce * framework implements this interface.

* *

Implementations typically implement a static read(DataInput) * method which constructs a new instance, calls {@link #readFields(DataInput)} * and returns the instance.

* *



 *     public class MyWritable implements Writable { *       // Some data      *       private int counter; *       private long timestamp; *        *       public void write(DataOutput out) throws IOException { *         out.writeInt(counter); *         out.writeLong(timestamp); *       } *        *       public void readFields(DataInput in) throws IOException { *         counter = in.readInt(); *         timestamp = in.readLong(); *       } *        *       public static MyWritable read(DataInput in) throws IOException { *         MyWritable w = new MyWritable(); *         w.readFields(in); *         return w; *       } *     } * 




/** * A {@link Writable} which is also {@link Comparable}.  * * 

WritableComparables can be compared to each other, typically * via Comparators. Any type which is to be used as a * key in the Hadoop Map-Reduce framework should implement this * interface.

* *

Note that hashCode() is frequently used in Hadoop to partition * keys. It's important that your implementation of hashCode() returns the same * result across different instances of the JVM. Note also that the default * hashCode() implementation in Object does not * satisfy this property.

* *



 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> { *       // Some data *       private int counter; *       private long timestamp; *        *       public void write(DataOutput out) throws IOException { *         out.writeInt(counter); *         out.writeLong(timestamp); *       } *        *       public void readFields(DataInput in) throws IOException { *         counter = in.readInt(); *         timestamp = in.readLong(); *       } *        *       public int compareTo(MyWritableComparable o) { *         int thisValue = this.counter; *         int thatValue = o.counter; *         return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1)); *       } * *       public int hashCode() { *         final int prime = 31; *         int result = 1; *         result = prime * result + counter; *         result = prime * result + (int) (timestamp ^ (timestamp >>> 32)); *         return result; *       } *     } * 










package com.uplooking.bigdata.mr.http;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * MapReduce的任意的key和value都必须要实现Writable接口 * MapReduce的任意key必须实现WritableComparable接口,WritableComparable是Writable的增强版 */public class HttpDataWritable implements Writable {    private long upPackNum;    private long downPackNum;    private long upPayLoad;    private long downPayLoad;    public HttpDataWritable() {    }    public HttpDataWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {        this.upPackNum = upPackNum;        this.downPackNum = downPackNum;        this.upPayLoad = upPayLoad;        this.downPayLoad = downPayLoad;    }    public void write(DataOutput out) throws IOException {        out.writeLong(upPackNum);        out.writeLong(downPackNum);        out.writeLong(upPayLoad);        out.writeLong(downPayLoad);    }    public void readFields(DataInput in) throws IOException {        this.upPackNum = in.readLong();        this.downPackNum = in.readLong();        this.upPayLoad = in.readLong();        this.downPayLoad = in.readLong();    }    public long getUpPackNum() {        return upPackNum;    }    public void setUpPackNum(long upPackNum) {        this.upPackNum = upPackNum;    }    public long getDownPackNum() {        return downPackNum;    }    public void setDownPackNum(long downPackNum) {        this.downPackNum = downPackNum;    }    public long getUpPayLoad() {        return upPayLoad;    }    public void setUpPayLoad(long upPayLoad) {        this.upPayLoad = upPayLoad;    }    public long getDownPayLoad() {        return downPayLoad;    }    public void setDownPayLoad(long downPayLoad) {        this.downPayLoad = downPayLoad;    }    @Override    public String toString() {        return upPackNum + "\t" + downPackNum + "\t" +                upPayLoad + "\t" + downPayLoad;    }}



package com.uplooking.bigdata.mr.http;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.DoubleWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import java.io.IOException;public class HttpDataJob {    public static void main(String[] args) throws Exception {        if (args == null || args.length < 2) {            System.err.println("Parameter Errors! Usages: ");            System.exit(-1);        }        Path inputPath = new Path(args[0]);        Path outputPath = new Path(args[1]);        Configuration conf = new Configuration();        String jobName = HttpDataJob.class.getSimpleName();        Job job = Job.getInstance(conf, jobName);        //设置job运行的jar        job.setJarByClass(HttpDataJob.class);        //设置整个程序的输入        FileInputFormat.setInputPaths(job, inputPath);        job.setInputFormatClass(TextInputFormat.class);//就是设置如何将输入文件解析成一行一行内容的解析类        //设置mapper        job.setMapperClass(HttpDataMapper.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(HttpDataWritable.class);        //设置整个程序的输出        // outputpath.getFileSystem(conf).delete(outputpath, true);//如果当前输出目录存在,删除之,以避免.FileAlreadyExistsException        FileOutputFormat.setOutputPath(job, outputPath);        job.setOutputFormatClass(TextOutputFormat.class);        //设置reducer        job.setReducerClass(HttpDataReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(HttpDataWritable.class);        //指定程序有几个reducer去运行        job.setNumReduceTasks(1);        //提交程序        
job.waitForCompletion(true);    }    public static class HttpDataMapper extends Mapper {        @Override        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {            String line = v1.toString();            String[] items = line.split(",");            // 获取手机号码            String phoneNum = items[1];            // 获取upPackNum、downPackNum、upPayLoad、downPayLoad            long upPackNum = Long.parseLong(items[6]);            long downPackNum = Long.parseLong(items[7]);            long upPayLoad = Long.parseLong(items[8]);            long downPayLoad = Long.parseLong(items[9]);            // 构建HttpDataWritable对象            HttpDataWritable httpData = new HttpDataWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);            // 写出数据到context            context.write(new Text(phoneNum), httpData);        }    }    public static class HttpDataReducer extends Reducer {        @Override        protected void reduce(Text k2, Iterable v2s, Context context) throws IOException, InterruptedException {            long upPackNum = 0L;            long downPackNum = 0L;            long upPayLoad = 0L;            long downPayLoad = 0L;            // 遍历v2s,计算各个参数的总和            for(HttpDataWritable htd : v2s) {                upPackNum += htd.getUpPackNum();                downPackNum += htd.getDownPackNum();                upPayLoad += htd.getUpPayLoad();                downPayLoad += htd.getDownPayLoad();            }            // 构建HttpDataWritable对象            HttpDataWritable httpData = new HttpDataWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);            // 写出数据到context            context.write(k2, httpData);        }    }}


注意,上面的程序是需要读取命令行的参数输入的,可以在本地的环境执行,也可以打包成一个jar包上传到Hadoop环境的Linux服务器上执行,这里,我使用的是本地环境(我的操作系统是Mac OS),输入的参数如下:

/Users/yeyonghao/data/input/HTTP_20160415143750.dat /Users/yeyonghao/data/output/mr/http/h-1


yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/http/h-1$ cat part-r-00000
13480253104 3   3   180 180
13502468823 57  102 7335    110349
13560439658 33  24  2034    5892
13600217502 18  138 1080    186852
13602846565 15  12  1938    2910
13660577991 24  9   6960    690
13719199419 4   0   240 0
13726230503 24  27  2481    24681
13760778710 2   2   120 120
13823070001 6   3   360 180
13826544101 4   0   264 0
13922314466 12  12  3008    3720
13925057413 69  63  11058   48243
13926251106 4   0   240 0
13926435656 2   4   132 1512
15013685858 28  27  3659    3538
15920133257 20  20  3156    2936
15989002119 3   3   1938    180
18211575961 15  12  1527    2106
18320173382 21  18  9531    2412
84138413    20  16  4116    1432
