MapReduce怎样实现TopK
发表于:2025-02-14 作者:千家信息网编辑
千家信息网最后更新 2025年02月14日,今天就跟大家聊聊有关MapReduce怎样实现TopK,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。需求: HTTP日志文件中全部流量前80%
千家信息网最后更新 2025年02月14日MapReduce怎样实现TopK
今天就跟大家聊聊有关MapReduce怎样实现TopK,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
需求: HTTP日志文件中全部流量前80%的记录, 按流量值降序排序
输出格式
HTTP日志文件:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 2001363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 2001363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 2001363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 2001363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 2001363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 2001363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 2001363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 2001363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 2001363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 2001363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 2001363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 2001363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 2001363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 2001363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash3-http.qq.com 综合门户 15 12 1938 2910 2001363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 2001363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 2001363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 2001363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 2001363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 2001363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 2001363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
定义FlowBean类,该类实现WritableComparable接口
实现write(), readFields(), compareTo()方法
public class FlowBean implements WritableComparable{ private String phoneNB;// 号码 private long up_flow;// 上行流量 private long down_flow;// 下行流量 private long sum_flow;// 总流量 public String getPhoneNB() { return phoneNB; } public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; } public long getUp_flow() { return up_flow; } public void setUp_flow(long up_flow) { this.up_flow = up_flow; } public long getDown_flow() { return down_flow; } public void setDown_flow(long down_flow) { this.down_flow = down_flow; } public long getSum_flow() { return sum_flow; } public void setSum_flow(long sum_flow) { this.sum_flow = sum_flow; } public FlowBean() { } public FlowBean(String phoneNB, long up_flow, long down_flow) { this.phoneNB = phoneNB; this.up_flow = up_flow; this.down_flow = down_flow; this.sum_flow = up_flow + down_flow; } /** * up_flow + "\t" + down_flow + "\t" + sum_flow */ @Override public String toString() { return up_flow + "\t" + down_flow + "\t" + sum_flow; } /** * 序列化, 序列化与反序列化各属性顺序一致 */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNB); out.writeLong(up_flow); out.writeLong(down_flow); out.writeLong(sum_flow); } /** * 反序列化, 反序列化与序列化各属性顺序一致 */ @Override public void readFields(DataInput in) throws IOException { phoneNB = in.readUTF(); up_flow = in.readLong(); down_flow = in.readLong(); sum_flow = in.readLong(); } /** * 按总流量降序排序, 但总流量相等时, 两个FlowBean对象内容并不相等 */ @Override public int compareTo(FlowBean o) { if (sum_flow == o.sum_flow) { return 1; } return -Long.compare(sum_flow, o.sum_flow); }}
定义Mapper类TopKFlowMapper
并重写map方法
public class TopKFlowMapper extends Mapper{ // mapper输出格式: @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] data = StringUtils.split(line, "\t"); String phoneNB = data[1]; long up_flow = Long.parseLong(data[7]); long down_flow = Long.parseLong(data[8]); context.write(new Text(phoneNB), new FlowBean(phoneNB, up_flow, down_flow)); } }
定义Reducer类TopKFlowReducer
并实现reduce(), 重写cleanup()方法
public class TopKFlowReducer extends Reducer{ // 利用TreeMap的排序功能, 将FlowBean对象按总流量降序排序 private Map treeMap = new TreeMap (); private double globalFlow = 0;// 全局流量计数器, 初值值为0 // reducer输入格式: @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long up_sum = 0; long down_sum = 0; for (FlowBean bean : values) { up_sum += bean.getUp_flow(); down_sum += bean.getDown_flow(); } // 每求得一条phoneNB的总流量, 就累加到全局流量计数器globalCount中 globalFlow += (up_sum + down_sum); // 利用TreeMap的排序功能, 将FlowBean对象按总流量降序排序 treeMap.put(new FlowBean("", up_sum, down_sum), key.toString()); } // cleanup方法是在reduce阶段退出前被调用一次 @Override protected void cleanup(Context context) throws IOException, InterruptedException { double itemCount = 0; for (Map.Entry item : treeMap.entrySet()) { if (itemCount > globalFlow * 0.8) { return; } // 只输出全局流量计数器globalCount前80%的记录 context.write(new Text(item.getValue()), new VLongWritable(item.getKey().getSum_flow())); itemCount += item.getKey().getSum_flow(); } } }
测试TopK
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(TopKFlowRunner.class); // 设置job的主类 job.setMapperClass(TopKFlowMapper.class); // 设置Mapper类 job.setReducerClass(TopKFlowReducer.class); // 设置Reducer类 job.setMapOutputKeyClass(Text.class); // 设置map阶段输出Key的类型 job.setMapOutputValueClass(FlowBean.class); // 设置map阶段输出Value的类型 job.setOutputKeyClass(Text.class); // 设置reduce阶段输出Key的类型 job.setOutputValueClass(VLongWritable.class); // 设置reduce阶段输出Value的类型 // 设置job输入路径(从main方法参数args中获取) FileInputFormat.setInputPaths(job, new Path(args[0])); // 设置job输出路径(从main方法参数args中获取) FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); // 提交job }
job输出的结果文件:
13726230503 27162
13726238888 27162
13925057413 11121
18320173382 9549
13502468823 7437
13660577991 6969
13922314466 6728
13560439658 6292
看完上述内容,你们对MapReduce怎样实现TopK有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
输出
序列
总流量
方法
流量
排序
阶段
内容
类型
全局
对象
引擎
搜索引擎
文件
格式
计数器
搜索
一致
功能
参数
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
宁夏护苗网络安全进课堂
葆婴云服务器
如何保护服务器运转
基带软件开发平台叫什么
后厨管理软件开发
什么软件可以网络安全
想学网络安全大赛可以吗
2019高职网络安全竞赛
数据库管理岗位日常工作
分级落实网络安全责任
生化危机怎么搜索服务器
藕飞丝无线网络安全吗
苹果手机的网络安全
新余高性价比服务器哪家可靠
金数据服务器
崇明区网络软件开发协议
珠海农业银行软件开发中心
济南锐盾网络技术有限公司
什么不是传统的ai服务器弊端
服务器电源故障隔离
凯弗克数据库
ar软件开发基础
软件开发联合方式的优点有
银行分行软件开发是干嘛的
白皮书5g网络安全吗
计算机网络技术应用题填表
人民大学金融数据库
域服务器 dns
软件开发成功故事
深夜造访打一网络安全用语