二、MapReduce基本编程规范
[TOC]
一、MapReduce编程基本组成
编写MapReduce的程序有至少三个必不可少的部分:mapper,reducer,driver。可选的有 partitioner,combiner
而且mapper的输入输出、reducer的输入输出都是key value型的,所以要求我们在编写mapper和reducer时,必须实现明确这4个键值对中的8种数据类型,而且必须还是hadoop的可序列化类型。同时还需要注意的是,map的输出其实就是reduce的输入,所以包括的数据类型是一样的。
1、map阶段
编写基本流程
1)自定义map类,需要继承 Mapper这个类
2)继承Mapper 的时候,需要指定输入和输出的键值对中的类型
3)必须重写继承自父类的map() 方法
4)上面重写的map() 方法是每个map task对每一个输入到mapper中的键值对都会调用处理一次。
基本编写实例如下:
/*指定Mapper 这4个类型分别为:LongWritable, Text, Text, IntWritable,相当于普通类型:long,string,string,int*/public class TestMapper extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 这里是map方法 处理逻辑 }}
2、reduce阶段
基本编写流程
1)自定义reduce类,需要继承 Reducer这个类
2)继承Reducer的时候,需要指定输入和输出的键值对中的类型
3)必须重写继承自父类的reduce() 方法
4)上面重写的reduce() 方法是每个reduer task对每一个输入到reducer中的键值对都会调用处理一次。
基本编写实例如下:
/*指定Reducer 这4个类型分别为:Text, IntWritable, Text, IntWritable,相当于普通类型:string,int,string,int*/public class TestReducer extends Reducer { protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { 这里是reduce方法 处理逻辑 }}
3、driver阶段
这个部分是用于配置job对象的各种必须配置信息,配置完成后,将job提交给yarn执行
具体配置啥下面直接上例子看好了。主要起到调度map和reduce任务执行的作用
4、partitioner阶段
这个阶段主要是对map阶段的输出进行分区,而map的分区数直接决定reduce task的数量(一般来说是一对一),编写流程如下:
1)自定义分区类,继承 Partitioner
2)继承Partitioner的时候,处理的输入的键值对类型
3)必须重写继承自父类的getPartition() 方法
4)上面重写的getPartition() () 方法是每个maptask对每一个输入的键值对都会调用处理一次。
5)根据分区规则,返回0~n,表示分区格式为0~n
编写案例如下:
public class WordCountPartitioner extends Partitioner { @Override public int getPartition(Text text, IntWritable intWritable, int i) { 判断条件1: return 0; 判断条件2: return 1; ....... return n; }}
5、combiner
combiner不是一个独立的阶段,它其实是包含在map阶段中的。map本身输出的键值对中,每个键值对的value都是1,就算是一样的key,也是独立一个键值对。如果重复的键值对越多,那么将map输出传递到reduce的过程中,就会占用很多带宽资源。优化的方法就是每个map输出时,先在当前map task下进行局部合并汇总,减少重复可以的出现。即
<>king,1> 这种一样的key的,就会合并成 这样就会减少传输的数据量
所以其实由此可以知道,其实combiner的操作和reduce的操作是一样的,只不过一个是局部,一个是全局。简单的做法就是,直接将reducer作为combiner类传入job,如:
job.setCombinerClass(WordCountReducer.class);
我们可以看看这个方法的源码:
public void setCombinerClass(Class extends Reducer> cls) throws IllegalStateException { this.ensureState(Job.JobState.DEFINE); //看到没,那个 Reducer.class this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class); }
可以清楚看到设置combine class时,可以看到多态的类型设置就是 Reducer 类型的,从这里也可以更加确定 combiner 的操作和 reducer的就是一样的。
二、wordcount编程实例
下面开始用wordcount作为例子编写一个完整的MapReduce程序
1、mapper
public class WordCountMapper extends Mapper { //setup 和 clean 方法不是必须的 @Override protected void setup(Context context) throws IOException, InterruptedException { //最先执行 //System.out.println("this is setup"); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { //执行完map之后执行 //System.out.println("this is cleanup"); } //这里创建一个临时对象,用于保存中间值 Text k = new Text(); IntWritable v = new IntWritable(); /** * * * @param key * @param value * @param context 用于连接map和reduce上下文,通过这个对象传递map的结果给reduce * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //System.out.println("开始map====================="); //1.value是读取到的一行字符串,要将其转换为java中的string进行处理,即反序列化 String line = value.toString(); //2.切分数据 String[] words = line.split(" "); //3.输出map结构, <单词,个数>的形式,写入的时候需将普通类型转为序列化类型 /** * 两种写法: * 1) context.write(new Text(word), new IntWritable(1)); * 缺点:每次都会创建两个对象,最后会造成创建了很多临时对象 * * 2)Text k = new Text(); * IntWritable v = new IntWritable(); * * for { * k.set(word); * v.set(1); * context.write(k, v); * } * * 这种方法好处就是,对象只创建了一次,后续只是通过修改对象内部的值的方式传递,无需重复创建多个对象 */ for (String word:words) { //转换普通类型为可序列化类型 k.set(word); v.set(1); //写入到上下文对象中 context.write(k, v); } }}
2、reducer
public class WordCountReducer extends Reducer { /** * 这里的 Iterable values 之所以是一个可迭代的对象, * 是因为从map传递过来的数据经过合并了,如: * (HDFS,1),(HDFS,1)合并成 (HDFS,[1,1]) 这样的形式,所以value可以通过迭代方式获取其中的值 * */ IntWritable counts = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //1.初始化次数 int count = 0; //2.汇总同一个key中的个数 for (IntWritable value: values) { count += value.get(); } //3.输出reduce counts.set(count); context.write(key, counts); }}
3、driver
public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //这里只是方便在ide下直接运行,如果是在命令行下直接输入输入和输出文件路径即可 args = new String[]{"G:\\test2\\", "G:\\testmap6\\"}; //1.获取配置对象 Configuration conf = new Configuration(); //2.获取job对象 Job job = Job.getInstance(conf); //3.分别给job指定driver,map,reducer的类 job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); //4.分别指定map和reduce阶段输出的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //这里可以设置分区类,需要额外编写分区实现类// job.setPartitionerClass(WordCountPartitioner.class);// job.setNumReduceTasks(2); //设置预合并类 //job.setCombinerClass(WordCountReducer.class); //设置inputFormat类,大量小文件优化,不设置默认使用 TextInputFormat job.setInputFormatClass(CombineTextInputFormat.class); CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024); CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024); //5.数据输入来源以及结果的输出位置 // 输入的时候会根据数据源的情况自动map切片,形成切片信息(或者叫切片方案) FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //以上就是将一个job的配置信息配置完成后,下面就提交job,hadoop将跟就job的配置执行job //6.提交job任务,这个方法相当于 job.submit()之后,然后等待执行完成 //任务配置信息是提交至yarn的 MRappmanager job.waitForCompletion(true); }}