千家信息网

怎么编写不同MapReudce程序

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章主要讲解了"怎么编写不同MapReudce程序",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么编写不同MapReudce程序"吧!1.Ha
千家信息网最后更新 2025年02月04日怎么编写不同MapReudce程序

这篇文章主要讲解了"怎么编写不同MapReudce程序",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"怎么编写不同MapReudce程序"吧!

1.Hadoop的序列化机制

序列化就是把 内存中的对象的状态信息,转换成 字节序列以便于存储(持久化)和网络传输。而反序列化就是将收到 字节序列或者是硬盘的持久化数据,转换成内存中的对象。

其实在Java规范中,已经有了一套序列化的机制,某个面向对象的类实现Serializable接口就能实现序列化与反序列化,但是记得一定要加上序列化版本ID serialVersionUID .可是为什么Hadoop要自主研发序列化机制呢?它对比原生态的有什么特点和区别呢?

JDK在序列化的时候,算法会考虑这些事情:

  1. 将对象实例相关的类元数据输出。

  2. 递归地输出类的超类描述直到不再有超类。

  3. 类元数据完了以后,开始从最顶层的超类开始输出对象实例的实际数据值.

  4. 从上至下递归输出实例的数据

    优点:从上面来看java的序列化确实很强大,序列化后得到的信息也很详细,所以反序列化就变得特别简单.

所以我们只要implements Serializable接口,JDK会自动处理一切,Java的序列化机制相当复杂,能处理各种对象关系。

缺点:Java的序列化机制计算量开销大,且序列化的结果体积太大,有时能达到对象大小的数倍.引用机制也会导致大文件不能分割.

这些缺点对于Hadoop是非常致命的,因为在Hadoop集群之间需要通讯或者是RPC调用的话,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以Hadoop就自个玩了一套.

Hadoop的序列化的特点是:

1 . 紧凑:由于带宽是集群中信息传递的最宝贵的资源所以我们必须想法设法缩小传递信息的大小,hadoop的序列化就 为了更好地坐到这一点而设计的。

2 . 对象可重用:JDK的反序列化会不断地创建对象,这肯定会造成一定的系统开销,但是在hadoop的反序列化中,能重复的利用一个对象的readField方法来重新产生不同的对象。

3 . 可扩展性:Hadoop的序列化有多中选择

a.可以利用实现hadoop框架中的Writable接口。(原生的)

b.使用开源的序列化框架protocol Buffers,Avro等框架。

PS(网络来源):hadoop2.X之后是实现一个叫YARN,所有应用(如mapreduce,或者其他spark实时或者离线的计算框架都可以运行在YARN上),YARN还负责对资源的调度等等。YARN的序列化就是用Google开发的序列化框架protocol Buffers,目前支持支持三种语言C++,java,Python.所以RPC这一层我们就可以利用其他语言来做文章,满足其他语言开发者的需求。

接下来的话就是如何使用序列化机制,Writable介绍如下.

2.Writable接口及其它的实现类

Hadoop原生的序列化,hadoop原生的序列化类需要实现一个叫Writeable的接口,类似于Serializable接口。

还有hadoop也为我们提供了几个序列化类,他们都直接或者间接地实现了Writable接口。如:IntWritable,LongWritable,Text,org.apache.hadoop.io.WritableComparable等等。

实现Writable接口必须实现两个方法:

public void write(DataOutput out) throws IOException ;public void readFields(DataInput in) throws IOException ;

实现WritableComparable接口必须实现三个方法,翻阅该接口的的源码,都已经给出demo了.篇幅原因,自己去看吧

案例1:数据如下图,统计电话号码相同的的上传下载流量和总流量.电话号码,上传流量,下载流量,总流量.(1,lastest-2,lastest-3)

