千家信息网

五、MapReduce普通排序例子--统计手机号流量

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,1、需求统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****2、数据输入及输出格式源数据比较敏感,这里就不展示出来了输入格式为:时间戳、电话
千家信息网最后更新 2025年02月03日五、MapReduce普通排序例子--统计手机号流量

1、需求

统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****

2、数据输入及输出格式

源数据比较敏感,这里就不展示出来了

输入格式为:

时间戳、电话号码、基站的物理地址、访问网址的ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码分隔符为"\t"

输出格式为:

手机号码        上行流量        下行流量        总流量并且根据总流量的大小进行排序

3、思路分析

map阶段:
切分字段,以手机号为key,value为一个bean对象,value保存对应手机号的上下行流量、以及总流量;key保存手机号,也就是类似的结构:

<1234567,<上下行流量,总流量>>

reduce阶段:
对于同一个key的(即同一手机号)的上下行流量进行累加,获取总的上下行流量、总流量。
并且最后需要对总流量进行排序,所以reduce输出的key为整个bean,value为空

4、具体程序

FlowBean.java

/*用于保存流量数据的自定义可序列化类*/package PhoneData;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter@Setter@NoArgsConstructorpublic class FlowBean implements WritableComparable {    /**  该类是一个可序列化类,且可比较,所以要实现 WritableComparable接口     * 上传、下载、总流量     */    private int upFlow;    private int downFlow;    private int sumFlow;    public FlowBean(int upFlow, int downFlow) {        super();        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = upFlow + downFlow;    }    /**     * 序列化方法     *     * @param dataOutput     * @throws IOException     */    @Override    public void write(DataOutput dataOutput) throws IOException {        dataOutput.writeInt(this.upFlow);        dataOutput.writeInt(this.downFlow);        dataOutput.writeInt(this.sumFlow);    }    /**     * 反序列化     * @param dataInput     * @throws IOException     */    @Override    public void readFields(DataInput dataInput) throws IOException {        this.upFlow = dataInput.readInt();        this.downFlow = dataInput.readInt();        this.sumFlow = dataInput.readInt();    }    /**     * 打印字符串方法     * @return     */    @Override    public String toString() {        StringBuilder sb = new StringBuilder();        sb.append(this.upFlow);        sb.append(" ");        sb.append(this.downFlow);        sb.append(" ");        sb.append(this.sumFlow);        return sb.toString();    }    /**     * 对象的比较方法,用于排序比较     * @param o     * @return     */    @Override    public int compareTo(FlowBean o) {        return this.getSumFlow() > o.getSumFlow() ? -1 : 1;    }}

mapper:

package PhoneData;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PhoneMapper extends Mapper {    Text k = new Text();    FlowBean v = new FlowBean();    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        String line = value.toString();        String[] fields = line.split("\t");        //开始解析切割数据        k.set(fields[1]);        int downFlow = Integer.parseInt(fields[fields.length - 2]);        int upFlow = Integer.parseInt(fields[fields.length - 3]);        v.setDownFlow(downFlow);        v.setUpFlow(upFlow);        v.setSumFlow(upFlow + downFlow);        context.write(k, v);    }}

reducer:

package PhoneData;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer {    FlowBean v = new FlowBean();    @Override    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {        int upFlow = 0;        int downFlow = 0;        int sumFlow = 0;        //对上传、下载、总流量进行累加        for (FlowBean f : values) {            upFlow += f.getUpFlow();            downFlow += f.getDownFlow();            sumFlow += f.getSumFlow();        }        //将汇总的数据写到新的bean中,然后输出        v.setUpFlow(upFlow);        v.setDownFlow(downFlow);        v.setSumFlow(sumFlow);        context.write(v, key);    }}

driver:

package PhoneData;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.BZip2Codec;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class PhoneDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        args = new String[]{"G:\\test\\A\\phone_data.txt", "G:\\test\\A\\phonetest5\\"};        Configuration conf = new Configuration();        //获取job对象        Job job = Job.getInstance(conf);        //配置driver,map,reduce类        job.setJarByClass(PhoneDriver.class);        job.setMapperClass(PhoneMapper.class);        job.setReducerClass(PhoneReducer.class);        //指定map和reduce的输出类        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(FlowBean.class);        job.setOutputKeyClass(FlowBean.class);        job.setOutputValueClass(Text.class);        //指定输入数据,输出路径        FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        //提交job        job.waitForCompletion(true);    }}
0