TopKey怎么设置分隔符
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容介绍了"TopKey怎么设置分隔符"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!key和value的默认分隔符为tab键
千家信息网最后更新 2025年01月23日TopKey怎么设置分隔符
本篇内容介绍了"TopKey怎么设置分隔符"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
key和value的默认分隔符为tab键
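设置分隔符:输出文件中key和value之间的分隔符可以在Driver里通过Configuration修改,例如 conf.set("mapreduce.output.textoutputformat.separator", ",")(Hadoop 1.x中对应的属性名为 mapred.textoutputformat.separator)。下面的程序没有修改该属性,因此输入和输出都按默认的tab键分隔;如果上游作业改了分隔符,Mapper里的 split("\t") 也要相应调整。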
程序一
package org.conan.myhadoop.TopKey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only job that reports the record with the largest value in its input.
 *
 * <p>Each input line is expected to look like {@code key<TAB>number}. The job
 * runs with zero reducers, so every map task writes the maximum it saw itself.
 */
public class TopKMapReduce {

    /** Input path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput";

    /** Tracks the largest value seen by this map task and emits it once in cleanup. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Output key: the key of the record holding the current maximum. */
        private final Text mapOutputKey = new Text();

        /** Output value: the current maximum. */
        private final LongWritable mapOutputValue = new LongWritable();

        /** Largest value seen so far; only meaningful once {@code seenRecord} is true. */
        long topkValue = Long.MIN_VALUE;

        /** Whether at least one well-formed record has been processed. */
        private boolean seenRecord = false;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Blank or malformed line: nothing to compare, skip it.
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            // The seenRecord check lets a first record equal to Long.MIN_VALUE still set the key.
            if (!seenRecord || topkValue < tempValue) {
                seenRecord = true;
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (!seenRecord) {
                // No usable input: do not emit a bogus (empty key, Long.MIN_VALUE) record.
                return;
            }
            mapOutputValue.set(topkValue);
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws IllegalArgumentException if fewer than two paths are supplied
     * @throws Exception if job submission or execution fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException("usage: TopKMapReduce <input path> <output path>");
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: mapper output is written straight to the output directory.
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    /**
     * Entry point. Uses the two command-line paths when given, otherwise the
     * built-in default HDFS locations.
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new TopKMapReduce().run(jobArgs);
        System.exit(status);
    }
}
程序二
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only job that reports the top {@link TopKMapper#K} records of its input,
 * implemented with a {@link TreeMap} keyed by the numeric value.
 *
 * <p>Each input line is expected to look like {@code key<TAB>number}.
 */
public class TopKMapReduceV2 {

    /** Input path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput2";

    /** Keeps the K largest values seen by this map task and emits them in cleanup. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Number of top records to keep. */
        public static final int K = 3;

        /** Reused output value (the count). */
        private final LongWritable mapKey = new LongWritable();

        /** Reused output key (the word). */
        private final Text mapValue = new Text();

        /**
         * Count -> word, sorted ascending by count. Plain {@code Long}/{@code String}
         * are stored rather than the reused Writable instances, so that each entry
         * keeps its own value. Because the count is the map key, two words with the
         * same count share one entry and the later word replaces the earlier one.
         */
        private final TreeMap<Long, String> topMap = new TreeMap<Long, String>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Blank or malformed line: skip it.
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            topMap.put(tempValue, strs[0]);

            // Drop the smallest count once more than K are held.
            if (topMap.size() > K) {
                topMap.remove(topMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Entries come out in ascending count order.
            for (Map.Entry<Long, String> entry : topMap.entrySet()) {
                mapValue.set(entry.getValue());
                mapKey.set(entry.getKey());
                context.write(mapValue, mapKey);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws IllegalArgumentException if fewer than two paths are supplied
     * @throws Exception if job submission or execution fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException("usage: TopKMapReduceV2 <input path> <output path>");
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: mapper output is written straight to the output directory.
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    /**
     * Entry point. Uses the two command-line paths when given, otherwise the
     * built-in default HDFS locations.
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new TopKMapReduceV2().run(jobArgs);
        System.exit(status);
    }
}
程序三
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Map-only job that reports the top {@link TopKMapper#K} records of its input,
 * implemented with a {@link TreeSet} of {@code TopKWritable}.
 *
 * <p>Each input line is expected to look like {@code word<TAB>count}.
 */
public class TopKMapReduceV3 {

    /** Input path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput3";

    /** Keeps the K largest records seen by this map task and emits them in cleanup. */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Number of top records to keep. */
        public static final int K = 3;

        /**
         * Candidates sorted ascending by count, then by word. The word tie-break
         * matters: a TreeSet treats elements that compare as 0 as duplicates, so
         * ordering by count alone would silently drop words with equal counts.
         */
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        int byCount = o1.getCount().compareTo(o2.getCount());
                        if (byCount != 0) {
                            return byCount;
                        }
                        return o1.getWord().compareTo(o2.getWord());
                    }
                });

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Blank or malformed line: skip it.
                return;
            }

            long tempValue = Long.parseLong(strs[1]);
            topSet.add(new TopKWritable(strs[0], tempValue));

            // Drop the smallest element once more than K are held.
            if (topSet.size() > K) {
                topSet.pollFirst();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Elements come out in ascending order of (count, word).
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws IllegalArgumentException if fewer than two paths are supplied
     * @throws Exception if job submission or execution fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException("usage: TopKMapReduceV3 <input path> <output path>");
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: mapper output is written straight to the output directory.
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    /**
     * Entry point. Uses the two command-line paths when given, otherwise the
     * built-in default HDFS locations.
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new TopKMapReduceV3().run(jobArgs);
        System.exit(status);
    }
}
程序四 自定义数据类型加比较器
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Top-N job for input spread over several files: mappers forward every
 * {@code word<TAB>count} record, and a single reducer sums the counts per word
 * and keeps the {@link TopKReducer#K} largest totals.
 */
public class TopKMapReduceV4 {

    /** Input path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/wcoutput";

    /** Output path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkoutput4";

    /** Parses each line and forwards it as (word, count). */
    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        /** Reused output key. */
        private final Text outKey = new Text();

        /** Reused output value. */
        private final LongWritable outValue = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                // Blank or malformed line: skip it.
                return;
            }

            outKey.set(strs[0]);
            outValue.set(Long.parseLong(strs[1]));
            context.write(outKey, outValue);
        }

        // The two overrides below add no behaviour; they are kept so the
        // methods stay public on this class, as in the original.
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }
    }

    /** Sums the counts of each word and emits the K words with the largest totals. */
    public static class TopKReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /** Number of top records to keep. */
        public static final int K = 3;

        /**
         * Candidates sorted ascending by count, then by word. The word tie-break
         * matters: a TreeSet treats elements that compare as 0 as duplicates, so
         * ordering by count alone would silently drop words with equal totals.
         */
        TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        int byCount = o1.getCount().compareTo(o2.getCount());
                        if (byCount != 0) {
                            return byCount;
                        }
                        return o1.getWord().compareTo(o2.getWord());
                    }
                });

        // Adds no behaviour; kept so setup stays public on this class.
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }

            // key.toString() takes a copy of the key's text for the set element.
            topSet.add(new TopKWritable(key.toString(), count));

            // Drop the smallest element once more than K are held.
            if (topSet.size() > K) {
                topSet.pollFirst();
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Elements come out in ascending order of (count, word).
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws IllegalArgumentException if fewer than two paths are supplied
     * @throws Exception if job submission or execution fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException("usage: TopKMapReduceV4 <input path> <output path>");
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // One reducer, so a single TreeSet sees every word.
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    /**
     * Entry point. Uses the two command-line paths when given, otherwise the
     * built-in default HDFS locations.
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new TopKMapReduceV4().run(jobArgs);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Writable holding a word and its count.
 *
 * <p>The natural ordering is by word, then by count, and is consistent with
 * {@link #equals(Object)}. {@link #write(DataOutput)} and
 * {@link #compareTo(TopKWritable)} require both fields to be set (non-null).
 */
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    /** No-arg constructor; leaves both fields unset until {@link #set} or {@link #readFields}. */
    public TopKWritable() {
    }

    /**
     * @param word  the word
     * @param count the count associated with the word
     */
    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    /** Replaces both fields. */
    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    /** Serializes the word (modified UTF-8) followed by the count. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    /** Reads the fields back in the order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    /** Orders by word, then by count. */
    @Override
    public int compareTo(TopKWritable o) {
        int cmp = this.word.compareTo(o.getWord());
        if (cmp != 0) {
            return cmp;
        }
        return this.count.compareTo(o.getCount());
    }

    /** Returns {@code word<TAB>count}. */
    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        return Objects.hash(count, word);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopKWritable other = (TopKWritable) obj;
        return Objects.equals(count, other.count) && Objects.equals(word, other.word);
    }
}
程序五:经典案例
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reports the {@link #K} most-played songs.
 *
 * <p>Input format (tab-separated, five fields):
 * language type, song name, favourite count, play count, singer name.
 *
 * <p>Output: one {@code TopKeyWritable} per line
 * (language type, song name, total play count), most-played first.
 */
public class TopKeyMapReduce {

    /** Number of songs to report. */
    public static final int K = 10;

    /** Input path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_INPUT = "hdfs://hadoop-master:9000/data/topkey/input";

    /** Output path used by {@link #main(String[])} when none is given. */
    private static final String DEFAULT_OUTPUT = "hdfs://hadoop-master:9000/data/topkey/output";

    /** Emits ("language<TAB>song", playCount) for every well-formed line. */
    static class TopKeyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length != 5) {
                // Not the expected five-field record: skip it.
                return;
            }

            String languageType = strs[0];
            String singName = strs[1];
            String playTimes = strs[3];
            context.write(
                    new Text(languageType + "\t" + singName),
                    new LongWritable(Long.parseLong(playTimes)));
        }

        // The two overrides below add no behaviour; they are kept so the
        // methods stay public on this class, as in the original.
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }
    }

    /** Sums play counts per song and emits the K songs with the largest totals. */
    public static class TopKeyReducer
            extends Reducer<Text, LongWritable, TopKeyWritable, NullWritable> {

        /**
         * Candidates sorted by play count descending, then by language type and
         * song name. The tie-breaks matter: a TreeSet treats elements that compare
         * as 0 as duplicates, so ordering by play count alone would silently drop
         * songs whose totals are equal.
         */
        TreeSet<TopKeyWritable> topSet = new TreeSet<TopKeyWritable>(
                new Comparator<TopKeyWritable>() {
                    @Override
                    public int compare(TopKeyWritable o1, TopKeyWritable o2) {
                        int cmp = o2.getPlayTimes().compareTo(o1.getPlayTimes());
                        if (cmp != 0) {
                            return cmp;
                        }
                        cmp = o1.getLanguageType().compareTo(o2.getLanguageType());
                        if (cmp != 0) {
                            return cmp;
                        }
                        return o1.getSingName().compareTo(o2.getSingName());
                    }
                });

        // Adds no behaviour; kept so setup stays public on this class.
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            if (null == key) {
                return;
            }

            // Limit 2 keeps a trailing empty song name, which a plain split("\t")
            // would discard and leave only one element.
            String[] splited = key.toString().split("\t", 2);
            if (splited.length < 2) {
                return;
            }
            String languageType = splited[0];
            String singName = splited[1];

            long playTimes = 0L;
            for (LongWritable value : values) {
                playTimes += value.get();
            }

            topSet.add(new TopKeyWritable(languageType, singName, playTimes));

            // The set is sorted most-played first, so the last element is the smallest.
            if (topSet.size() > K) {
                topSet.pollLast();
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            for (TopKeyWritable top : topSet) {
                context.write(top, NullWritable.get());
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws IllegalArgumentException if fewer than two paths are supplied
     * @throws Exception if job submission or execution fails
     */
    public int run(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException("usage: TopKeyMapReduce <input path> <output path>");
        }

        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // One reducer, so a single TreeSet sees every song.
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean completed = job.waitForCompletion(true);
        return completed ? 0 : 1;
    }

    /**
     * Entry point. Uses the two command-line paths when given, otherwise the
     * built-in default HDFS locations.
     */
    public static void main(String[] args) throws Exception {
        String[] jobArgs = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        // Run this class's job (the original mistakenly launched TopKMapReduceV4).
        int status = new TopKeyMapReduce().run(jobArgs);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Writable holding a song's language type, name and play count.
 *
 * <p>The natural ordering is by play count descending, then by language type
 * and song name, and is consistent with {@link #equals(Object)}.
 * {@link #write(DataOutput)} and {@link #compareTo(TopKeyWritable)} require all
 * three fields to be set (non-null).
 */
public class TopKeyWritable implements WritableComparable<TopKeyWritable> {

    String languageType;
    String singName;
    Long playTimes;

    /** No-arg constructor; leaves the fields unset until {@link #set} or {@link #readFields}. */
    public TopKeyWritable() {
    }

    /**
     * @param languageType language category of the song
     * @param singName     song name
     * @param playTimes    number of plays
     */
    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    /** Replaces all three fields. */
    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    /** Reads the fields back in the order {@link #write(DataOutput)} wrote them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    /** Serializes language type and song name (modified UTF-8) followed by the play count. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    /**
     * Orders by play count descending (operands swapped to reverse the order).
     * Ties are broken by language type and then song name, so that only equal
     * objects compare as 0 and sorted sets do not merge distinct songs that
     * have the same play count.
     */
    @Override
    public int compareTo(TopKeyWritable o) {
        int cmp = o.getPlayTimes().compareTo(this.getPlayTimes());
        if (cmp != 0) {
            return cmp;
        }
        cmp = this.languageType.compareTo(o.getLanguageType());
        if (cmp != 0) {
            return cmp;
        }
        return this.singName.compareTo(o.getSingName());
    }

    /** Returns {@code languageType<TAB>singName<TAB>playTimes}. */
    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        return Objects.hash(languageType, playTimes, singName);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopKeyWritable other = (TopKeyWritable) obj;
        return Objects.equals(languageType, other.languageType)
                && Objects.equals(playTimes, other.playTimes)
                && Objects.equals(singName, other.singName);
    }
}
"TopKey怎么设置分隔符"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
程序
分隔符
文件
名称
数据
次数
三名
输出
内容
更多
案例
歌曲
知识
类型
统计
实用
最大
学有所成
接下来
升序
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发许可证怎么办理
哪里有学软件开发工程师的
开学前 网络安全检查
网络安全生物安全总体国家安全观
商城软件开发价位
使用r语言爬数据库
dota国内没有服务器吗
分析管理软件开发商
交警到监测站检查公安网络安全
哪个软件开发工资高
底层软件开发招聘信息
软件开发部门结构的规划
网络技术转让货源充足
网络安全总监吴迪
零元代还软件开发
网络安全模范范文
群晖nas作为服务器
亚马逊服务器虚化
软件开发中poc是什么
it软件开发外包的流程
数据库物理模型设计表格
数据库安全技术角度
上海聚群软件开发有限公司
web服务器和中间件
药品毒性数据库
河南常用软件开发价钱
死亡骑士t11数据库
spoon 数据库查询
网络安全行业竞争态势
cf服务器人数已满2019