千家信息网

二、MapReduce基本编程规范

发表于:2024-11-11 作者:千家信息网编辑
千家信息网最后更新 2024年11月11日,[TOC]一、MapReduce编程基本组成编写MapReduce的程序有至少三个必不可少的部分:mapper,reducer,driver。可选的有 partitioner,combiner而且ma
千家信息网最后更新 2024年11月11日二、MapReduce基本编程规范

[TOC]

一、MapReduce编程基本组成

编写MapReduce的程序有至少三个必不可少的部分:mapper,reducer,driver。可选的有 partitioner,combiner
而且mapper的输入输出、reducer的输入输出都是key value型的,所以要求我们在编写mapper和reducer时,必须实现明确这4个键值对中的8种数据类型,而且必须还是hadoop的可序列化类型。同时还需要注意的是,map的输出其实就是reduce的输入,所以包括的数据类型是一样的。

1、map阶段

编写基本流程
1)自定义map类,需要继承 Mapper这个类
2)继承Mapper 的时候,需要指定输入和输出的键值对中的类型
3)必须重写继承自父类的map() 方法
4)上面重写的map() 方法是每个map task对每一个输入到mapper中的键值对都会调用处理一次。

基本编写实例如下:

/*指定Mapper 这4个类型分别为:LongWritable, Text, Text, IntWritable,相当于普通类型:long,string,string,int*/public class TestMapper extends Mapper {    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        这里是map方法 处理逻辑    }}

2、reduce阶段

基本编写流程
1)自定义reduce类,需要继承 Reducer这个类
2)继承Reducer的时候,需要指定输入和输出的键值对中的类型
3)必须重写继承自父类的reduce() 方法
4)上面重写的reduce() 方法是每个reduer task对每一个输入到reducer中的键值对都会调用处理一次。

基本编写实例如下:

/*指定Reducer 这4个类型分别为:Text, IntWritable, Text, IntWritable,相当于普通类型:string,int,string,int*/public class TestReducer extends Reducer {    protected void reduce(Text key,                          Iterable values,                          Context context) throws IOException, InterruptedException {        这里是reduce方法 处理逻辑    }}

3、driver阶段

这个部分是用于配置job对象的各种必须配置信息,配置完成后,将job提交给yarn执行
具体配置啥下面直接上例子看好了。主要起到调度map和reduce任务执行的作用

4、partitioner阶段

这个阶段主要是对map阶段的输出进行分区,而map的分区数直接决定reduce task的数量(一般来说是一对一),编写流程如下:
1)自定义分区类,继承 Partitioner
2)继承Partitioner的时候,处理的输入的键值对类型
3)必须重写继承自父类的getPartition() 方法
4)上面重写的getPartition() () 方法是每个maptask对每一个输入的键值对都会调用处理一次。
5)根据分区规则,返回0~n,表示分区格式为0~n

编写案例如下:

public class WordCountPartitioner extends Partitioner {    @Override    public int getPartition(Text text, IntWritable intWritable, int i) {        判断条件1:        return 0;        判断条件2:        return 1;        .......        return n;    }}

5、combiner

combiner不是一个独立的阶段,它其实是包含在map阶段中的。map本身输出的键值对中,每个键值对的value都是1,就算是一样的key,也是独立一个键值对。如果重复的键值对越多,那么将map输出传递到reduce的过程中,就会占用很多带宽资源。优化的方法就是每个map输出时,先在当前map task下进行局部合并汇总,减少重复可以的出现。即

 <>king,1>  这种一样的key的,就会合并成 这样就会减少传输的数据量

所以其实由此可以知道,其实combiner的操作和reduce的操作是一样的,只不过一个是局部,一个是全局。简单的做法就是,直接将reducer作为combiner类传入job,如:

job.setCombinerClass(WordCountReducer.class);

我们可以看看这个方法的源码:

