hadoop中mapreduce如何实现串联执行
发表于:2024-12-03 作者:千家信息网编辑
千家信息网最后更新 2024年12月03日,小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!impor
千家信息网最后更新 2024年12月03日hadoop中mapreduce如何实现串联执行
小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
import java.io.IOException;import java.util.Iterator;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PickMain { private static final Log LOG = LogFactory.getLog(PickMain.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {/* * Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); boolean flag1 = job1.waitForCompletion(true); //下面这种方法也可以实现串联执行job if(flag1) { Job job2 = Job.getInstance(conf); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); boolean flag2 = job2.waitForCompletion(true); System.out.println(flag2?0:1); if(flag2) { LOG.info("The job is done!"); System.exit(0); }else { LOG.info("The Second job is wrong!"); System.exit(1); } }else { LOG.info("The firt job is Running Wrong job break!"); System.exit(1); } */ //下面通过使用ContolledJob和JobControl来实现提交多个作业 Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); Configuration conf2 = new Configuration(); Job job2 = Job.getInstance(conf2); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); //创建ControlledJob对job进行包装 ControlledJob cjob1 = new ControlledJob(conf); ControlledJob cjob2 = new ControlledJob(conf2); cjob1.setJob(job1); cjob2.setJob(job2); //设置依赖关系,这个时候只有等到job1执行完成后job2才会执行 cjob2.addDependingJob(cjob1); //JobControl该类相当于一个job控制器,它是一个线程,需要通过线程启动 JobControl jc = new JobControl("my_jobcontrol"); jc.addJob(cjob1); jc.addJob(cjob2); Thread th = new Thread(jc); th.start(); //等到所有的job都执行完成后在退出 while(!jc.allFinished()) { Thread.sleep(5000); } System.exit(0); }}class FindMapper extends Mapper{ Text m1 = new Text(); Text m2 = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tmp1 = line.split(":"); String outval = tmp1[0]; String[] outkeys = tmp1[1].split(","); for(int i = 0 ; i { StringBuilder sb = new StringBuilder(); NullWritable nul = NullWritable.get(); Text outval = new Text(); String spector = ":"; @Override protected void reduce(Text txt, Iterable txtiter, Reducer .Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); sb.append(txt.toString()); Iterator it = txtiter.iterator(); while(it.hasNext()) { sb.append(spector+it.next().toString()); } outval.set(sb.toString()); context.write(outval, nul); } }class SecondFindMapper extends Mapper { Text keyout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String[] fs = value.toString().split(":"); valueout.set(fs[0]); if(fs.length>0) { for(int i = 1;i (int)fs[j].toCharArray()[0]) { keyout.set(fs[j]+"-"+fs[i]); }else { keyout.set(fs[i]+"-"+fs[j]); } context.write(keyout, valueout); } } } } }class SecondFindReducer extends Reducer { StringBuilder sb = new StringBuilder(); Text outvalue = new Text(); @Override protected void reduce(Text key, Iterable iter, Reducer .Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); Iterator it = iter.iterator(); if(it.hasNext()) { sb.append(it.next().toString()); } while(it.hasNext()) { sb.append(","+it.next().toString()); } outvalue.set(sb.toString()); context.write(key, outvalue); } }
以上是"hadoop中mapreduce如何实现串联执行"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
线程
不怎么
只有
多个
大部分
控制器
方法
时候
更多
知识
行业
资讯
资讯频道
频道
作业
包装
参考
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
云服务器遇到黑客
山东北斗授时服务器云主机
网络安全宣传标语24句
数据库技术电子教案
软件开发部英文
网络安全学科包含
长沙管理软件开发服务
oracle备份空数据库
没有网络安全拦截的浏览器
ktv收银服务器插u盘
数据库建表代码
软件开发商有多少
创城承诺书中的网络安全如何提现
数据库概念设计得出的结果
建设网络安全精英
网络安全检测装置有哪些
带宽 下载服务器
北京好的服务器代理
河北软件开发市场前景如何
聊天室 数据库设计
网络安全动态安全模型
高斯数据库系统配置表
数据库信息管理方面参考文献
数据库出现乱码变量怎么配置
软件开发毕业论文算法
内蒙网络安全保密协议
中国知网中国工具书全文数据库
马鞍山网络安全宣传周
星火矿池日本服务器还能用吗
软件开发需求登记表