如何理解TopK算法及其实现
发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1、问题描述在大数据规模中,经常遇到一类需要求出
千家信息网最后更新 2024年11月23日如何理解TopK算法及其实现
今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
1、问题描述
在大数据规模中,经常遇到一类需要求出现频率最高的K个数,这类问题称为"TOPK"问题!例如:统计歌曲中最热门的前10首歌曲,统计访问流量最高的前5个网站等。
2、例如统计访问流量最高的前5个网站:
数据test.data文件:
数据格式解释:域名 上行流量 下行流量
思路:
1、Mapper每解析一行内容,按照"\t"获取各个字段
2、因为URL有很多重复记录,所以将URL放到key(通过分析MapReduce原理),流量放在value
3、在reduce统计总流量,通过TreeMap进行对数据进行缓存,最后一并输出(值得注意的是要一次性输出必须要用到Reduce类的cleanup方法)
程序如下:
Mapper类:
package com.itheima.hadoop.mapreduce.mapper;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Counter;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLMapper extends Mapper{ /** * @param key * : 每一行偏移量 * @param value * : 每一行的内容 * @param context * : 环境上下文 */ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 该计数器是org.apache.hadoop.mapreduce.Counter */ Counter counter = context .getCounter("ExistProblem", "ExistProblemLine"); // 自定义存在问题的行错误计数器 String line = value.toString(); // 读取一行数据 String[] fields = line.split("\t"); // 获取各个字段,按照\t划分 try { String url = fields[0]; // 获取URL字段 long upFlow = Long.parseLong(fields[1]); // 获取上行流量(upFlow)字段 long downFlow = Long.parseLong(fields[2]); // 获取下行流量(downFlow)字段 FlowBean bean = new FlowBean(upFlow, downFlow); // 将上行流量和下行流量封装到bean中 Text tUrl = new Text(url); // 将java数据类型转换hadoop数据类型 context.write(tUrl, bean); // 传递的数据较多,封装到bean进行传输(tips:bean传输时需要注意序列化问题) } catch (Exception e) { e.printStackTrace(); counter.increment(1); // 记录错误行数 } }}
Reduce类:
package com.itheima.hadoop.mapreduce.reducer;import java.io.IOException;import java.util.Map.Entry;import java.util.TreeMap;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLReducer extends Reducer{ private TreeMap treeMap = new TreeMap (); /** * @param key * : 每一行相同URL * @param values * : 总流量bean */ @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long countUpFlow = 0; long countDownFlow = 0; /* * 1、取出每个bean的总流量 2、统计多个bean的总流量 3、缓存到treeMap中 */ for (FlowBean bean : values) { countUpFlow += bean.getUpFlow(); // 统计上行流量 countDownFlow += bean.getDownFlow(); // 统计下行总流量 } // 封装统计的流量 FlowBean bean = new FlowBean(countUpFlow, countDownFlow); treeMap.put(bean, new Text(key)); // 缓存到treeMap中 } @Override public void cleanup(Context context) throws IOException, InterruptedException { //遍历缓存 for (Entry entry : treeMap.entrySet()) { context.write(entry.getKey(), entry.getValue()); } super.cleanup(context); // 不能动原本的销毁操作 }}
FlowBean类:
package com.itheima.hadoop.mapreduce.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable, Comparable{ private long upFlow; private long downFlow; private long maxFlow; @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + maxFlow; } /** * 1、序列化注意的问题,序列化需要默认的构造方法(反射) 2、在readFields()和write()方法中,应该遵循按照顺序写出和读入 */ public FlowBean() { } public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.maxFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getMaxFlow() { return maxFlow; } public void setMaxFlow(long maxFlow) { this.maxFlow = maxFlow; } @Override public void readFields(DataInput dataIn) throws IOException { upFlow = dataIn.readLong(); downFlow = dataIn.readLong(); maxFlow = dataIn.readLong(); } @Override public void write(DataOutput dataOut) throws IOException { dataOut.writeLong(upFlow); dataOut.writeLong(downFlow); dataOut.writeLong(maxFlow); } @Override public int compareTo(FlowBean o) { return this.maxFlow > o.maxFlow ? -1 : this.maxFlow < o.maxFlow ? 1 : 0; }}
驱动类:
package com.itheima.hadoop.drivers;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import com.itheima.hadoop.mapreduce.bean.FlowBean;import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;public class TopKURLDriver extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { /** * 1、创建job作业 * 2、设置job提交的Class * 3、设置MapperClass,设置ReduceClass * 4、设置Mapper和Reduce各自的OutputKey和OutputValue类型 * 5、设置处理文件的路径,输出结果的路径 * 6、提交job */ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(TopKURLRunner.class); job.setMapperClass(TopKURLMapper.class); job.setReducerClass(TopKURLReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(FlowBean.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //参数true为打印进度 return job.waitForCompletion(true)?0:1; }}
package com.itheima.hadoop.runner;import org.apache.hadoop.util.ToolRunner;import com.itheima.hadoop.runner.TopKURLRunner;public class TopKURLRunner { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new TopKURLRunner(), args); System.exit(res); }}
运行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData
运行结果:
看完上述内容,你们对如何理解TopK算法及其实现有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
流量
数据
统计
问题
一行
内容
字段
总流量
缓存
上行
最高
序列
方法
类型
封装
输出
算法
文件
歌曲
结果
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全可以写什么内容
如何弄个服务器让数据更加安全
美国宣传教育网络安全
设备风险数据库
seer数据库怎么看手术方式
网络安全等级评估几级强制
查看数据库中所有有数据的表结构
mysql数据库热备份
计算机网络技术的实习内容与要求
达梦数据库查看历史命令
长沙软件开发定制平台
软件开发应用件
游戏服务器和客户端
软件开发的研发风险
mc有哪些什么好的纯生存服务器
dna人种数据库
国家负责网络安全和监督
网络安全全球最好的学校
华为企业网络安全
模拟器卡在检查服务器更新状态
linux版服务器管理工具
npv服务器地址是什么意思
数据库dbms实验心得
银行都用哪种数据库
长沙软件开发定制平台
福建企业软件开发系统
徐汇区市场软件开发销售价格
海南省网络技术应用期末试题
网络安全运营服务包括
战塔英雄不同商店服务器不同吗