千家信息网

Hadoop中MapReduce常用算法有哪些

发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,这篇文章将为大家详细讲解有关Hadoop中MapReduce常用算法有哪些,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。1.排序:1)数据:hadoop fs -m
千家信息网最后更新 2025年02月23日Hadoop中MapReduce常用算法有哪些

这篇文章将为大家详细讲解有关Hadoop中MapReduce常用算法有哪些,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。


1.排序:

1)数据:

hadoop fs -mkdir /import
创建一个或者多个文本,上传
hadoop fs -put test.txt /import/

2)代码:

package com.cuiweiyou.sort;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;//hadoop默认排序: //如果k2、v2类型是Text-文本,结果是按照字典顺序//如果k2、v2类型是LongWritable-数字,结果是按照数字大小顺序public class SortTest {        /**         * 内部类:映射器 Mapper         */        public static class MyMapper extends Mapper {                /**                 * 重写map方法                 */                public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {                        //这里v1转为k2-数字类型,舍弃k1。null为v2                        context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get());  //因为v1可能重复,这时,k2也是可能有重复的                }        }        /**         * 内部类:拆分器 Reducer         */        public static class MyReducer extends Reducer {                /**                 * 重写reduce方法   * 在此方法执行前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]                  */                protected void reduce(LongWritable k2, Iterable v2, Reducerk3, v2[...]舍弃。null => v3                        context.write(k2, NullWritable.get());  //此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3                 }        }        public static void main(String[] args) throws Exception {                // 声明配置信息                Configuration conf = new Configuration();                conf.set("fs.default.name", "hdfs://localhost:9000");                                // 创建作业                Job job = new Job(conf, "SortTest");                job.setJarByClass(SortTest.class);                                // 设置mr                job.setMapperClass(MyMapper.class);                job.setReducerClass(MyReducer.class);                                // 设置输出类型,和Context上下文对象write的参数类型一致                job.setOutputKeyClass(LongWritable.class);                job.setOutputValueClass(NullWritable.class);                                // 设置输入输出路径                FileInputFormat.setInputPaths(job, new Path("/import/"));                FileOutputFormat.setOutputPath(job, new Path("/out"));                                // 执行                System.exit(job.waitForCompletion(true) ? 0 : 1);        }}


3)测试:

可以看到,不仅排序而且去重了。


2.去重:

需求:查取手机号有哪些。这里的思路和上面排序算法的思路是一致的,仅仅多了分割出手机号这一步骤。

1)数据:

创建两个文本,手动输入一些测试内容。每个字段用制表符隔开。日期,电话,地址,方式,数据量。


2)代码:

(1)map和reduce:
/**         * 映射器 Mapper         */        public static class MyMapper extends Mapper {                /**                 * 重写map方法                 */                protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {                        //按照制表符进行分割                        String[] tels = v1.toString().split("\t");                        //k1 => k2-第2列手机号,null => v2                        context.write(new Text(tels[1]), NullWritable.get());                }        }                        /************************************************************         *  在map后,reduce前,有个shuffle过程,会根据k2将对应的v2归并为v2[...]          ***********************************************************/                /**         * 拆分器 Reducer         */        public static class MyReducer extends Reducer {                /**                 * 重写reduce方法                 */                protected void reduce(Text k2, Iterable v2, Context context) throws IOException ,InterruptedException {                        //此时,k3如果发生重复,根据默认算法会发生覆盖,即最终仅保存一个k3,达到去重到效果                        context.write(k2, NullWritable.get());                }        }


(2)配置输出:
// 设置输出类型,和Context上下文对象write的参数类型一致job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);


3)测试:


3.过滤:

需求:查询在北京地区发生的上网记录。思路同上,当写出 k2 、 v2 时加一个判断即可。

1)数据:

同上。

2)代码:

