千家信息网

MapReduce如何实现WordCount及其优化

发表于:2025-01-28 作者:千家信息网编辑
千家信息网最后更新 2025年01月28日,这期内容当中小编将会给大家带来有关MapReduce如何实现WordCount及其优化,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。WordCount: 单词计数,
千家信息网最后更新 2025年01月28日MapReduce如何实现WordCount及其优化

这期内容当中小编将会给大家带来有关MapReduce如何实现WordCount及其优化,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

WordCount: 单词计数, 统计文本文件中每一个单词出现的次数

定义Mapper类, 该类继承org.apache.hadoop.mapreduce.Mapper

并重写map()方法

public static class TokenizerMapper extends                        Mapper {                // 定义一个静态成员变量, 并且是不可变的, 避免每一次调用map()方法时, 创建重复对象                private final static IntWritable one = new IntWritable(1);                // 定义一个成员变量, 可变, 每一次调用map()方法时, 只需要调用Text.set()方法赋新值                private Text word = new Text();                public void map(LongWritable key, Text value, Context context)                                throws IOException, InterruptedException {                        String[] words = value.toString().split(" ");                        for (String item : words) {                                word.set(item);                                context.write(word, one);                        }                }        }

定义Reducer类, 该类继承org.apache.hadoop.mapreduce.Reducer

并重写reduce()方法

public static class IntSumReducer extends                        Reducer {                // 定义一个成员变量, 可变, 每一次调用reduce()方法时, 只需要调用IntWritable.set()方法赋新值                private IntWritable result = new IntWritable();                public void reduce(Text key, Iterable values,                                Context context) throws IOException, InterruptedException {                        int sum = 0;                        for (IntWritableval : values) {                                sum += val.get();                        }                        result.set(sum);                        context.write(key, result);                }        }

测试WordCount

public static void main(String[] args) throws Exception {                Configuration conf = new Configuration();                Job job = Job.getInstance(conf);                job.setJarByClass(WordCount.class); // 设置job的主类                job.setMapperClass(TokenizerMapper.class); // 设置Mapper类                // 利用combiner来减少通过shuffle传输的数据量                job.setCombinerClass(IntSumReducer.class); // 设置Combiner类                job.setReducerClass(IntSumReducer.class); // 设置Reducer类                job.setMapOutputKeyClass(Text.class); // 设置map阶段输出Key的类型                job.setMapOutputValueClass(IntWritable.class); // 设置map阶段输出Value的类型                job.setOutputKeyClass(Text.class); // 设置reduce阶段输出Key的类型                job.setOutputValueClass(IntWritable.class); // 设置reduce阶段输出Value的类型                // 设置job输入路径(从main方法参数args中获取)                FileInputFormat.addInputPath(job, new Path(args[0]));                // 设置job输出路径(从main方法参数args中获取)                FileOutputFormat.setOutputPath(job, new Path(args[1]));                job.waitForCompletion(true); // 提交job        }

输入:

words:

hello tomhello jerryhello kittyhello worldhello tom

输出:

hello        5jerry   1kitty   1tom     2world   1

减少对象的创建, 更少的GC, 肯定会带来更快的速度

利用combiner来减少通过shuffle传输的数据量, 这是MapReduce作业调优的关键点之一

上述就是小编为大家分享的MapReduce如何实现WordCount及其优化了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。

0