How to Customize Formatted Output in Hadoop

Published: 2024-12-12 · Author: 千家信息网 editor

This article shares how to customize formatted output in Hadoop. The editor finds it quite practical, so it is shared here for your reference; follow along and take a look. The listing below first builds a job that uses a custom FileOutputFormat whose RecordWriter splits records into two files by key length, and then shows a second driver that uses MultipleInputs to read from several source paths.

import java.io.IOException;
import java.net.URI;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CustomizeOutputFormat {

    static final Log LOG = LogFactory.getLog(CustomizeOutputFormat.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(CustomizeOutputFormat.class);
        job.setMapperClass(CustMapper.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Set the custom OutputFormat here
        job.setOutputFormatClass(CustOutputFormat.class);

        String jobName = "Customize outputformat test!";
        job.setJobName(jobName);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        if (b) {
            LOG.info("Job " + jobName + " is done.");
        } else {
            LOG.info("Job " + jobName + " is going wrong, now exit.");
            System.exit(1);
        }
    }
}

class CustMapper extends Mapper<LongWritable, Text, Text, Text> {

    String[] textIn = null;
    Text outkey = new Text();
    Text outvalue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        /*
         * Suppose the input file contains the following:
         * boys    girls
         * firends goodbye
         * down    up
         * fly     to
         * neibors that
         */
        textIn = value.toString().split("\t");
        outkey.set(textIn[0]);
        outvalue.set(textIn[1]);
        context.write(outkey, outvalue);
    }
}
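/*
 * A sketch of the expected result (derived from the RecordWriter below,
 * not stated in the original article): keys with five or more characters
 * are written to out.a, shorter keys to out.b. For the sample input above:
 *
 *   out.a:  firends  goodbye
 *           neibors  that
 *
 *   out.b:  boys     girls
 *           down     up
 *           fly      to
 */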
// Custom OutputFormat
class CustOutputFormat extends FileOutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // Get the Configuration
        Configuration conf = context.getConfiguration();
        // Get the FileSystem
        FileSystem fs = FileSystem.newInstance(conf);
        // Get the job's output path
        Path path = CustOutputFormat.getOutputPath(context);
        URI uri = path.toUri();
        // Create two files and obtain their output streams
        FSDataOutputStream foa = fs.create(new Path(uri.toString() + "/out.a"));
        FSDataOutputStream fob = fs.create(new Path(uri.toString() + "/out.b"));
        // Create the custom RecordWriter, passing in the two streams
        CustRecordWriter rw = new CustRecordWriter(foa, fob);
        return rw;
    }

    class CustRecordWriter extends RecordWriter<Text, Text> {

        FSDataOutputStream foa = null;
        FSDataOutputStream fob = null;

        CustRecordWriter(FSDataOutputStream foa, FSDataOutputStream fob) {
            this.foa = foa;
            this.fob = fob;
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            String mText = key.toString();
            // Route each record to a different file depending on the key length.
            // Note: writeUTF prepends a two-byte length to every string, so the
            // output is not quite plain text.
            if (mText.length() >= 5) {
                foa.writeUTF(mText + "\t" + value.toString() + "\n");
            } else {
                fob.writeUTF(mText + "\t" + value.toString() + "\n");
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Finally, close both output streams
            if (foa != null) {
                foa.close();
            }
            if (fob != null) {
                fob.close();
            }
        }
    }
}

// A second example (a separate source file): use MultipleInputs to process
// input files from multiple sources.
package hgs.multipuleinput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import hgs.custsort.SortBean;
import hgs.custsort.SortDriver;
import hgs.custsort.SortMapper;
import hgs.custsort.SortReducer;

public class MultipuleInputDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);

        MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class, SortMapper.class);
        MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class, SortMapper.class);
        //FileInputFormat.setInputPaths(job, new Path("/sort"));
        FileOutputFormat.setOutputPath(job, new Path("/sortresult"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
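One thing the example above does not show: MultipleInputs also lets each input path use its own mapper class (and its own InputFormat), which is its main advantage over FileInputFormat.setInputPaths. A minimal sketch of that variant, assuming a hypothetical second mapper SortMapper2 that emits the same map output key/value types as SortMapper:

        // Hypothetical variant: bind a different mapper to each input path.
        // SortMapper2 is assumed for illustration and is not part of the
        // original example; both mappers must emit the same key/value types.
        MultipleInputs.addInputPath(job, new Path("/sort"), TextInputFormat.class, SortMapper.class);
        MultipleInputs.addInputPath(job, new Path("/sort1"), TextInputFormat.class, SortMapper2.class);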

Thank you all for reading! That is all for "How to Customize Formatted Output in Hadoop". I hope the content above is of some help and that you can learn more from it. If you found this article useful, share it so more people can see it!
