
An Example Analysis of Hadoop Secondary Sort

Published: 2025-02-16  Author: 千家信息网 editor

This article presents "An Example Analysis of Hadoop Secondary Sort". The content is concise, easy to follow, and clearly organized, and should help resolve any questions you have about the topic. Let's work through it together.

1. Requirement
Find the maximum temperature for each year.

2. Sample Data

1995	10
1996	11
1995	16
1995	22
1996	26
1995	3
1996	7
1996	10
1996	20
1996	33
1995	21
1996	9
1995	31
1995	-13
1995	22
1997	-2
1997	28
1997	15
1995	8


3. Approach and Code
Group the records by year and sort them by temperature in descending order, then deliver all records of the same year to a single reducer group; the first record of each group is then that year's maximum temperature. The key points of implementing this approach are:
a. Define a composite key consisting of the natural key (the year) and the natural value (the temperature).
b. Sort the records by the composite key.
c. When partitioning and grouping on the composite key, consider only the natural key.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key, used here for secondary sort; holds the year and the temperature.
 */
public class IntPair implements WritableComparable<IntPair> {
    private IntWritable first;
    private IntWritable second;

    public IntPair() {
        this.first = new IntWritable();
        this.second = new IntWritable();
        // If the two lines above are commented out, using the class fails with
        // java.lang.NullPointerException at IntPair.readFields
    }

    public IntPair(int first, int second) {
        set(new IntWritable(first), new IntWritable(second));
    }

    public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }

    public IntWritable getFirst() {
        return first;
    }

    public IntWritable getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof IntPair) {
            IntPair ip = (IntPair) obj;
            return first.get() == ip.first.get() && second.get() == ip.second.get();
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    public int compareTo(IntPair o) {
        int cmp = first.compareTo(o.first);
        if (cmp == 0) {
            cmp = second.compareTo(o.second);
        }
        return cmp;
    }
}
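For clarity, here is a minimal, hypothetical sketch (the class name IntPairDemo is assumed and is not part of the original example) showing IntPair's natural ordering: records compare by year ascending, then by temperature ascending. The job below replaces this ordering with KeyComparator so that temperatures within a year sort in descending order instead.

import java.util.Arrays;

public class IntPairDemo {
    public static void main(String[] args) {
        IntPair[] pairs = {
                new IntPair(1996, 11),
                new IntPair(1995, 22),
                new IntPair(1995, -13)
        };
        Arrays.sort(pairs); // uses IntPair.compareTo: year ascending, then temperature ascending
        System.out.println(Arrays.toString(pairs)); // expected order: (1995, -13), (1995, 22), (1996, 11)
    }
}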


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] val = value.toString().split("\\t");
            if (val.length == 2) {
                context.write(new IntPair(Integer.parseInt(val[0]), Integer.parseInt(val[1])), NullWritable.get());
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get()); // write only the first record of each group (the year's maximum)
        }
    }

    // Partition by first (the year) only
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group by first (the year) only
    public static class GroupComparator extends WritableComparator {
        private static final IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator();

        protected GroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                // IntWritable.Comparator reads a fixed 4 bytes from each offset,
                // so only the first field (the year) is compared here.
                return INT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof IntPair && b instanceof IntPair) {
                return ((IntPair) a).getFirst().compareTo(((IntPair) b).getFirst());
            }
            return super.compare(a, b);
        }
    }

    // Sort by the composite key
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof IntPair && b instanceof IntPair) {
                IntPair ip1 = (IntPair) a;
                IntPair ip2 = (IntPair) b;
                int cmp = ip1.getFirst().compareTo(ip2.getFirst()); // ascending (year)
                if (cmp != 0) {
                    return cmp;
                }
                return -ip1.getSecond().compareTo(ip2.getSecond()); // descending (temperature)
            }
            return super.compare(a, b);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Parameter number is wrong, please enter two parameters: ");
            System.exit(-1);
        }
        Path inputPath = new Path(otherArgs[0]);
        Path outputPath = new Path(otherArgs[1]);
        //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000");

        Job job = Job.getInstance(conf, "MaxTemperatureUsingSecondarySort");
        //job.setJar("F:/workspace/AssistRanking2/target/AssistRanking2-1.0-SNAPSHOT.jar");
        job.setJarByClass(MaxTemperatureUsingSecondarySort.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class); // if not set, sorting defaults to the key's compareTo method
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setMapOutputKeyClass(IntPair.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
        System.exit(exitCode);
    }
}
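To illustrate points (b) and (c) above, the following hypothetical snippet (the class name ComparatorDemo is assumed; it must live in the same package as the classes above because the comparator constructors are protected) checks that KeyComparator sorts temperatures in descending order within a year, while GroupComparator treats two keys with the same year as equal:

public class ComparatorDemo {
    public static void main(String[] args) {
        MaxTemperatureUsingSecondarySort.KeyComparator keyCmp =
                new MaxTemperatureUsingSecondarySort.KeyComparator();
        MaxTemperatureUsingSecondarySort.GroupComparator groupCmp =
                new MaxTemperatureUsingSecondarySort.GroupComparator();

        IntPair a = new IntPair(1995, 31);
        IntPair b = new IntPair(1995, 22);

        // Same year, higher temperature first: a negative value means a sorts before b
        System.out.println(keyCmp.compare(a, b));
        // Same year, so the grouping comparator considers the keys equal (prints 0)
        System.out.println(groupCmp.compare(a, b));
    }
}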


4. Run Screenshot
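For reference, running the job on the sample data from section 2 (assuming a single reducer and the default TextOutputFormat) should produce one record per year, containing the year and its maximum temperature:

1995	31
1996	33
1997	28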


That is all the content of "An Example Analysis of Hadoop Secondary Sort". Thank you for reading! Hopefully you now have a good understanding of the topic and the material shared here is helpful. To learn more, feel free to follow the industry news channel!
