五、MapReduce普通排序例子--统计手机号流量
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,1、需求统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****2、数据输入及输出格式源数据比较敏感,这里就不展示出来了输入格式为:时间戳、电话
千家信息网最后更新 2025年02月03日五、MapReduce普通排序例子--统计手机号流量
1、需求
统计每一个手机号的总流量(上行流量+下行流量)、上行流量、下行流量,并且最后按照总流量进行手机号的排序。****
2、数据输入及输出格式
源数据比较敏感,这里就不展示出来了
输入格式为:
时间戳、电话号码、基站的物理地址、访问网址的ip、网站域名、数据包、接包数、上行/传流量、下行/载流量、响应码分隔符为"\t"
输出格式为:
手机号码 上行流量 下行流量 总流量并且根据总流量的大小进行排序
3、思路分析
map阶段:
切分字段,以手机号为key,value为一个bean对象,value保存对应手机号的上下行流量、以及总流量;key保存手机号,也就是类似的结构:
<1234567,<上下行流量,总流量>>
reduce阶段:
对于同一个key的(即同一手机号)的上下行流量进行累加,获取总的上下行流量、总流量。
并且最后需要对总流量进行排序,所以reduce输出的key为整个bean,value为空
4、具体程序
FlowBean.java
/*用于保存流量数据的自定义可序列化类*/package PhoneData;import lombok.Getter;import lombok.NoArgsConstructor;import lombok.Setter;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;@Getter@Setter@NoArgsConstructorpublic class FlowBean implements WritableComparable { /** 该类是一个可序列化类,且可比较,所以要实现 WritableComparable接口 * 上传、下载、总流量 */ private int upFlow; private int downFlow; private int sumFlow; public FlowBean(int upFlow, int downFlow) { super(); this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } /** * 序列化方法 * * @param dataOutput * @throws IOException */ @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(this.upFlow); dataOutput.writeInt(this.downFlow); dataOutput.writeInt(this.sumFlow); } /** * 反序列化 * @param dataInput * @throws IOException */ @Override public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readInt(); this.downFlow = dataInput.readInt(); this.sumFlow = dataInput.readInt(); } /** * 打印字符串方法 * @return */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.upFlow); sb.append(" "); sb.append(this.downFlow); sb.append(" "); sb.append(this.sumFlow); return sb.toString(); } /** * 对象的比较方法,用于排序比较 * @param o * @return */ @Override public int compareTo(FlowBean o) { return this.getSumFlow() > o.getSumFlow() ? -1 : 1; }}
mapper:
package PhoneData;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PhoneMapper extends Mapper { Text k = new Text(); FlowBean v = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); //开始解析切割数据 k.set(fields[1]); int downFlow = Integer.parseInt(fields[fields.length - 2]); int upFlow = Integer.parseInt(fields[fields.length - 3]); v.setDownFlow(downFlow); v.setUpFlow(upFlow); v.setSumFlow(upFlow + downFlow); context.write(k, v); }}
reducer:
package PhoneData;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer { FlowBean v = new FlowBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int upFlow = 0; int downFlow = 0; int sumFlow = 0; //对上传、下载、总流量进行累加 for (FlowBean f : values) { upFlow += f.getUpFlow(); downFlow += f.getDownFlow(); sumFlow += f.getSumFlow(); } //将汇总的数据写到新的bean中,然后输出 v.setUpFlow(upFlow); v.setDownFlow(downFlow); v.setSumFlow(sumFlow); context.write(v, key); }}
driver:
package PhoneData;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.BZip2Codec;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class PhoneDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String[]{"G:\\test\\A\\phone_data.txt", "G:\\test\\A\\phonetest5\\"}; Configuration conf = new Configuration(); //获取job对象 Job job = Job.getInstance(conf); //配置driver,map,reduce类 job.setJarByClass(PhoneDriver.class); job.setMapperClass(PhoneMapper.class); job.setReducerClass(PhoneReducer.class); //指定map和reduce的输出类 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(Text.class); //指定输入数据,输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //提交job job.waitForCompletion(true); }}
流量
总流量
手机
数据
手机号
输出
排序
序列
上行
上下
对象
方法
格式
输入
号码
阶段
统计
也就是
分隔符
地址
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库表命名规范td
计算机网络技术适合女孩吗
电子支付软件开发
北京所有互联网科技公司名称
风险数据库应用
软件开发的通俗说法
江苏推荐的软件开发创新服务
pdps数据库可以干什么
服务器安全狗占用cpu较多
杭州创业 his采用数据库
软件开发嵌入数据库
公安部全国网络安全优秀团队
网络安全法制体系
gbase数据库系统当前时间
开展网络安全教育主题班会的摘要
表情包服务器后台管理采集
如何检查服务器的安全配置
昌江软件开发
数据库文件路径什么意思
金山区个性化软件开发规格尺寸
网络安全指数2020
项目经理助理软件开发
四川web前端软件开发靠谱吗
泰州电子网络技术咨询热线
检测服务器硬件好坏的软件
广联达企业数据库建设
b2b订货软件开发
美国工业互联网科技园区
完美服务器异常怎么回事
浙江dns服务器地址