
How to chain MapReduce jobs in Hadoop


In this post I'd like to share how to chain MapReduce jobs in Hadoop so that they run one after another, since many people are still not familiar with how to do it. The complete example below chains two jobs: the first inverts an input file of "user:friend1,friend2,..." lines, and the second uses that result to compute, for every pair of users, the friends they have in common. I hope you get a lot out of it; let's take a look together.

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PickMain {

    private static final Log LOG = LogFactory.getLog(PickMain.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /*
        // Method 1: run the jobs in sequence by waiting for each one to finish.
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(PickMain.class);
        job1.setMapperClass(FindMapper.class);
        job1.setReducerClass(FindReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        boolean flag1 = job1.waitForCompletion(true);
        // This approach also chains the jobs: job2 is only submitted if job1 succeeded.
        if (flag1) {
            Job job2 = Job.getInstance(conf);
            job2.setJarByClass(PickMain.class);
            job2.setMapperClass(SecondFindMapper.class);
            job2.setReducerClass(SecondFindReducer.class);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job2, new Path(args[1]));
            FileOutputFormat.setOutputPath(job2, new Path(args[2]));

            boolean flag2 = job2.waitForCompletion(true);
            System.out.println(flag2 ? 0 : 1);
            if (flag2) {
                LOG.info("The job is done!");
                System.exit(0);
            } else {
                LOG.info("The second job failed!");
                System.exit(1);
            }
        } else {
            LOG.info("The first job failed, aborting!");
            System.exit(1);
        }
        */

        // Method 2: submit multiple jobs through ControlledJob and JobControl.
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(PickMain.class);
        job1.setMapperClass(FindMapper.class);
        job1.setReducerClass(FindReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2);
        job2.setJarByClass(PickMain.class);
        job2.setMapperClass(SecondFindMapper.class);
        job2.setReducerClass(SecondFindReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Wrap each Job in a ControlledJob.
        ControlledJob cjob1 = new ControlledJob(conf);
        ControlledJob cjob2 = new ControlledJob(conf2);
        cjob1.setJob(job1);
        cjob2.setJob(job2);
        // Declare the dependency: job2 will only run after job1 has completed.
        cjob2.addDependingJob(cjob1);

        // JobControl acts as a job controller; it is a Runnable and must be driven by a thread.
        JobControl jc = new JobControl("my_jobcontrol");
        jc.addJob(cjob1);
        jc.addJob(cjob2);
        Thread th = new Thread(jc);
        th.start();

        // Exit only after all jobs have finished.
        while (!jc.allFinished()) {
            Thread.sleep(5000);
        }
        System.exit(0);
    }
}
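One caveat before the mapper and reducer code: the polling loop above always calls System.exit(0), even if a job in the chain fails, and it never stops the JobControl thread. As a minimal sketch of a stricter exit path (an illustration added here, not part of the original code), JobControl also exposes getFailedJobList() and stop():

        // Sketch: replaces the while loop at the end of main() above.
        while (!jc.allFinished()) {
            Thread.sleep(5000);
        }
        jc.stop();  // stop the controller thread once all jobs have finished
        if (jc.getFailedJobList().isEmpty()) {
            LOG.info("All jobs in the chain succeeded.");
            System.exit(0);
        } else {
            for (ControlledJob failed : jc.getFailedJobList()) {
                LOG.error("Job in the chain failed: " + failed.getJobName());
            }
            System.exit(1);
        }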
class FindMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text m1 = new Text();
    Text m2 = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line looks like "user:friend1,friend2,...".
        String line = value.toString();
        String[] tmp1 = line.split(":");
        String outval = tmp1[0];
        String[] outkeys = tmp1[1].split(",");
        // Invert the relation: emit (friend, user) for every friend in the list.
        for (int i = 0; i < outkeys.length; i++) {
            m1.set(outkeys[i]);
            m2.set(outval);
            context.write(m1, m2);
        }
    }
}

class FindReducer extends Reducer<Text, Text, Text, NullWritable> {

    StringBuilder sb = new StringBuilder();
    NullWritable nul = NullWritable.get();
    Text outval = new Text();
    String spector = ":";

    @Override
    protected void reduce(Text txt, Iterable<Text> txtiter, Context context)
            throws IOException, InterruptedException {
        // Collect every user that lists this key as a friend: "friend:user1:user2:...".
        sb.delete(0, sb.length());
        sb.append(txt.toString());
        Iterator<Text> it = txtiter.iterator();
        while (it.hasNext()) {
            sb.append(spector + it.next().toString());
        }
        outval.set(sb.toString());
        context.write(outval, nul);
    }
}

class SecondFindMapper extends Mapper<LongWritable, Text, Text, Text> {

    Text keyout = new Text();
    Text valueout = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line looks like "friend:user1:user2:...", produced by the first job.
        String[] fs = value.toString().split(":");
        valueout.set(fs[0]);
        if (fs.length > 0) {
            // Emit every pair of users that share this friend.
            for (int i = 1; i < fs.length - 1; i++) {
                for (int j = i + 1; j < fs.length; j++) {
                    // Order the two user names so (A,B) and (B,A) map to the same key.
                    if ((int) fs[i].toCharArray()[0] > (int) fs[j].toCharArray()[0]) {
                        keyout.set(fs[j] + "-" + fs[i]);
                    } else {
                        keyout.set(fs[i] + "-" + fs[j]);
                    }
                    context.write(keyout, valueout);
                }
            }
        }
    }
}
class SecondFindReducer extends Reducer<Text, Text, Text, Text> {

    StringBuilder sb = new StringBuilder();
    Text outvalue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> iter, Context context)
            throws IOException, InterruptedException {
        // Join all the common friends of this user pair with commas.
        sb.delete(0, sb.length());
        Iterator<Text> it = iter.iterator();
        if (it.hasNext()) {
            sb.append(it.next().toString());
        }
        while (it.hasNext()) {
            sb.append("," + it.next().toString());
        }
        outvalue.set(sb.toString());
        context.write(key, outvalue);
    }
}
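To make the data flow concrete, here is a small end-to-end sketch. The file name, jar name, and paths are hypothetical, not from the original article; the output values follow from the code above. Suppose args[0] points at an input file of friend lists:

    A:B,C,D
    B:A,C
    C:A,B,D

The chain might be submitted like this (as with any MapReduce job, the intermediate and final output directories must not already exist):

    hadoop jar pickmain.jar PickMain /in/friends.txt /tmp/step1 /out/common

The first job inverts the lists, so the intermediate output in /tmp/step1 contains lines such as "C:A:B", meaning users A and B both list C as a friend. The second job turns each such line into user pairs keyed like "A-B", and its reducer joins the shared friends, producing a final result of the form (key and value separated by a tab, the TextOutputFormat default):

    A-B	C
    A-C	B,D
    B-C	A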

That is all the content of "How to chain MapReduce jobs in Hadoop"; thanks for reading! Hopefully you now have a good grasp of the technique. If you would like to learn more, keep an eye on our industry news channel!
