千家信息网

MapReduce中怎样实现二次排序

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇文章给大家分享的是有关MapReduce中怎样实现二次排序,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。一、需求分析MR的二次排序的
千家信息网最后更新 2025年01月23日MapReduce中怎样实现二次排序

本篇文章给大家分享的是有关MapReduce中怎样实现二次排序,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

一、需求分析

MR的二次排序的需求说明: 在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。

  原始数据            无二次排序   有二次排序        a 12           a 12           a 12        b 34               b 34               b 13        c 90               b 23           b 23        b 23               b 13           b 34        b 13               c 90           c 90

根据案例分析,我们要将下面数据key按照abc,value按照大小排序,这也就是一个典型的MR的二次排序的案例,准备原始数据:

a 20b 20a 5c 10c 8b 15a 10b 18c 29b 52

我们想要得到的结果:

a       5a       10a       20b       15b       18b       20b       52c       8c       10c       29

二、方案一实现

先看方案一的实现思路:

input -> map -> -> shuffle ->  -> reduce ->                                                                                                                                                                                                                                                                                           ...                                                                                                                                                                                                                                                                                                                                                                               ...

直接在reduce端对分组后的values进行排序 示例代码:

package com.kfk.hadoop.mr.sort;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.List;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */public class SortMR extends Configured implements Tool {    /**     * map     * TODO     */    public static class TemplateMapper extends Mapper{        // 创建map输出的对象        private static final Text mapOutKey = new Text();        private static final IntWritable mapOutValue = new IntWritable();        @Override        public void setup(Context context) throws IOException, InterruptedException {            // TODO        }        @Override        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            // 将每一行数据按空格拆开            String[] values = value.toString().split(" ");            // 数据预处理,将数组超过2的过滤掉            if (values.length != 2){                return;            }            mapOutKey.set(values[0]);            mapOutValue.set(Integer.valueOf(values[1]));            context.write(mapOutKey,mapOutValue);        }        @Override        public void cleanup(Context context) throws IOException, InterruptedException {            // TODO        }    }    /**     * reduce     * TODO     */    public static class TemplateReducer extends Reducer{        // 创建reduceout端的对象        private static final IntWritable outputValue = new IntWritable();        @Override        public void setup(Context context) throws IOException, InterruptedException {            // TODO        }        @Override        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {            List valueList = new ArrayList();            // 取出value            for (IntWritable value:values){                valueList.add(value.get());            }                        // 打印出reduce输入的key和valueList            System.out.println("Reduce in == KeyIn: "+key+"   ValueIn: "+valueList);            // 进行排序            Collections.sort(valueList);                        /*                valueList:表示上面已经排序好的列表,即需要遍历列表中的值作为reduce的输出                key不需要改变,即可作为reduce的输出             */            for (Integer value : valueList){                outputValue.set(value);                context.write(key,outputValue);            }        }        @Override        public void cleanup(Context context) throws IOException, InterruptedException {            // TODO        }    }    /**     * run     * @param args     * @return     * @throws IOException     * @throws ClassNotFoundException     * @throws InterruptedException     */    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        // 1) get conf        Configuration configuration = this.getConf();        // 2) create job        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());        job.setJarByClass(this.getClass());        // 3.1) input,指定job的输入        Path path = new Path(args[0]);        FileInputFormat.addInputPath(job,path);        // 3.2) map,指定job的mapper和输出的类型        job.setMapperClass(TemplateMapper.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        // 1.分区        // job.setPartitionerClass();        // 2.排序        // job.setSortComparatorClass();        // 3.combiner -可选项        // job.setCombinerClass(WordCountCombiner.class);        // 4.compress -可配置        // configuration.set("mapreduce.map.output.compress","true");        // 使用的SnappyCodec压缩算法        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");        // 5.分组        // job.setGroupingComparatorClass();        // 6.设置reduce的数量        // job.setNumReduceTasks(2);        // 3.3) reduce,指定job的reducer和输出类型        job.setReducerClass(TemplateReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        // 3.4) output,指定job的输出        Path outpath = new Path(args[1]);        FileOutputFormat.setOutputPath(job,outpath);        // 4) commit,执行job        boolean isSuccess = job.waitForCompletion(true);        // 如果正常执行返回0,否则返回1        return (isSuccess) ? 0 : 1;    }    public static void main(String[] args) {        // 添加输入,输入参数        args = new String[]{            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"        };//        WordCountUpMR wordCountUpMR = new WordCountUpMR();        Configuration configuration = new Configuration();        try {            // 判断输出的文件存不存在,如果存在就将它删除            Path fileOutPath = new Path(args[1]);            FileSystem fileSystem = FileSystem.get(configuration);            if (fileSystem.exists(fileOutPath)){                fileSystem.delete(fileOutPath,true);            }            // 调用run方法            int status = ToolRunner.run(configuration,new SortMR(),args);            // 退出程序            System.exit(status);        } catch (IOException e) {            e.printStackTrace();        } catch (ClassNotFoundException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        }    }}

运行结果:

a       5a       10a       20b       15b       18b       20b       52c       8c       10c       29

很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。

注意的地方(容易被"坑") 在reduce端对values进行迭代的时候,不要直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。

三、方案二实现

方案二的解决思路:

  原始数据                  自定义newkey 在shuffle中排序  reduce输入                               reduce输出        a 12                   a#12,12    a#12,12        b 34                       b#34,34    b#13,13        c 90 -> map ->        c#90,90    b#23,23       b#,List(13,23,34)-> reduce ->  b,13 b,23 b,34        b 23                       b#23,23    b#34,34          b 13                       b#13,13    c#90,90

我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。

需要自定义的地方   1.自定义数据类型实现组合key     实现方式:继承WritableComparable   2.自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。     实现方式:继承Partitioner   3.自定义分组,保持分组规则任然按照key进行。不打乱原来的分组     实现方式:继承RawComparator

     自定义数据类型代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/15 * @time : 6:16 下午 */public class PairWritable implements WritableComparable {    // 组合key:a#12,12    private String first;    private int second;    public PairWritable() {    }    public PairWritable(String first, int second) {        this.set(first,second);    }    /**     * 方便设置字段     */    public void set(String first, int second){        this.first = first;        this.second = second;    }    public String getFirst() {        return first;    }    public void setFirst(String first) {        this.first = first;    }    public int getSecond() {        return second;    }    public void setSecond(int second) {        this.second = second;    }    /**     * 重写比较器     */    public int compareTo(PairWritable o) {        int comp = this.getFirst().compareTo(o.getFirst());        if (0 == comp){            // 若第一个字段相等,则比较第二个字段            return Integer.valueOf(this.getSecond()).compareTo(o.getSecond());        }        return comp;    }    /**     * 序列化     */    public void write(DataOutput out) throws IOException {        out.writeUTF(first);        out.writeInt(second);    }    /**     * 反序列化     */    public void readFields(DataInput in) throws IOException {        this.first = in.readUTF();        this.second = in.readInt();    }    @Override    public String toString() {        return "PairWritable{" +                "first='" + first + '\'' +                ", second=" + second +                '}';    }    @Override    public boolean equals(Object o) {        if (this == o) return true;        if (o == null || getClass() != o.getClass()) return false;        PairWritable that = (PairWritable) o;        if (second != that.second) return false;        return first != null ? first.equals(that.first) : that.first == null;    }    @Override    public int hashCode() {        int result = first != null ? first.hashCode() : 0;        result = 31 * result + second;        return result;    }}

自定义分区代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Partitioner;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/15 * @time : 7:09 下午 */public class FristPartitioner extends Partitioner {    public int getPartition(PairWritable key, IntWritable intWritable, int numPartitions) {       /*        * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions        * 让key中first字段作为分区依据        */        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;    }}

自定义分组比较器代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.WritableComparator;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/15 * @time : 6:59 下午 */public class FristGrouping implements RawComparator {    /*     * 字节比较     * bytes1,bytes2为要比较的两个字节数组     * i,i1表示第一个字节数组要进行比较的收尾位置,i2,i3表示第二个     * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4     */    public int compare(byte[] bytes1, int i, int i1, byte[] bytes2, int i2, int i3) {        return WritableComparator.compareBytes(bytes1,0,i1-4,bytes2,0,i3-4);    }    /*     * 对象比较     */    public int compare(PairWritable o1, PairWritable o2) {        return o1.getFirst().compareTo(o2.getFirst());    }}

二次排序实现代码:

package com.kfk.hadoop.mr.secondsort;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/9 * @time : 7:07 下午 */public class SecondSortMR extends Configured implements Tool {    /**     * map     * TODO     */    public static class TemplateMapper extends Mapper{        // 创建map输出的对象        private static final PairWritable mapOutKey = new PairWritable();        private static final IntWritable mapOutValue = new IntWritable();        @Override        public void setup(Context context) {            // TODO        }        @Override        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {            // 将每一行数据按空格拆开            String[] values = value.toString().split(" ");            // 数据预处理,将数组超过2的过滤掉            if (values.length != 2){                return;            }            mapOutKey.set(values[0],Integer.parseInt(values[1]));            mapOutValue.set(Integer.parseInt(values[1]));            context.write(mapOutKey,mapOutValue);            System.out.println("Map out == KeyOut: "+mapOutKey+"   ValueOut: "+mapOutValue);        }        @Override        public void cleanup(Context context) {            // TODO        }    }    /**     * reduce     * TODO     */    public static class TemplateReducer extends Reducer{        // 创建reduce output端的对象        private static final IntWritable outputValue = new IntWritable();        private static final Text outputKey = new Text();        @Override        public void setup(Context context) throws IOException, InterruptedException {            // TODO        }        @Override        public void reduce(PairWritable key, Iterable values, Context context) throws IOException, InterruptedException {                        /*                values表示reduce端输入已经排序好的列表,即需要遍历values每一个值作为reduce输出即可                key表示为自定义的key(newkey),即需要取出newkey的第一部分,也就是原始的key,作为reduce的输出             */            for (IntWritable value:values){                outputKey.set(key.getFirst());                context.write(outputKey,value);            }        }        @Override        public void cleanup(Context context) throws IOException, InterruptedException {            // TODO        }    }    /**     * run     * @param args     * @return     * @throws IOException     * @throws ClassNotFoundException     * @throws InterruptedException     */    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        // 1) get conf        Configuration configuration = this.getConf();        // 2) create job        Job job = Job.getInstance(configuration,this.getClass().getSimpleName());        job.setJarByClass(this.getClass());        // 3.1) input,指定job的输入        Path path = new Path(args[0]);        FileInputFormat.addInputPath(job,path);        // 3.2) map,指定job的mapper和输出的类型        job.setMapperClass(TemplateMapper.class);        job.setMapOutputKeyClass(PairWritable.class);        job.setMapOutputValueClass(IntWritable.class);        // 1.分区        job.setPartitionerClass(FristPartitioner.class);        // 2.排序        // job.setSortComparatorClass();        // 3.combiner -可选项        // job.setCombinerClass(WordCountCombiner.class);        // 4.compress -可配置        // configuration.set("mapreduce.map.output.compress","true");        // 使用的SnappyCodec压缩算法        // configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");        // 5.分组        job.setGroupingComparatorClass(FristGrouping.class);        // 6.设置reduce的数量        // job.setNumReduceTasks(2);        // 3.3) reduce,指定job的reducer和输出类型        job.setReducerClass(TemplateReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        // 3.4) output,指定job的输出        Path outpath = new Path(args[1]);        FileOutputFormat.setOutputPath(job,outpath);        // 4) commit,执行job        boolean isSuccess = job.waitForCompletion(true);        // 如果正常执行返回0,否则返回1        return (isSuccess) ? 0 : 1;    }    public static void main(String[] args) {        // 添加输入,输入参数        args = new String[]{            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/datas/secondSort",            "hdfs://bigdata-pro-m01:9000/user/caizhengjie/mr/output"        };//        WordCountUpMR wordCountUpMR = new WordCountUpMR();        Configuration configuration = new Configuration();        try {            // 判断输出的文件存不存在,如果存在就将它删除            Path fileOutPath = new Path(args[1]);            FileSystem fileSystem = FileSystem.get(configuration);            if (fileSystem.exists(fileOutPath)){                fileSystem.delete(fileOutPath,true);            }            // 调用run方法            int status = ToolRunner.run(configuration,new SecondSortMR(),args);            // 退出程序            System.exit(status);        } catch (IOException e) {            e.printStackTrace();        } catch (ClassNotFoundException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        } catch (Exception e) {            e.printStackTrace();        }    }}

运行结果:

a       5a       10a       20b       15b       18b       20b       52c       8c       10c       29

以上就是MapReduce中怎样实现二次排序,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

排序 输出 数据 分组 输入 对象 类型 代码 字段 字节 原始 两个 序列 数组 方案 需求 方式 方法 时候 结果 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 安全模式 网络安全 中国中医药临床案例成果数据库 数据库设计与业务关系 软件开发中的需求文档 网络安全就像空气 idea关联数据库下载驱动失败 山东大学网络安全学院导师 腾讯云香港服务器防护 kpi考核 网络技术公司 如何判断服务器硬盘有多少g 查数据库实际下所有表名 微信数据库可以保存多久聊天记录 南通慕泰网络技术有限公司 刺激战场上最强的服务器 ntp时间服务器软件 帝国cms的数据库时间转换 中山机器人rpa软件开发公司 计算机网络技术职业素养目标 数据库 访问模式 win7有服务器管理器吗 数据库数据用逗号分隔 我国网络安全的手抄报 深圳安卓应用软件开发公司 山东玩购网络技术有限公司 西安软件开发靠谱的公司 网络游戏服务器登录超时怎么办 中国的网络技术到底有多牛 网络技术的内容 高并发更新数据库乐观锁 维护网络安全的具体举措
0