
How to Set the Delimiter for TopKey


This article introduces how to set the key/value delimiter for TopKey jobs, a point that trips up many people in practice. The sections below walk through the delimiter setting itself and then a series of MapReduce top-K example programs.

By default, the key and the value are separated by a tab character.

Setting the delimiter
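None of the programs below actually changes the delimiter; they all split input lines on the tab default. If a different separator is needed, it is set on the Configuration before the Job is constructed. A minimal sketch, assuming a comma as the new separator (property names differ between Hadoop releases, so verify them against the version you run):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SeparatorConfig {

    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // TextOutputFormat writes key<TAB>value by default; override the
        // separator before constructing the Job. On Hadoop 2.x+ the property
        // is "mapreduce.output.textoutputformat.separator"; on 1.x releases
        // it is "mapred.textoutputformat.separator".
        conf.set("mapreduce.output.textoutputformat.separator", ",");
        // When input is read with KeyValueTextInputFormat, the input-side
        // separator (also tab by default) is configured the same way:
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        return new Job(conf, "topkey");
    }
}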

Program One: the single maximum value in one file

package org.conan.myhadoop.TopKey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single file, single maximum value
public class TopKMapReduce {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        // output key
        private Text mapOutputKey = new Text();
        // output value
        private LongWritable mapOutputValue = new LongWritable();
        // running maximum, seeded with the smallest possible value
        long topkValue = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            // candidate value from the second column
            long tempValue = Long.valueOf(strs[1]);
            if (topkValue < tempValue) {
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // emit the single maximum seen by this map task
            mapOutputValue.set(topkValue);
            context.write(mapOutputKey, mapOutputValue);
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // job.setReducerClass(ModuleReducer.class);
        // job.setOutputKeyClass(LongWritable.class);
        // job.setOutputValueClass(Text.class);

        // map-only job: no reducer
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput" };
        int status = new TopKMapReduce().run(args);
        System.exit(status);
    }
}
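As a quick sanity check, suppose the wcoutput directory holds tab-separated word/count pairs such as these (hypothetical data):

hadoop  5
hive    12
spark   8

A map task tracks the running maximum in topkValue and emits a single pair, hive/12, from cleanup(). Note that with setNumReduceTasks(0) each map task writes its own maximum, so input large enough to span several splits yields one candidate per split rather than one global maximum; the reducer-based versions below address that.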

Program Two: top N in one file, via TreeMap

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single file, top n, TreeMap implementation
public class TopKMapReduceV2 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // keep the top three

        // TreeMap sorts by key (the count) in ascending order by default,
        // so firstKey() is always the smallest count currently kept.
        // Caveat: two words with the same count share a key, so the later
        // one overwrites the earlier one.
        TreeMap<LongWritable, Text> topMap = new TreeMap<LongWritable, Text>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            String tempKey = strs[0];
            // fresh writables per entry: reusing one mutable instance as a
            // TreeMap key would corrupt the map
            topMap.put(new LongWritable(tempValue), new Text(tempKey));
            if (topMap.size() > K) {
                topMap.remove(topMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            Set<LongWritable> keySet = topMap.keySet();
            for (LongWritable key : keySet) {
                context.write(topMap.get(key), key);
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // job.setReducerClass(ModuleReducer.class);
        // job.setOutputKeyClass(LongWritable.class);
        // job.setOutputValueClass(Text.class);

        // map-only job: no reducer
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput2" };
        int status = new TopKMapReduceV2().run(args);
        System.exit(status);
    }
}

Program Three: top N in one file, via TreeSet

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Single file, top n, TreeSet implementation
public class TopKMapReduceV3 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // keep the top three

        // order by count ascending, so first() is the smallest element kept.
        // Caveat: the comparator looks only at the count, so two words with
        // equal counts compare as duplicates and only one survives.
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            topSet.add(new TopKWritable(strs[0], tempValue));
            if (topSet.size() > K) {
                topSet.remove(topSet.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // job.setReducerClass(ModuleReducer.class);
        // job.setOutputKeyClass(LongWritable.class);
        // job.setOutputValueClass(Text.class);

        // map-only job: no reducer
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput3" };
        int status = new TopKMapReduceV3().run(args);
        System.exit(status);
    }
}

Program Four: a custom data type plus a comparator (multiple files, reducer required)

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Multiple files: a reducer is needed to compute the global top n
public class TopKMapReduceV4 {

    static class TopKMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");
            long tempValue = Long.valueOf(strs[1]);
            context.write(new Text(strs[0]), new LongWritable(tempValue));
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public static class TopKReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {

        public static final int K = 3; // keep the top three

        // order by count ascending, so first() is the smallest element kept
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        return o1.getCount().compareTo(o2.getCount());
                    }
                });

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            topSet.add(new TopKWritable(key.toString(), count));
            if (topSet.size() > K) {
                topSet.remove(topSet.first());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()),
                        new LongWritable(top.getCount()));
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // a single reducer sees every key, so its TreeSet holds the global top K
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                "hdfs://hadoop-master:9000/data/topkoutput4" };
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}
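Two details make this version correct across multiple input files: the mapper forwards every (word, count) pair unfiltered, and setNumReduceTasks(1) routes them all through the single reducer, whose one TreeSet therefore sees the summed count of every key. Raising the reducer count would bring back the earlier problem, producing one top-K list per reducer instead of a global one.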
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Custom data type
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    public TopKWritable() {
    }

    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    @Override
    public int compareTo(TopKWritable o) {
        // natural order: by word first, then by count
        int cmp = this.word.compareTo(o.getWord());
        if (0 != cmp) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((count == null) ? 0 : count.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopKWritable other = (TopKWritable) obj;
        if (count == null) {
            if (other.count != null)
                return false;
        } else if (!count.equals(other.count))
            return false;
        if (word == null) {
            if (other.word != null)
                return false;
        } else if (!word.equals(other.word))
            return false;
        return true;
    }
}

Program Five: a classic case

package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Data format: language type / song name / favorite count / play count / artist
 *
 * Requirement: find the ten most-played songs and their play counts.
 */
public class TopKeyMapReduce {

    public static final int K = 10;

    static class TopKeyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            if (null == lineValue) {
                return;
            }
            String[] strs = lineValue.split("\t");
            // only accept well-formed five-column records
            if (null != strs && strs.length == 5) {
                String languageType = strs[0];
                String singName = strs[1];
                String playTimes = strs[3];
                context.write(
                        new Text(languageType + "\t" + singName),
                        new LongWritable(Long.valueOf(playTimes)));
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }
    }

    public static class TopKeyReducer extends
            Reducer<Text, LongWritable, TopKeyWritable, NullWritable> {

        // TopKeyWritable.compareTo sorts by play count in descending order,
        // so last() is the smallest entry currently kept
        TreeSet<TopKeyWritable> topSet = new TreeSet<TopKeyWritable>();

        @Override
        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            if (null == key) {
                return;
            }
            String[] splited = key.toString().split("\t");
            if (null == splited || splited.length == 0) {
                return;
            }
            String languageType = splited[0];
            String singName = splited[1];

            Long playTimes = 0L;
            for (LongWritable value : values) {
                playTimes += value.get();
            }
            topSet.add(new TopKeyWritable(languageType, singName, playTimes));
            if (topSet.size() > K) {
                topSet.remove(topSet.last());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException,
                InterruptedException {
            for (TopKeyWritable top : topSet) {
                context.write(top, NullWritable.get());
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        args = new String[] { "hdfs://hadoop-master:9000/data/topkey/input",
                "hdfs://hadoop-master:9000/data/topkey/output" };
        int status = new TopKeyMapReduce().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TopKeyWritable implements WritableComparable<TopKeyWritable> {

    String languageType;
    String singName;
    Long playTimes;

    public TopKeyWritable() {
    }

    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    @Override
    public int compareTo(TopKeyWritable o) {
        // negate the comparison to sort by play count in descending order;
        // entries with equal play counts compare as duplicates, so a TreeSet
        // keeps only one of them
        return -(this.getPlayTimes().compareTo(o.getPlayTimes()));
    }

    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result
                + ((languageType == null) ? 0 : languageType.hashCode());
        result = prime * result
                + ((playTimes == null) ? 0 : playTimes.hashCode());
        result = prime * result
                + ((singName == null) ? 0 : singName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        TopKeyWritable other = (TopKeyWritable) obj;
        if (languageType == null) {
            if (other.languageType != null)
                return false;
        } else if (!languageType.equals(other.languageType))
            return false;
        if (playTimes == null) {
            if (other.playTimes != null)
                return false;
        } else if (!playTimes.equals(other.playTimes))
            return false;
        if (singName == null) {
            if (other.singName != null)
                return false;
        } else if (!singName.equals(other.singName))
            return false;
        return true;
    }
}
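To trace the classic case end to end with one hypothetical input line (tab-separated: language type, song name, favorite count, play count, artist):

pop     SongA   300     9800    SingerX

TopKeyMapper emits the key pop/SongA with the value 9800. TopKeyReducer sums the play counts per song, wraps the result in a TopKeyWritable, and keeps at most K = 10 entries: because compareTo() negates the play-count comparison, the TreeSet is ordered largest-first, so remove(topSet.last()) always evicts the current smallest entry.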

"TopKey怎么设置分隔符"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