1363157985066       13726230503     00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com                24      27      2481    24681   2001363157995052      13826544101     5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4                    4       0       264     0       2001363157991076      13926435656     20-10-7A-28-CC-0A:CMCC  120.196.100.99                  2       4       132     1512    2001363154400022      13926251106     5C-0E-8B-8B-B1-50:CMCC  120.197.40.4                    4       0       240     0       2001363157993044      18211575961     94-71-AC-CD-E6-18:CMCC-EASY     120.196.100.99  iface.qiyi.com  视频网站    15      12      1527    2106    2001363157995074      84138413        5C-0E-8B-8C-E8-20:7DaysInn      120.197.40.4    122.72.52.12            20      16      4116    1432    2001363157993055      13560439658     C4-17-FE-BA-DE-D9:CMCC  120.196.100.99                  18      15      1116    954     2001363157995033      15920133257     5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20      20      3156    2936    2001363157983019      13719199419     68-A1-B7-03-07-B1:CMCC-EASY     120.196.100.82                  4       0       240     0       2001363157984041      13660577991     5C-0E-8B-92-5C-20:CMCC-EASY     120.197.40.4    s19.cnzz.com    站点统计    24      9       6960    690     2001363157973098      15013685858     5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com       搜索引擎    28      27      3659    3538    2001363157986029      15989002119     E8-99-C4-4E-93-E0:CMCC-EASY     120.196.100.99  www.umeng.com   站点统计    3       3       1938    180     2001363157992093      13560439658     C4-17-FE-BA-DE-D9:CMCC  120.196.100.99                  15      9       918     4938    2001363157986041      13480253104     5C-0E-8B-C7-FC-80:CMCC-EASY     120.197.40.4                    3       3       180     180     2001363157984040      13602846565     5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash3-http.qq.com 综合门户    15      12      1938    2910    2001363157995093      13922314466     00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn              12      12      3008    3720    2001363157982040      13502468823     5C-0A-5B-6A-0B-D4:CMCC-EASY     120.196.100.99  y0.ifengimg.com 综合门户    57      102     7335    110349  2001363157986072      18320173382     84-25-DB-4F-10-1A:CMCC-EASY     120.196.100.99  input.shouji.sogou.com  搜索引擎    21      18      9531    2412    2001363157990043      13925057413     00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69      63      11058   48243   2001363157988072      13760778710     00-FD-07-A4-7B-08:CMCC  120.196.100.82                  2       2       120     120     2001363157985066      13726238888     00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com                24      27      2481    24681   2001363157993055      13560436666     C4-17-FE-BA-DE-D9:CMCC  120.196.100.99                  18      15      1116    954     200
  1. 定义可序列化的JavaBean.com.codewatching.fluxcount.bean.FlowBean

package com.codewatching.fluxcount.bean;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {        private String phoneNum;        private long upFlow;        private long downFlow;        private long sumFlow;        public FlowBean(){}        public FlowBean(String phoneNum, long upFlow, long downFlow) {                super();                this.phoneNum = phoneNum;                this.upFlow = upFlow;                this.downFlow = downFlow;                this.sumFlow = upFlow+downFlow;        }        public String getPhoneNum() {                return phoneNum;        }        public void setPhoneNum(String phoneNum) {                this.phoneNum = phoneNum;        }        public long getUpFlow() {                return upFlow;        }        public void setUpFlow(long upFlow) {                this.upFlow = upFlow;        }        public long getDownFlow() {                return downFlow;        }        public void setDownFlow(long downFlow) {                this.downFlow = downFlow;        }        public long getSumFlow() {                return sumFlow;        }        public void setSumFlow(long sumFlow) {                this.sumFlow = sumFlow;        }        @Override        public void write(DataOutput out) throws IOException {                out.writeUTF(phoneNum);                out.writeLong(downFlow);                out.writeLong(upFlow);                out.writeLong(sumFlow);        }        @Override        public void readFields(DataInput in) throws IOException {                phoneNum = in.readUTF();                downFlow = in.readLong();                upFlow = in.readLong();                sumFlow = in.readLong();        }        @Override        public String toString() {                return upFlow+"\t"+downFlow+"\t"+sumFlow;        }}

2. 编写Mapper,Reducer,Runner.