public void setCombinerClass(Class cls) throws IllegalStateException {        this.ensureState(Job.JobState.DEFINE);        //看到没,那个  Reducer.class        this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class);    }

可以清楚看到设置combine class时,可以看到多态的类型设置就是 Reducer 类型的,从这里也可以更加确定 combiner 的操作和 reducer的就是一样的。

二、wordcount编程实例

下面开始用wordcount作为例子编写一个完整的MapReduce程序

1、mapper

public class WordCountMapper extends Mapper {    //setup 和 clean 方法不是必须的    @Override    protected void setup(Context context) throws IOException, InterruptedException {        //最先执行        //System.out.println("this is setup");    }    @Override    protected void cleanup(Context context) throws IOException, InterruptedException {        //执行完map之后执行        //System.out.println("this is cleanup");    }    //这里创建一个临时对象,用于保存中间值    Text k = new Text();    IntWritable v = new IntWritable();    /**     *     *     * @param key     * @param value     * @param context  用于连接map和reduce上下文,通过这个对象传递map的结果给reduce     * @throws IOException     * @throws InterruptedException     */    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //System.out.println("开始map=====================");        //1.value是读取到的一行字符串,要将其转换为java中的string进行处理,即反序列化        String line = value.toString();        //2.切分数据        String[] words = line.split(" ");        //3.输出map结构, <单词,个数>的形式,写入的时候需将普通类型转为序列化类型        /**         * 两种写法:         * 1) context.write(new Text(word), new IntWritable(1));         *     缺点:每次都会创建两个对象,最后会造成创建了很多临时对象         *         * 2)Text k = new Text();         *    IntWritable v = new IntWritable();         *         *    for {         *       k.set(word);         *       v.set(1);         *       context.write(k, v);         *    }         *         *    这种方法好处就是,对象只创建了一次,后续只是通过修改对象内部的值的方式传递,无需重复创建多个对象         */        for (String word:words) {            //转换普通类型为可序列化类型            k.set(word);            v.set(1);            //写入到上下文对象中            context.write(k, v);        }    }}

2、reducer

public class WordCountReducer extends Reducer {    /**     * 这里的 Iterable values 之所以是一个可迭代的对象,     * 是因为从map传递过来的数据经过合并了,如:     * (HDFS,1),(HDFS,1)合并成 (HDFS,[1,1]) 这样的形式,所以value可以通过迭代方式获取其中的值     *     */    IntWritable counts = new IntWritable();    @Override    protected void reduce(Text key,                          Iterable values,                          Context context) throws IOException, InterruptedException {        //1.初始化次数        int count = 0;        //2.汇总同一个key中的个数        for (IntWritable value: values) {            count += value.get();        }        //3.输出reduce        counts.set(count);        context.write(key, counts);    }}

3、driver

public class WordCountDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        //这里只是方便在ide下直接运行,如果是在命令行下直接输入输入和输出文件路径即可        args = new String[]{"G:\\test2\\", "G:\\testmap6\\"};        //1.获取配置对象        Configuration conf = new Configuration();        //2.获取job对象        Job job = Job.getInstance(conf);        //3.分别给job指定driver,map,reducer的类        job.setJarByClass(WordCountDriver.class);        job.setMapperClass(WordCountMapper.class);        job.setReducerClass(WordCountReducer.class);        //4.分别指定map和reduce阶段输出的类型        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);         //这里可以设置分区类,需要额外编写分区实现类//        job.setPartitionerClass(WordCountPartitioner.class);//        job.setNumReduceTasks(2);        //设置预合并类        //job.setCombinerClass(WordCountReducer.class);        //设置inputFormat类,大量小文件优化,不设置默认使用 TextInputFormat        job.setInputFormatClass(CombineTextInputFormat.class);        CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024);        CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);        //5.数据输入来源以及结果的输出位置        // 输入的时候会根据数据源的情况自动map切片,形成切片信息(或者叫切片方案)        FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        //以上就是将一个job的配置信息配置完成后,下面就提交job,hadoop将跟就job的配置执行job        //6.提交job任务,这个方法相当于 job.submit()之后,然后等待执行完成        //任务配置信息是提交至yarn的  MRappmanager        job.waitForCompletion(true);    }}
0