mapreduce 模板代码
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,jai包 org.apache.hadoop hadoop-core 1.2.12.x以后就拆成一些零散的包了,没有core包了代码:package org.conan.myhado
千家信息网最后更新 2025年02月01日mapreduce 模板代码
jai包
org.apache.hadoop hadoop-core 1.2.1
2.x以后就拆成一些零散的包了,没有core包了
代码:
package org.conan.myhadoop.mr;import java.io.IOException;import org.apache.hadoop.conf.Configuration;//org.apache.hadoop.mapred 老系统的包//org.apache.hadoop.mapreduce 新系统的包 import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/* * ModuleMapReduce Class * 单纯的注释 */public class ModuleMapReduce extends Configured implements Tool { /** * * ModuleMapper Class 不仅有注释的功效而且你鼠标放在你注释的方法上面他会把你注释的内容显示出来, * */ public static class ModuleMapper extends Mapper{ @Override public void setup(Context context) throws IOException, InterruptedException { super.setup(context); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO } @Override public void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } /** * * ModuleReducer Class * */ public static class ModuleReducer extends Reducer { @Override public void setup(Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub super.setup(context); } @Override protected void reduce(LongWritable key, Iterable value, Context context) throws IOException, InterruptedException { // TODO } @Override protected void cleanup(Context context) throws IOException, InterruptedException { super.cleanup(context); } } // Driver 驱动 // @Override //实现接口时关键字1.5和1.7的JDK都会报错,只有1.6不报错 public int run(String[] args) throws Exception { Job job = parseInputAndOutput(this, this.getConf(), args); // 2.set job // step 1:set input job.setInputFormatClass(TextInputFormat.class); // step 3:set mappper class job.setMapperClass(ModuleMapper.class); // step 4:set mapout key/value class job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); // step 5:set shuffle(sort,combiner,group) // set sort job.setSortComparatorClass(LongWritable.Comparator.class); // set combiner(optional,default is unset)必须是Reducer的子类 job.setCombinerClass(ModuleReducer.class); // set grouping job.setGroupingComparatorClass(LongWritable.Comparator.class); // step 6 set reducer class job.setReducerClass(ModuleReducer.class); // step 7:set job output key/value class job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // step 8:set output format job.setOutputFormatClass(FileOutputFormat.class); // step 10: submit job Boolean isCompletion = job.waitForCompletion(true);// 提交job return isCompletion ? 0 : 1; } public Job parseInputAndOutput(Tool tool, Configuration conf, String[] args) throws IOException { // 输入参数的合法性 if (args.length != 2) { System.err.printf( "Usage: %s [generic options]
倒排索引代码
输入文件如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
说明:每一行为一条电话通话记录,左边的号码(记为a)打给右边的号码(记为b号码),中间用空格隔开
要求:
将以上文件以如下格式输出:
110 18987655436|15699807656
112 13588888888|011-678987
13509098987 13678987879
说明:左边为被呼叫的号码b,右边为呼叫b的号码a以"|"分割
package org.conan.myhadoop.mr;import java.io.IOException;import java.text.DateFormat;import java.text.SimpleDateFormat;import java.util.Date;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapreduce.*;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class ReverseIndex extends Configured implements Tool { enum Counter { LINESKIP, // 出错的行 } public static class Map extends Mapper { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); // 读取源数据 try { // 数据处理 String[] lineSplit = line.split(" "); String anum = lineSplit[0]; String bnum = lineSplit[1]; context.write(new Text(bnum), new Text(anum)); // 输出 } catch (java.lang.ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); // 出错hang计数器+1 return; } } } public static class Reduce extends Reducer { public void reduce(Text key, Iterablevalues, Context context) throws IOException, InterruptedException { String valueString; String out = ""; for (Text value : values) { valueString = value.toString(); out += valueString + "|"; System.out.println("Ruduce:key=" + key + " value=" + value); } context.write(key, new Text(out)); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); Job job = new Job(conf, "ReverseIndex"); // 任务名 job.setJarByClass(ReverseIndex.class); // 指定Class FileInputFormat.addInputPath(job, new Path(args[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 输出路径 job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码 job.setReducerClass(ReverseIndex.Reduce.class); // 调用上面Reduce类作为Reduce任务代码 job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式 job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式 job.waitForCompletion(true); // 输出任务完成情况 System.out.println("任务名称:" + job.getJobName()); System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("输入行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("输出行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { // 判断参数个数是否正确 // 如果无参数运行则显示以作程序说明 if (args.length != 2) { System.err.println(""); System.err .println("Usage: ReverseIndex < input path > < output path > "); System.err .println("Example: hadoop jar ~/ReverseIndex.jar hdfs://localhost:9000/in/telephone.txt hdfs://localhost:9000/out"); System.exit(-1); } // 记录开始时间 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 运行任务 int res = ToolRunner.run(new Configuration(), new ReverseIndex(), args); // 输出任务耗时 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任务开始:" + formatter.format(start)); System.out.println("任务结束:" + formatter.format(end)); System.out.println("任务耗时:" + String.valueOf(time) + " 分钟"); System.exit(res); } }
去重代码
//Mapper任务 static class DDMap extends Mapper{ private static Text line = new Text(); protected void map(LongWritable k1,Text v1,Context context){ line = v1; Text text = new Text(""); try { context.write(line,text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; } //Reducer任务 static class DDReduce extends Reducer { protected void reduce(Text k2,Iterable v2s,Context context){ Text text = new Text(""); try { context.write(k2, text); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }; }
参考文章;
一个经典的MapReduce模板代码,倒排索引(ReverseIndex)
http://blog.itpub.net/26400547/viewspace-1214945/
详解MapReduce实现数据去重与倒排索引应用场景案例
http://www.tuicool.com/articles/emi6Fb
任务
输出
代码
号码
注释
输入
参数
数据
格式
索引
运行
右边
字符
字符串
文件
程序
系统
路径
呼叫
应用
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器续费
供货系统数据库查询例题
福建工程学院网络安全专业校区
教育信息网络安全研修总结
数据库技术的广泛应用
软件开发工程师用什么电脑比较好
上海质量软件开发出厂价格
软件开发流程 模板
jdbc工厂数据库配对操作
软件开发QA考试
驻马店网络技术参考价格
电脑服务器可以装显卡吗
幻塔初音未来数据库码
软件开发整个流程
java 云服务器debug
你碰到过哪些网络安全问题
度盘网络安全问题
网络安全靶场ppt
关系型数据库系统英文
松原OA软件开发招聘
使用数据库应用系统教学设计
obs推流b站无法连接服务器
网络安全技能表格
高中信息技术数据库管理技术
网络广告机软件开发
数据库和人的关系是什么意思
数据库的序号
饥荒本地服务器无响应怎么办
服务器登陆记录
系统的数据库配置文件