package com.codewatching.fluxcount.hadoop;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import com.codewatching.fluxcount.bean.FlowBean;public class FlowSumMapper extends Mapper{        @Override        protected void map(LongWritable key, Text value,Context context)                        throws IOException, InterruptedException {                String line = value.toString();                String[] fileds = line.split("\t");                int length = fileds.length;                String phoneNum = fileds[1];                long upFlow = Long.parseLong(fileds[length-3]);                long downFlow = Long.parseLong(fileds[length-2]);                FlowBean flowBean = new FlowBean(phoneNum, upFlow, downFlow);                //以flowBean为value供reducer处理                context.write(new Text(phoneNum), flowBean);        }}
package com.codewatching.fluxcount.hadoop;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import com.codewatching.fluxcount.bean.FlowBean;public class FlowSumReducer extends Reducer{        @Override        protected void reduce(Text key, Iterable values,Context context)                        throws IOException, InterruptedException {                long _downFlow = 0;                long _upFlow = 0;                for (FlowBean flowBean : values) {                        _downFlow += flowBean.getDownFlow();                        _upFlow += flowBean.getUpFlow();                }                FlowBean bean = new FlowBean(key.toString(), _upFlow, _downFlow);                context.write(key, bean);        }}
package com.codewatching.fluxcount.hadoop;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import com.codewatching.fluxcount.bean.FlowBean;public class FlowSumRunner extends Configured implements Tool{        @Override        public int run(String[] args) throws Exception {                Configuration configuration = new Configuration();                Job job = Job.getInstance(configuration);                configuration.set("mapreduce.job.jar", "fluxcount.jar");                job.setJarByClass(FlowSumRunner.class);                job.setMapperClass(FlowSumMapper.class);                job.setReducerClass(FlowSumReducer.class);                job.setOutputKeyClass(Text.class);                job.setOutputValueClass(FlowBean.class);                FileInputFormat.setInputPaths(job, new Path(args[0]));                FileSystem fileSystem = FileSystem.get(configuration);                Path path = new Path(args[1]);                if(fileSystem.exists(path)){                        fileSystem.delete(path, true);                }                FileOutputFormat.setOutputPath(job, path);                                return job.waitForCompletion(true)?0:1;        }        public static void main(String[] args) throws Exception {                ToolRunner.run(new Configuration(), new FlowSumRunner(), args);        }}

3.Partitioner类的编程

hadoop的map/reduce中支持对key进行分区,从而让map出来的数据均匀分布在reduce上.Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出结果.Mapper的结果,可能送到Combiner(下面回讲到)做合并, Mapper最终处理的键值对,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer。哪个key到哪个Reducer的分配过程,是由Partitioner规定的.说的真麻烦。如果我们去查阅Partitioner类的源码,就知道它是个抽象类,里面有个抽象方法:

/**    * Get the partition number for a given key (hence record) given the total    * number of partitions i.e. number of reduce-tasks for the job.   *      * 

Typically a hash function on a all or a subset of the key.

* * @param key the key to be partioned. * @param value the entry value. * @param numPartitions the total number of partitions. * @return the partition number for the key. */public abstract int getPartition(KEY key, VALUE value, int numPartitions);

而在类的注释也是非常的全面,不得抱怨一句。洋文如果好一点的话,学起来会轻松多了.唉,老大难.

Partitionercontrols the partitioning of the keys of the  intermediate map-outputs.....省略..

案例2:在案例1的基础上,然后将号码进行分区,假设135是北京,139是江西...将各地区的统计出来,并且各地区单独存放文件.效果图如下:

  1. 在案例的基础上,编写一个Partitionner实现类.

package com.codewatching.fluxcount.hadoop;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;import com.codewatching.fluxcount.bean.FlowBean;public class AreaPartitioner extends Partitioner{        private static Map areaMap;        static{                areaMap = new HashMap();                areaMap.put("135", 0);    //模拟分区,存在内存中。                areaMap.put("137", 1);                areaMap.put("138", 2);                areaMap.put("139", 3);        }        @Override        public int getPartition(Text key, FlowBean value, int numPartitions) {                int area = 4;  //默认都是为4                String prefix = key.toString().substring(0,3);                //判断是否在某个分区中                Integer index = areaMap.get(prefix);                if(index!=null){                        area = index;  //如果存在,取相应的数字0,1,2,3                }                return area;        }}

2.在Runner中添加两行代码.

3.在Hadoop中的运行结果.

其实上Hadoop已经提供了一个默认的实现类叫着HashPartitioner.看看它如何key分区的.

将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。

PS:这个简单算法得到的结果可能不均匀,因为key毕竟不会那么线性连续.

4.MapReduce的输出处理类和输入处理类

输入处理类:InputFormat的作用负责MR的输入部分

1、验证作业的输入是否规范。

2、把输入文件切分成InputSplit。

3、提供RecordReader的实现类,把InputSplit读到Mapper中进行处理.

最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越2个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储着2个数据块,因此分片中的部分数据需要通过网络传输到Map任务节点,与使用本地数据运行整个Map任务相比,这种方法显然效率更低。

PS:还可以编写自定义的输入处理类,继承InputFormat,重写相应的方法,当然,首先要知道方法的作用.--建议读源代码.

输出处理类:OutputFormat,在Ruduce处理之后.

编程时,输出输入处理类在哪使用指定:

感谢各位的阅读,以上就是"怎么编写不同MapReudce程序"的内容了,经过本文的学习后,相信大家对怎么编写不同MapReudce程序这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0