千家信息网

Hadoop的多文件输出及自定义文件名方法是什么

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇内容介绍了"Hadoop的多文件输出及自定义文件名方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够
千家信息网最后更新 2025年02月06日Hadoop的多文件输出及自定义文件名方法是什么

本篇内容介绍了"Hadoop的多文件输出及自定义文件名方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

首先是输出格式的类,也就是job.setOutputFormatClass(……)参数列表中的类:

public class MoreFileOutputFormat extends Multiple{  @Override  protected String generateFileNameForKeyValue(Text key, Text value,Configuration conf)   {      return "Your name";  }}

这里,继承Multiple类后必须重写generateFileNameForKeyValue()方法,这个方法返回的字符串作为输出文件的文件名。内容有各位自己根据需要编写。同时,key和value的值也根据自己的需要更换。

接下来是Multiple模板类的代码:

import java.io.DataOutputStream;import java.io.IOException;import java.util.HashMap;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.GzipCodec;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.ReflectionUtils;public abstract class Multiple, V extends Writable>  extends FileOutputFormat {   // 接口类,需要在调用程序中实现generateFileNameForKeyValue来获取文件名   private MultiRecordWriter writer = null;   public RecordWriter getRecordWriter(TaskAttemptContext job)     throws IOException, InterruptedException    {        if (writer == null)         {             writer = new MultiRecordWriter(job, getTaskOutputPath(job));        }        return writer;   }       /**    * get task output path    *     * @param conf    * @return    * @throws IOException    */   private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException   {        Path workPath = null;        OutputCommitter committer = super.getOutputCommitter(conf);        if (committer instanceof FileOutputCommitter)         {             workPath = ((FileOutputCommitter) committer).getWorkPath();        }         else         {             Path outputPath = super.getOutputPath(conf);             if (outputPath == null)              {                  throw new IOException("Undefined job output-path");             }             workPath = outputPath;        }        return workPath;   }       //继承后重写以获得文件名   protected abstract String generateFileNameForKeyValue(K key, V value,Configuration conf);       //实现记录写入器RecordWriter类 (内部类)   public class MultiRecordWriter extends RecordWriter    {        /** RecordWriter的缓存 */        private HashMap> recordWriters = null;        private TaskAttemptContext job = null;                /** 输出目录 */        private Path workPath = null;        public MultiRecordWriter(TaskAttemptContext job, Path workPath)         {             super();             this.job = job;             this.workPath = workPath;             recordWriters = new HashMap>();        }                  @Override        public void close(TaskAttemptContext context) throws IOException,          InterruptedException         {             Iterator> values = this.recordWriters.values().iterator();             while (values.hasNext())              {                  values.next().close(context);             }             this.recordWriters.clear();        }                  @Override        public void write(K key, V value) throws IOException,          InterruptedException         {             // 得到输出文件名             String baseName = generateFileNameForKeyValue(key, value,job.getConfiguration());             // 如果recordWriters里没有文件名,那么就建立。否则就直接写值。             RecordWriter rw = this.recordWriters.get(baseName);             if (rw == null)              {                  rw = getBaseRecordWriter(job, baseName);                  this.recordWriters.put(baseName, rw);             }             rw.write(key, value);        }                  // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}        private RecordWriter getBaseRecordWriter(TaskAttemptContext job,          String baseName) throws IOException, InterruptedException         {             Configuration conf = job.getConfiguration();             // 查看是否使用解码器             boolean isCompressed = getCompressOutput(job);             RecordWriter recordWriter = null;             if (isCompressed)              {                  Class codecClass = getOutputCompressorClass(                    job, GzipCodec.class);                  CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);                  Path file = new Path(workPath, baseName + codec.getDefaultExtension());                  FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);                  // 这里我使用的自定义的OutputFormat                  recordWriter = new MyRecordWriter(new DataOutputStream(                    codec.createOutputStream(fileOut)));             }              else              {                  Path file;                  System.out.println("workPath = " + workPath + ", basename = " + baseName);                  file = new Path(workPath, baseName);                  FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);                  // 这里我使用的自定义的OutputFormat                  recordWriter = new MyRecordWriter(fileOut);             }             return recordWriter;        }   }}

现在来实现Multiple的内部类MultiRecordWriter中的MyRecordWriter类以实现自己想要的输出方式:

public class MyRecordWriter extends RecordWriter{   private static final String utf8 = "UTF-8";//定义字符编码格式        protected DataOutputStream out;            public MyRecordWriter(DataOutputStream out)    {        this.out = out;     }          private void writeObject(Object o) throws IOException    {        if (o instanceof Text)        {             Text to = (Text) o;             out.write(to.getBytes(), 0, to.getLength());        }        else        {               //输出成字节流。如果不是文本类的,请更改此处             out.write(o.toString().getBytes(utf8));        }   }        /**     * 将mapreduce的key,value以自定义格式写入到输出流中     */   public synchronized void write(K key, V value) throws IOException   {        writeObject(value);   }          public synchronized void close(TaskAttemptContext context) throws IOException   {        out.close();   } }

这个类中还有其它集中方法,不过笔者不需要那些方法,所以把它们都删除了,但最初的文件也删除了- -,所以现在找不到了。请大家见谅。

现在,只需在main()或者run()函数中将job的输出格式设置成MoreFileOutputFormat类就行了,如下:

job.setOutputFormatClass(MoreFileOutputFormatClass);

"Hadoop的多文件输出及自定义文件名方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0