(1)map和reduce:
/**         * 内部类:映射器 Mapper         */        public static class MyMapper extends Mapper {                /**                 * 重写map方法                 */                protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {                        //按照制表符进行分割                        final String[] adds = v1.toString().split("\t");                        //地址在第3列                        //k1 => k2-地址,null => v2                        if(adds[2].equals("beijing")){                                context.write(new Text(v1.toString()), NullWritable.get());                        }                }        }        /**         * 内部类:拆分器 Reducer         */        public static class MyReducer extends Reducer {                /**                 * 重写reduce方法                 */                protected void reduce(Text k2, Iterable v2, Context context) throws IOException ,InterruptedException {                        context.write(k2, NullWritable.get());                }        }


(2)配置输出:
// 设置输出类型,和Context上下文对象write的参数类型一致job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);


3)测试:


4.TopN:

这个算法非常经典,面试必问。实现这个效果的算法也很多。下面是个简单的示例。
需求:找到流量最大值;找出前5个最大值。

1)数据:

同上。

2)代码1-最大值:

(1)map和reduce:
//map        public static class MyMapper extends Mapper {                //首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808                long temp = Long.MIN_VALUE;                                //找出最大值                protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {                        //按照制表符进行分割                        final String[] flows = v1.toString().split("\t");                        //将文本转数值                        final long val = Long.parseLong(flows[4]);                        //如果v1比临时变量大,则保存v1的值                        if(temp {                //临时变量                Long temp = Long.MIN_VALUE;                //因为一个文件得到一个最大值,再次将这些值比对,得到最大的                protected void reduce(LongWritable k2, Iterable v2, Context context) throws IOException ,InterruptedException {                                                long long1 = Long.parseLong(k2.toString());                        //如果k2比临时变量大,则保存k2的值                        if(temp


(2)配置输出:
// 设置输出类型job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(NullWritable.class);


3)测试1:

4)代码2-TopN:

(1)map和reduce:
//map        public static class MyMapper extends Mapper {                //首先创建一个临时变量,保存一个可存储的最小值:Long.MIN_VALUE=-9223372036854775808                long temp = Long.MIN_VALUE;                //Top5存储空间                long[] tops;                                /** 次方法在run中调用,在全部map之前执行一次 **/                protected void setup(Context context) {                        //初始化数组长度为5                        tops = new long[5];                  }                                //找出最大值                protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {                        //按照制表符进行分割                        final String[] flows = v1.toString().split("\t");                        //将文本转数值                        final long val = Long.parseLong(flows[4]);                        //保存在0索引                        tops[0] = val;                        //排序后最大值在最后一个索引,这样从后到前依次减小                        Arrays.sort(tops);                }                                /** ---此方法在全部到map任务结束后执行一次。这时仅输出临时变量到最大值--- **/                protected void cleanup(Context context) throws IOException ,InterruptedException {                        //保存前5条数据                        for( int i = 0; i < tops.length; i++) {                                  context.write(new LongWritable(tops[i]), NullWritable.get());                          }                }        }                //reduce        public static class MyReducer extends Reducer {                //临时变量                Long temp = Long.MIN_VALUE;                //Top5存储空间                long[] tops;                /** 次方法在run中调用,在全部map之前执行一次 **/                protected void setup(Context context) {                        //初始化长度为5                        tops = new long[5];                  }                                //因为每个文件都得到5个值,再次将这些值比对,得到最大的                protected void reduce(LongWritable k2, Iterable v2, Context context) throws IOException ,InterruptedException {                                                long top = Long.parseLong(k2.toString());                        //                        tops[0] = top;                        //                        Arrays.sort(tops);                }                                /** ---此方法在全部到reduce任务结束后执行一次。输出前5个最大值--- **/                protected void cleanup(Context context) throws IOException, InterruptedException {                        //保存前5条数据                        for( int i = 0; i < tops.length; i++) {                                  context.write(new LongWritable(tops[i]), NullWritable.get());                          }                 }        }


(2)配置输出:
// 设置输出类型job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(NullWritable.class);


5)测试2:


5.单表关联:

本例中的单表实际就是一个文本文件。

1)数据:

2)代码:

(1)map和reduce:
//map        public static class MyMapper extends Mapper {                //拆分原始数据                protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {                        //按制表符拆分记录                        String[] splits = v1.toString().split("\t");                        //一条k2v2记录:把孙辈作为k2;祖辈加下划线区分,作为v2                        context.write(new Text(splits[0]), new Text("_"+splits[1]));                        //一条k2v2记录:把祖辈作为k2;孙辈作为v2。就是把原两个单词调换位置保存                        context.write(new Text(splits[1]), new Text(splits[0]));                }                                                /**                                张三              _张三爸爸                                张三爸爸    张三                                                                张三爸爸    _张三爷爷                                张三爷爷    张三爸爸                        **/        }                //reduce        public static class MyReducer extends Reducer {                //拆分k2v2[...]数据                protected void reduce(Text k2, Iterable v2, Context context) throws IOException ,InterruptedException {                        String grandchild = "";        //孙辈                        String grandfather = "";       //祖辈                                                /**                           张三爸爸            [_张三爷爷,张三]                        **/                                                //从迭代中遍历v2[...]                        for (Text man : v2) {                                String p = man.toString();                                //如果单词是以下划线开始的                                if(p.startsWith("_")){                                        //从索引1开始截取字符串,保存到祖辈变量                                        grandfather = p.substring(1);                                }                                //如果单词没有下划线起始                                else{                                        //直接赋值给孙辈变量                                        grandchild = p;                                }                        }                                                //在得到有效数据的情况下                        if( grandchild!="" && grandfather!=""){                                //写出得到的结果。                                context.write(new Text(grandchild), new Text(grandfather));                        }                                                /**                                k3=张三,v3=张三爷爷                        **/                }        }


(2)配置输出:
// 设置输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);


3)测试:


6.双表关联:

本例中仍简单采用两个文本文件。

1)数据:

2)代码:

(1)map和reduce:
//map        public static class MyMapper extends Mapper {                //拆分原始数据                protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException {                        //拆分记录                        String[] splited = v1.toString().split("\t");                        //如果第一列是数字(使用正则判断),就是地址表                        if(splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")){                                String addreId = splited[0];                                String address = splited[1];  //k2,v2-加两条下划线作为前缀标识为地址                                context.write(new Text(addreId), new Text("__"+address));                        }                        //否则就是人员表                        else{                                String personId = splited[1];                                String persName = splited[0];  //k2,v2-加两条横线作为前缀标识为人员                                context.write(new Text(personId), new Text("--"+persName));                        }                        /**                         1 __北京                         1 --张三                        **/                }        }                //reduce        public static class MyReducer extends Reducer {                //拆分k2v2[...]数据                protected void reduce(Text k2, Iterable v2, Context context) throws IOException ,InterruptedException {                        String address = "";   //地址                        String person = "";            //人员                        /**                                1, [__北京,--张三]                        **/                        //迭代的是address或者person                        for (Text text : v2) {                                String tmp = text.toString();                                                                if(tmp.startsWith("__")){                                        //如果是__开头的是address                                        address = tmp.substring(2);   //从索引2开始截取字符串                                }                                if(tmp.startsWith("--")){                                        //如果是--开头的是person                                        person = tmp.substring(2);                                }                        }                        context.write(new Text(person), new Text(address));                }                /**                 k3=张三,v3=北京                **/        
(2)配置输出:
// 设置输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);

3)测试:

关于"Hadoop中MapReduce常用算法有哪些"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

输出 张三 数据 最大 类型 变量 最大值 方法 测试 算法 代码 文本 配置 制表符 地址 制表 文件 此方法 爸爸 排序 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 wwt数据库怎么使用 有关校园网络安全的内容 有关网络安全的小论文题目 王者荣耀登录不上服务器未响应 韶关通信软件开发批发价格 x86 arm 软件开发差异点 河北挑选软件开发规定 上海科世达公司软件开发怎么样 网络安全的概念里信息安全 端游绝地求生服务器登录不上 常用的网络安全管理命令 吉的堡教育软件开发怎么样 图书馆数据库概念设计er 一带一路数据库 费用 数据库安全审计系统建设方案 数据库同时不满足两个属性 学习服务器需要了解什么 石岐定制软件开发 2017最新网络安全 强制卸载服务器 电信安全网络安全信息安全 数据库系统实验报告 什么网络技术裤子 以下数据库角色中权限最高的是 软件开发的流程编码 社区妇联网络安全宣传总结 mt5软件开发对联国际数据 小学开展扫黄打非网络安全课 飞科网络技术有限公司 高安全云服务器高速存储
0