hadoop中mapreduce如何实现串联执行
发表于:2025-02-01 作者:千家信息网编辑
千家信息网最后更新 2025年02月01日,小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!impor
千家信息网最后更新 2025年02月01日hadoop中mapreduce如何实现串联执行
小编给大家分享一下hadoop中mapreduce如何实现串联执行,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
import java.io.IOException;import java.util.Iterator;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PickMain { private static final Log LOG = LogFactory.getLog(PickMain.class); public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {/* * Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); boolean flag1 = job1.waitForCompletion(true); //下面这种方法也可以实现串联执行job if(flag1) { Job job2 = Job.getInstance(conf); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); boolean flag2 = job2.waitForCompletion(true); System.out.println(flag2?0:1); if(flag2) { LOG.info("The job is done!"); System.exit(0); }else { LOG.info("The Second job is wrong!"); System.exit(1); } }else { LOG.info("The firt job is Running Wrong job break!"); System.exit(1); } */ //下面通过使用ContolledJob和JobControl来实现提交多个作业 Configuration conf = new Configuration(); Job job1 = Job.getInstance(conf); job1.setJarByClass(PickMain.class); job1.setMapperClass(FindMapper.class); job1.setReducerClass(FindReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1])); Configuration conf2 = new Configuration(); Job job2 = Job.getInstance(conf2); job2.setJarByClass(PickMain.class); job2.setMapperClass(SecondFindMapper.class); job2.setReducerClass(SecondFindReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(args[1])); FileOutputFormat.setOutputPath(job2, new Path(args[2])); //创建ControlledJob对job进行包装 ControlledJob cjob1 = new ControlledJob(conf); ControlledJob cjob2 = new ControlledJob(conf2); cjob1.setJob(job1); cjob2.setJob(job2); //设置依赖关系,这个时候只有等到job1执行完成后job2才会执行 cjob2.addDependingJob(cjob1); //JobControl该类相当于一个job控制器,它是一个线程,需要通过线程启动 JobControl jc = new JobControl("my_jobcontrol"); jc.addJob(cjob1); jc.addJob(cjob2); Thread th = new Thread(jc); th.start(); //等到所有的job都执行完成后在退出 while(!jc.allFinished()) { Thread.sleep(5000); } System.exit(0); }}class FindMapper extends Mapper{ Text m1 = new Text(); Text m2 = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tmp1 = line.split(":"); String outval = tmp1[0]; String[] outkeys = tmp1[1].split(","); for(int i = 0 ; i { StringBuilder sb = new StringBuilder(); NullWritable nul = NullWritable.get(); Text outval = new Text(); String spector = ":"; @Override protected void reduce(Text txt, Iterable txtiter, Reducer .Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); sb.append(txt.toString()); Iterator it = txtiter.iterator(); while(it.hasNext()) { sb.append(spector+it.next().toString()); } outval.set(sb.toString()); context.write(outval, nul); } }class SecondFindMapper extends Mapper { Text keyout = new Text(); Text valueout = new Text(); @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String[] fs = value.toString().split(":"); valueout.set(fs[0]); if(fs.length>0) { for(int i = 1;i (int)fs[j].toCharArray()[0]) { keyout.set(fs[j]+"-"+fs[i]); }else { keyout.set(fs[i]+"-"+fs[j]); } context.write(keyout, valueout); } } } } }class SecondFindReducer extends Reducer { StringBuilder sb = new StringBuilder(); Text outvalue = new Text(); @Override protected void reduce(Text key, Iterable iter, Reducer .Context context) throws IOException, InterruptedException { sb.delete(0, sb.length()); Iterator it = iter.iterator(); if(it.hasNext()) { sb.append(it.next().toString()); } while(it.hasNext()) { sb.append(","+it.next().toString()); } outvalue.set(sb.toString()); context.write(key, outvalue); } }
以上是"hadoop中mapreduce如何实现串联执行"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
篇文章
内容
线程
不怎么
只有
多个
大部分
控制器
方法
时候
更多
知识
行业
资讯
资讯频道
频道
作业
包装
参考
学习
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
个人述职 软件开发
浪潮服务器bios
网络技术工程师岗位理解
以后什么软件开发挣钱
南通app手机软件开发公司
三级网络技术免费网课
网络技术发展的重要性
石家庄学院数据库考试题
蚌埠软件开发公司哪家好
对前沿网络技术的探究
计算机网络安全 教案
抽盒软件开发
联想服务器虚拟化建设
网络安全实验教程沈鑫电子版
天融信服务器管理口
谷歌系列进军网络安全
阿里云服务器企业版费用
网络安全管理职责
网络安全cfs比赛
云计算需要网络技术的支持吗
软件开发转Linux
南京网络服务器机柜生产厂家
断网后服务器怎么联网
计算机网络安全技术具有
软件开发的文档可有可无
数据库建表约束怎么建
上海服务器电源哪家可靠
广医二院网络安全
网络中英文检索数据库
2020网络安全知识内容