千家信息网

MapReduce 初试

发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,一、境遇接触Hadoop已经有半年了,从Hadoop集群搭建到Hive、HBase、Sqoop相关组件的安装,甚至Spark on Hive、Phoenix、Kylin这些边缘的项目都有涉及。如果说部
千家信息网最后更新 2025年02月02日MapReduce 初试

一、境遇


接触Hadoop已经有半年了,从Hadoop集群搭建到Hive、HBase、Sqoop相关组件的安装,甚至Spark on Hive、Phoenix、Kylin这些边缘的项目都有涉及。如果说部署,我自认为可以没有任何问题,但是如果说我对于这个系统已经掌握了,我却不敢这么讲,因为至少MapReduce我还没有熟悉,其工作机制也只是一知半解。关于MapReduce的运算,我差不多理解了,但是实际实现现在却只能靠找到的代码,真的是惭愧的很。

于是再也忍不住,一定要有点自己的东西,最起码,写的时候不用去找别人的博客,嗯,找自己的就行。


二、实验


1、实验过程

最开始实验的是最简单的去重MapReduce,在本地文件实验时没有任何问题,但把文件放到HDFS上就怎么也找不到了,究其原因,HDFS上的需要用Hadoop执行jar文件才可以

1)javac输出类到指定目录 dir

javac *.java -d dir

2)jar打包class文件

1,打包指定class文件到target.jar

jar cvf target.jar x1.class x2.class ... xn.class

2,打包指定路径dir下的所有class文件到target.jar

jar cvf target.jar -C dir .

3,打包class文件成可执行jar,程序入口Main函数

jar cvfe tarrget.jar Main -C dir .

Hadoop只需要普通jar即可,不用打包成可执行jar

3)执行jar,主类MapDuplicate

Hadoop jar target.jar MapDuplicate (params)

2、代码分析

1)import类

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;

Configuration类:用来设定Hadoop的参数,如:IP、端口等

Path:用来设定输入输出路径

IntWritable:MapReduce用到的int类型

Text:MapReduce用到的string类型

Job:生成MapReduce任务的主类,任务参数也在此类中设定

Mapper:被继承的Map类

Reducer:被继承的Reduce类

FileInputFormat:输入文件格式

FileOutputFormat:输出文件格式(可改为其它IO类,如数据库)

GenericOptionsParser:解析命令行参数的类

2)代码结构

public class MapDuplicate {    public static class Map extends Mapper<...> { ... }    public static class Reduce extends Reducer<...> { ... }    public static void main(String[] args) throws Ex { ... }}

2)Map类

        public static class Map extends Mapper {                private static Text line = new Text();                public void map(Object key,Text value,Context context)                throws IOException,InterruptedException {                        line = value;                        context.write(line,new Text(""));                }        }

Map类的主要作用是将数据进行统一处理,按照规则给出键值对,为Combine和Reduce等Reduce操作提供标准化数据。从代码上来讲,均继承Mapper类,并实现map函数

Mapper类继承的四个参数,前两个分别是输入数据键和值的类型,一般写Object,Text即可;后两个是输出数据键和值的类型,这两个类型必须和Reduce的输入数据键值类型一致。

所有的Java值类型在送到MapReduce任务前都要转化成对应的值类型:如:String->Text,int->IntWritable,long->LongWritable

Context是Java类与MapReduce任务交互的类,它把Map的键值对传给Combiner或者Reducer,也把Reducer的结果写到HDFS上

3)Reduce类

        public static class Reduce extends Reducer {                public void reduce(Text key,Iterable values,Context context)                throws IOException,InterruptedException {                        context.write(key,new Text(""));                }        }

Reduce有两种操作,Combine和Reduce,都继承Reducer类。前者用于对数据进行预处理,将处理好的数据交给Reduce,可以看成是本地的Reduce,当不需要任何处理时,Combine可以直接用Reduce代替;后者用于对数据进行正式处理,将相同键值的数据合并,每一个Reduce函数过程只处理同一个键(key)的数据。

Reducer类继承的四个参数,前两个是输入数据键和值的类型,必须与Mapper类的输出类型一致(Combine也必须一致,而且Combine输出需要跟Reduce的输入一致,所以Combine输入输出类型必须是相同的);后两个是输出数据键和值的类型,即我们最终得到的结果

4)Main函数

        public static void main(String[] args) throws Exception {                Configuration conf = new Configuration();                conf.set("mapred.job.tracker","XHadoop1:9010");                String[] ioArgs = new String[] {"duplicate_in","duplicate_out"};                String[] otherArgs = new GenericOptionsParser(conf,ioArgs).getRemainingArgs();                if (otherArgs.length != 2) {                        System.err.println("Usage: MapDuplicate  ");                        System.exit(2);                }                Job job = new Job(conf,"MapDuplicate");                job.setJarByClass(MapDuplicate.class);                job.setMapperClass(Map.class);                job.setCombinerClass(Reduce.class);                job.setReducerClass(Reduce.class);                job.setOutputKeyClass(Text.class);                job.setOutputValueClass(Text.class);                FileInputFormat.addInputPath(job,new Path(otherArgs[0]));                FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));                System.exit(job.waitForCompletion(true) ? 0 : 1);        }

首先,必须有Configuration类,通过这个类指定工作的机器

然后,接收参数的语句,这个不解释了

然后,需要有Job类,指定MapReduce处理用到的类,需要指定的有:Mapper类、Combiner类、Reducer类、输出数据键和值类型的类

然后,指定输入数据的路径

然后,等待任务结束并退出


三、总结


这个实验可以说是最简单的MapReduce,但是麻雀虽小五脏俱全。


从原理来讲,MapReduce有以下步骤:

HDFS(Block)->Split->Mapper->Partion->Spill->Sort->Combiner->Merge->Reducer->HDFS

1、HDFS输入数据被分成Split,被Mapper类读取,

2、Mapper读取数据后,将任务进行Partion(分配)

3、如果Map操作内存溢出,需要Spill(溢写)到磁盘上

4、Mapper进行Sort(排序)操作

5、排序之后进行Combine(合并key)操作,可以理解为本地模式Reduce

6、Combine的同时会进行溢出文件的Merge(合并)

7、所有任务完成后将数据交给Reducer进行处理,处理完成写入HDFS

8、从Map任务开始到Reduce任务开始的数据传输操作叫做Shuffle


从编程来讲,MapReduce有以下步骤:

1、编写Mapper类

2、编写Combiner类(可选)

3、编写Reducer类

4、调用过程:参数配置Configuration

指定任务类

指定输入输出格式

指定数据位置

开始任务

以上仅仅是浅层认识,仅供学习参考及备查。


0