TopKey怎么设置分隔符
发表于:2025-02-24 作者:千家信息网编辑
千家信息网最后更新 2025年02月24日,本篇内容介绍了"TopKey怎么设置分隔符"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!key和value的默认分隔符为tab键。
千家信息网最后更新 2025年02月24日 TopKey怎么设置分隔符
本篇内容介绍了"TopKey怎么设置分隔符"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
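key和value的默认分隔符为tab键(\t)。
设置分隔符:在提交作业之前通过 Configuration 指定输出的分隔符,例如 conf.set("mapreduce.output.textoutputformat.separator", ",")(Hadoop 1.x 中对应的属性名为 mapred.textoutputformat.separator)。下面的各个程序都没有显式设置该属性,因此输出的key和value之间使用默认的tab键分隔。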
程序一
package org.conan.myhadoop.TopKey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only job that reports the record with the largest count.
 *
 * <p>Input lines are expected to look like {@code word<TAB>count}. The job runs with zero
 * reducers, so every map task writes the maximum of the lines it processed itself.
 */
public class TopKMapReduce {

    /** Input path used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput";

    /** Tracks the largest count seen by this map task and emits it once in {@link #cleanup}. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Output key: the word that owns the current maximum.
        private final Text mapOutputKey = new Text();

        // Output value: the current maximum.
        private final LongWritable mapOutputValue = new LongWritable();

        // Largest count seen so far; only meaningful once seenRecord is true.
        long topkValue = Long.MIN_VALUE;

        // Distinguishes "no usable input" from "maximum happens to be Long.MIN_VALUE".
        private boolean seenRecord = false;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Not a "word<TAB>count" line; skip it instead of failing on strs[1].
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            if (!seenRecord || topkValue < tempValue) {
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
                seenRecord = true;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (!seenRecord) {
                // Nothing was read, so there is no maximum to report.
                return;
            }
            mapOutputValue.set(topkValue);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: the mapper output is the final output.
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    /**
     * Entry point. Uses the built-in HDFS paths unless both an input and an output path are
     * supplied on the command line.
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        }
        int status = new TopKMapReduce().run(args);
        System.exit(status);
    }
}
程序二
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only top-N job backed by a {@link TreeMap} keyed on the count.
 *
 * <p>Input lines are expected to look like {@code word<TAB>count}. The job runs with zero
 * reducers, so every map task writes the top {@link TopKMapper#K} entries of its own input.
 */
public class TopKMapReduceV2 {

    /** Input path used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput2";

    /** Keeps the K largest counts in a sorted map and emits them in {@link #cleanup}. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Number of entries to keep (top three). */
        public static final int K = 3;

        // Reused output holders; only filled while writing in cleanup().
        private final Text outKey = new Text();
        private final LongWritable outValue = new LongWritable();

        // count -> word, ascending by count. Plain Long/String entries are stored because
        // the Writable holders above are mutable and reused, so they must not act as keys.
        // Words sharing a count overwrite each other: the map keeps one word per count.
        TreeMap<Long, String> topMap = new TreeMap<Long, String>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Not a "word<TAB>count" line; skip it instead of failing on strs[1].
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            topMap.put(tempValue, strs[0]);

            if (topMap.size() > K) {
                // Drop the smallest count so that only the K largest remain.
                topMap.remove(topMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            Set<Long> keySet = topMap.keySet();
            for (Long count : keySet) {
                outKey.set(topMap.get(count));
                outValue.set(count);
                context.write(outKey, outValue);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: the mapper output is the final output.
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    /**
     * Entry point. Uses the built-in HDFS paths unless both an input and an output path are
     * supplied on the command line.
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        }
        int status = new TopKMapReduceV2().run(args);
        System.exit(status);
    }
}
程序三
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only top-N job backed by a {@link TreeSet} of {@code TopKWritable}.
 *
 * <p>Input lines are expected to look like {@code word<TAB>count}. The job runs with zero
 * reducers, so every map task writes the top {@link TopKMapper#K} entries of its own input.
 */
public class TopKMapReduceV3 {

    /** Input path used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput3";

    /** Keeps the K entries with the largest counts and emits them in {@link #cleanup}. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Number of entries to keep (top three). */
        public static final int K = 3;

        // Ascending by count. The word is the tie-breaker: a TreeSet treats elements that
        // compare as 0 as duplicates, so comparing on count alone would silently discard
        // every additional word that shares a count.
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        int byCount = o1.getCount().compareTo(o2.getCount());
                        if (byCount != 0) {
                            return byCount;
                        }
                        return o1.getWord().compareTo(o2.getWord());
                    }
                });

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Not a "word<TAB>count" line; skip it instead of failing on strs[1].
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            topSet.add(new TopKWritable(strs[0], tempValue));

            if (topSet.size() > K) {
                // Drop the smallest entry so that only the K largest remain.
                topSet.remove(topSet.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: the mapper output is the final output.
        job.setNumReduceTasks(0);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    /**
     * Entry point. Uses the built-in HDFS paths unless both an input and an output path are
     * supplied on the command line.
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        }
        int status = new TopKMapReduceV3().run(args);
        System.exit(status);
    }
}
程序四 自定义数据类型加比较器
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Top-N job for input spread over several files.
 *
 * <p>The mapper forwards every {@code word<TAB>count} line as a {@code (word, count)} pair.
 * A single reducer sums the counts per word and writes the {@link TopKReducer#K} words with
 * the largest totals.
 */
public class TopKMapReduceV4 {

    /** Input path used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput4";

    /** Parses each line and emits {@code (word, count)}. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Not a "word<TAB>count" line; skip it instead of failing on strs[1].
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            context.write(new Text(strs[0]), new LongWritable(tempValue));
        }
    }

    /** Sums the counts of each word and keeps the K largest totals. */
    public static class TopKReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /** Number of entries to keep (top three). */
        public static final int K = 3;

        // Ascending by count. The word is the tie-breaker: a TreeSet treats elements that
        // compare as 0 as duplicates, so comparing on count alone would silently discard
        // every additional word that shares a total.
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        int byCount = o1.getCount().compareTo(o2.getCount());
                        if (byCount != 0) {
                            return byCount;
                        }
                        return o1.getWord().compareTo(o2.getWord());
                    }
                });

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }

            topSet.add(new TopKWritable(key.toString(), count));

            if (topSet.size() > K) {
                // Drop the smallest entry so that only the K largest remain.
                topSet.remove(topSet.first());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // One reducer sees every word, so its top K is the global top K.
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    /**
     * Entry point. Uses the built-in HDFS paths unless both an input and an output path are
     * supplied on the command line.
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        }
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop data type pairing a word with its count.
 *
 * <p>The natural ordering compares the word first and the count second, which matches
 * {@link #equals(Object)}.
 */
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    /** Creates an empty instance whose fields are filled later by {@link #readFields}. */
    public TopKWritable() {
    }

    /**
     * @param word  the word
     * @param count the number associated with the word
     */
    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    /** Replaces both fields. */
    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    /** Writes the word followed by the count. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    /** Reads the fields in the order {@link #write} produced them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    /** Orders by word, then by count. */
    @Override
    public int compareTo(TopKWritable o) {
        int cmp = this.word.compareTo(o.getWord());
        if (0 != cmp) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    /** Returns the word and the count separated by a tab. */
    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((count == null) ? 0 : count.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopKWritable other = (TopKWritable) obj;
        boolean sameCount = (count == null) ? other.count == null : count.equals(other.count);
        boolean sameWord = (word == null) ? other.word == null : word.equals(other.word);
        return sameCount && sameWord;
    }
}
程序五:经典案例
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reports the ten most played songs.
 *
 * <p>Input format (tab-separated): language type, song name, favourite count, play count,
 * singer name.
 *
 * <p>Requirement: output the names and play counts of the ten songs played most often.
 */
public class TopKeyMapReduce {

    /** Number of songs to report. */
    public static final int K = 10;

    /** Input path used when no paths are passed on the command line. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/topkey/input";

    /** Output path used when no paths are passed on the command line. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkey/output";

    /** Emits {@code ("language<TAB>song", playCount)} for every five-field line. */
    static class TopKeyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length != 5) {
                // Lines that do not have exactly five fields are ignored.
                return;
            }

            String languageType = strs[0];
            String singName = strs[1];
            String playTimes = strs[3];
            context.write(
                    new Text(languageType + "\t" + singName),
                    new LongWritable(Long.parseLong(playTimes)));
        }
    }

    /** Sums the play counts per song and keeps the K songs that sort first. */
    public static class TopKeyReducer
            extends Reducer<Text, LongWritable, TopKeyWritable, NullWritable> {

        // Sorted by TopKeyWritable's natural ordering. A TreeSet treats elements whose
        // compareTo returns 0 as duplicates and keeps only one of them.
        TreeSet<TopKeyWritable> topSet = new TreeSet<TopKeyWritable>();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            if (null == key) {
                return;
            }

            String[] splited = key.toString().split("\t");
            if (splited.length < 2) {
                // Both the language type and the song name are read below.
                return;
            }
            String languageType = splited[0];
            String singName = splited[1];

            // Primitive accumulator avoids boxing a new Long on every addition.
            long playTimes = 0L;
            for (LongWritable value : values) {
                playTimes += value.get();
            }

            topSet.add(new TopKeyWritable(languageType, singName, playTimes));

            if (topSet.size() > K) {
                // Drop the element that sorts last so that at most K remain.
                topSet.remove(topSet.last());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (TopKeyWritable top : topSet) {
                context.write(top, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);

        Path inputDir = new Path(args[0]);
        FileInputFormat.addInputPath(job, inputDir);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // One reducer sees every song, so its top K is the global top K.
        job.setNumReduceTasks(1);

        Path outputDir = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outputDir);

        boolean isCompletion = job.waitForCompletion(true);
        return isCompletion ? 0 : 1;
    }

    /**
     * Entry point. Uses the built-in HDFS paths unless both an input and an output path are
     * supplied on the command line.
     */
    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
        }
        // Run this class's own job configuration, not TopKMapReduceV4's.
        int status = new TopKeyMapReduce().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop data type describing one song: language type, song name and play count.
 *
 * <p>The natural ordering puts the largest play count first. Songs with equal play counts
 * are ordered by language type and then by song name, so {@code compareTo} returns 0 only
 * for objects that {@link #equals(Object)} also considers equal. Sorted collections such as
 * {@code TreeSet} therefore keep different songs that share a play count.
 */
public class TopKeyWritable implements WritableComparable<TopKeyWritable> {

    String languageType;
    String singName;
    Long playTimes;

    /** Creates an empty instance whose fields are filled later by {@link #readFields}. */
    public TopKeyWritable() {
    }

    /**
     * @param languageType language category of the song
     * @param singName     song name
     * @param playTimes    number of times the song was played
     */
    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    /** Replaces all three fields. */
    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    /** Reads the fields in the order {@link #write} produced them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    /** Writes language type, song name and play count, in that order. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    /** Orders by play count descending, then by language type and song name ascending. */
    @Override
    public int compareTo(TopKeyWritable o) {
        // Arguments are swapped to get descending order without negating the result.
        int byPlayTimes = o.getPlayTimes().compareTo(this.getPlayTimes());
        if (byPlayTimes != 0) {
            return byPlayTimes;
        }
        int byLanguage = compareNullable(this.languageType, o.languageType);
        if (byLanguage != 0) {
            return byLanguage;
        }
        return compareNullable(this.singName, o.singName);
    }

    /** Compares two strings, sorting {@code null} before any non-null value. */
    private static int compareNullable(String a, String b) {
        if (a == null) {
            return (b == null) ? 0 : -1;
        }
        if (b == null) {
            return 1;
        }
        return a.compareTo(b);
    }

    /** Returns the three fields separated by tabs. */
    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((languageType == null) ? 0 : languageType.hashCode());
        result = prime * result + ((playTimes == null) ? 0 : playTimes.hashCode());
        result = prime * result + ((singName == null) ? 0 : singName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopKeyWritable other = (TopKeyWritable) obj;
        boolean sameLanguage = (languageType == null)
                ? other.languageType == null
                : languageType.equals(other.languageType);
        boolean samePlayTimes = (playTimes == null)
                ? other.playTimes == null
                : playTimes.equals(other.playTimes);
        boolean sameName = (singName == null)
                ? other.singName == null
                : singName.equals(other.singName);
        return sameLanguage && samePlayTimes && sameName;
    }
}
"TopKey怎么设置分隔符"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
程序
分隔符
文件
名称
数据
次数
三名
输出
内容
更多
案例
歌曲
知识
类型
统计
实用
最大
学有所成
接下来
升序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
关于软件开发的毕业设计
国家层面网络安全
铁路网络安全事件分几级
软件开发 取费标准
网络安全保障体系及措施
网上租服务器是怎么租的
sdmaid优化数据库有什么用
疫情下的网络安全问题
服务器的内存有什么用
access数据库字段命名
台服dnf云服务器
基层公安网络安全员
lol手游内测服务器结束时间
攻击联通服务器
计算机网络技术入门基础书籍
女生学习软件开发
万维数据库app下载
西安aqq软件开发公司
蓬莱游戏软件开发哪家好
网页数据库制作文件
服务器主机主板橙色灯常亮
推广软件开发服务哪些行业
软件开发培训学时
四大基础数据库 如何
阿里新数据库
分级保护 软件开发
数据库emp表格
德清智辰网络技术
投影仪服务器异常怎样解决
与网络安全有关的手抄报内容