hadoop中mapreduce如何实现串联执行
发表于:2024-12-03 作者:千家信息网编辑
千家信息网最后更新 2024年12月03日,小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!impor
千家信息网最后更新 2024年12月03日hadoop中mapreduce如何实现串联执行
小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
import java.io.IOException;import java.util.Iterator;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PickMain { private static final Log LOG = LogFactory.getLog(PickMain.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {/* * Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); boolean flag1 = job1.waitForCompletion(true); //下面这种方法也可以实现串联执行job if(flag1) { Job job2 = Job.getInstance(conf); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); boolean flag2 = job2.waitForCompletion(true); System.out.println(flag2?0:1); if(flag2) { LOG.info("The job is done!"); System.exit(0); }else { LOG.info("The Second job is wrong!"); System.exit(1); } }else { LOG.info("The firt job is Running Wrong job break!"); System.exit(1); } */ //下面通过使用ContolledJob和JobControl来实现提交多个作业 Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); Configuration conf2 = new Configuration(); Job job2 = Job.getInstance(conf2); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); //创建ControlledJob对job进行包装 ControlledJob cjob1 = new ControlledJob(conf); ControlledJob cjob2 = new ControlledJob(conf2); cjob1.setJob(job1); cjob2.setJob(job2); //设置依赖关系,这个时候只有等到job1执行完成后job2才会执行 cjob2.addDependingJob(cjob1); //JobControl该类相当于一个job控制器,它是一个线程,需要通过线程启动 JobControl jc = new JobControl("my_jobcontrol"); jc.addJob(cjob1); jc.addJob(cjob2); Thread th = new Thread(jc); th.start(); //等到所有的job都执行完成后在退出 while(!jc.allFinished()) { Thread.sleep(5000); } System.exit(0); }}class FindMapper extends Mapper{ Text m1 = new Text(); Text m2 = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tmp1 = line.split(":"); String outval = tmp1[0]; String[] outkeys = tmp1[1].split(","); for(int i = 0 ; i { StringBuilder sb = new StringBuilder(); NullWritable nul = NullWritable.get(); Text outval = new Text(); String spector = ":"; @Override protected void reduce(Text txt, Iterable txtiter, Reducer .Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); sb.append(txt.toString()); Iterator it = txtiter.iterator(); while(it.hasNext()) { sb.append(spector+it.next().toString()); } outval.set(sb.toString()); context.write(outval, nul); } }class SecondFindMapper extends Mapper { Text keyout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String[] fs = value.toString().split(":"); valueout.set(fs[0]); if(fs.length>0) { for(int i = 1;i (int)fs[j].toCharArray()[0]) { keyout.set(fs[j]+"-"+fs[i]); }else { keyout.set(fs[i]+"-"+fs[j]); } context.write(keyout, valueout); } } } } }class SecondFindReducer extends Reducer { StringBuilder sb = new StringBuilder(); Text outvalue = new Text(); @Override protected void reduce(Text key, Iterable iter, Reducer .Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); Iterator it = iter.iterator(); if(it.hasNext()) { sb.append(it.next().toString()); } while(it.hasNext()) { sb.append(","+it.next().toString()); } outvalue.set(sb.toString()); context.write(key, outvalue); } }
以上是"hadoop中mapreduce如何实现串联执行"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
线程
不怎么
只有
多个
大部分
控制器
方法
时候
更多
知识
行业
资讯
资讯频道
频道
作业
包装
参考
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发mr 什么意思
不会切换回先前连接到的数据库
后面网络安全课观后感
网络安全最新热点
石家庄软件开发前端实习生招聘
哪些数据库是国产的
普陀区综合软件开发协议
郑州中奥网络技术有限公司
ros 数据库
一个人玩云游戏服务器怎么配
c# 获取sql数据库表名
c++是用哪个软件开发
可检索学位论文全文的数据库中文
对于软件开发的认识
怎样设计网络安全图标
如何更改电脑服务器配置
光伏软件开发项目书
前端 获取页面一组数据库
汕头通信软件开发批发价
同业业务 软件开发
德州拓维软件开发
那个云服务器比较便宜
java软件开发原则
机关网络安全执法检查自查表
建立cad数据库
数据库企业是否为国有
辽宁数据库空投箱生产厂家
网络技术是哪一级的
数据库r什么意思
网络安全清风行动总结