千家信息网

如何理解TopK算法及其实现

发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。1、问题描述在大数据规模中,经常遇到一类需要求出
千家信息网最后更新 2024年11月23日如何理解TopK算法及其实现

今天就跟大家聊聊有关如何理解TopK算法及其实现,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

1、问题描述

在大数据规模中,经常遇到一类需要求出现频率最高的K个数,这类问题称为"TOPK"问题!例如:统计歌曲中最热门的前10首歌曲,统计访问流量最高的前5个网站等。

2、例如统计访问流量最高的前5个网站:

数据test.data文件:

数据格式解释:域名 上行流量 下行流量

思路:

1、Mapper每解析一行内容,按照"\t"获取各个字段

2、因为URL有很多重复记录,所以将URL放到key(通过分析MapReduce原理),流量放在value

3、在reduce统计总流量,通过TreeMap进行对数据进行缓存,最后一并输出(值得注意的是要一次性输出必须要用到Reduce类的cleanup方法)

程序如下:

Mapper类:

package com.itheima.hadoop.mapreduce.mapper;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Counter;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLMapper extends Mapper {    /**     * @param key     *            : 每一行偏移量     * @param value     *            : 每一行的内容     * @param context     *            : 环境上下文     */    @Override    public void map(LongWritable key, Text value, Context context)            throws IOException, InterruptedException {        /**         * 该计数器是org.apache.hadoop.mapreduce.Counter         */        Counter counter = context                .getCounter("ExistProblem", "ExistProblemLine"); // 自定义存在问题的行错误计数器        String line = value.toString(); // 读取一行数据        String[] fields = line.split("\t"); // 获取各个字段,按照\t划分        try {            String url = fields[0]; // 获取URL字段            long upFlow = Long.parseLong(fields[1]); // 获取上行流量(upFlow)字段            long downFlow = Long.parseLong(fields[2]); // 获取下行流量(downFlow)字段            FlowBean bean = new FlowBean(upFlow, downFlow); // 将上行流量和下行流量封装到bean中            Text tUrl = new Text(url); // 将java数据类型转换hadoop数据类型            context.write(tUrl, bean); // 传递的数据较多,封装到bean进行传输(tips:bean传输时需要注意序列化问题)        } catch (Exception e) {            e.printStackTrace();            counter.increment(1); // 记录错误行数        }    }}

Reduce类:

package com.itheima.hadoop.mapreduce.reducer;import java.io.IOException;import java.util.Map.Entry;import java.util.TreeMap;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import com.itheima.hadoop.mapreduce.bean.FlowBean;public class TopKURLReducer extends Reducer {    private TreeMap treeMap = new TreeMap();    /**     * @param key     *            : 每一行相同URL     * @param values     *            : 总流量bean     */    @Override    public void reduce(Text key, Iterable values, Context context)            throws IOException, InterruptedException {        long countUpFlow = 0;        long countDownFlow = 0;        /*         * 1、取出每个bean的总流量 2、统计多个bean的总流量 3、缓存到treeMap中         */        for (FlowBean bean : values) {            countUpFlow += bean.getUpFlow(); // 统计上行流量            countDownFlow += bean.getDownFlow(); // 统计下行总流量        }        // 封装统计的流量        FlowBean bean = new FlowBean(countUpFlow, countDownFlow);        treeMap.put(bean, new Text(key)); // 缓存到treeMap中    }    @Override    public void cleanup(Context context) throws IOException,            InterruptedException {        //遍历缓存        for (Entry entry : treeMap.entrySet()) {            context.write(entry.getKey(), entry.getValue());        }        super.cleanup(context); // 不能动原本的销毁操作    }}

FlowBean类:

package com.itheima.hadoop.mapreduce.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable, Comparable {    private long upFlow;    private long downFlow;    private long maxFlow;    @Override    public String toString() {        return upFlow + "\t" + downFlow + "\t" + maxFlow;    }    /**     * 1、序列化注意的问题,序列化需要默认的构造方法(反射) 2、在readFields()和write()方法中,应该遵循按照顺序写出和读入     */    public FlowBean() {    }    public FlowBean(long upFlow, long downFlow) {        this.upFlow = upFlow;        this.downFlow = downFlow;        this.maxFlow = upFlow + downFlow;    }    public long getUpFlow() {        return upFlow;    }    public void setUpFlow(long upFlow) {        this.upFlow = upFlow;    }    public long getDownFlow() {        return downFlow;    }    public void setDownFlow(long downFlow) {        this.downFlow = downFlow;    }    public long getMaxFlow() {        return maxFlow;    }    public void setMaxFlow(long maxFlow) {        this.maxFlow = maxFlow;    }    @Override    public void readFields(DataInput dataIn) throws IOException {        upFlow = dataIn.readLong();        downFlow = dataIn.readLong();        maxFlow = dataIn.readLong();    }    @Override    public void write(DataOutput dataOut) throws IOException {        dataOut.writeLong(upFlow);        dataOut.writeLong(downFlow);        dataOut.writeLong(maxFlow);    }    @Override    public int compareTo(FlowBean o) {        return this.maxFlow > o.maxFlow ? -1                : this.maxFlow < o.maxFlow ? 1 : 0;    }}

驱动类:

package com.itheima.hadoop.drivers;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import com.itheima.hadoop.mapreduce.bean.FlowBean;import com.itheima.hadoop.mapreduce.mapper.TopKURLMapper;import com.itheima.hadoop.mapreduce.reducer.TopKURLReducer;public class TopKURLDriver extends Configured implements Tool{    @Override    public int run(String[] args) throws Exception {                /**         * 1、创建job作业         * 2、设置job提交的Class         * 3、设置MapperClass,设置ReduceClass         * 4、设置Mapper和Reduce各自的OutputKey和OutputValue类型         * 5、设置处理文件的路径,输出结果的路径         * 6、提交job         */        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);                job.setJarByClass(TopKURLRunner.class);                job.setMapperClass(TopKURLMapper.class);        job.setReducerClass(TopKURLReducer.class);                job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(FlowBean.class);        job.setOutputKeyClass(FlowBean.class);        job.setOutputValueClass(Text.class);                FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job,new Path(args[1]));                //参数true为打印进度        return job.waitForCompletion(true)?0:1;    }}
package com.itheima.hadoop.runner;import org.apache.hadoop.util.ToolRunner;import com.itheima.hadoop.runner.TopKURLRunner;public class TopKURLRunner {    public static void main(String[] args) throws Exception {        int res = ToolRunner.run(new TopKURLRunner(), args);        System.exit(res);    }}

运行命令:hadoop jar topkurl.jar com.itheima.hadoop.drives.TopKURLDriver /test/inputData /test/outputData

运行结果:

看完上述内容,你们对如何理解TopK算法及其实现有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0