
How Hadoop merges small files with CombineFileInputFormat to reduce the number of map tasks

Published 2025-02-14 by the 千家信息网 editorial team

This article shares how Hadoop can use CombineFileInputFormat to merge small files and reduce the number of map tasks. Many readers may not be familiar with the technique yet, so the walkthrough below is offered for reference. The background in one sentence: by default, FileInputFormat creates at least one input split per file, so a directory of many small files launches one map task per file, whereas CombineFileInputFormat packs several small files (or chunks of files) into a single split that one map task can process.

// The key type handed to the map task: the source file name plus the byte offset of the line.
package hgs.combinefileinputformat.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineFileKey implements WritableComparable<CombineFileKey> {

    private String fileName;
    private long offset;

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public long getOffset() {
        return offset;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.fileName = Text.readString(input);
        this.offset = input.readLong();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        Text.writeString(output, fileName);
        output.writeLong(offset);
    }

    @Override
    public int compareTo(CombineFileKey obj) {
        // order by file name first, then by offset within the file
        int f = this.fileName.compareTo(obj.fileName);
        if (f == 0)
            return (int) Math.signum((double) (this.offset - obj.offset));
        return f;
    }

    @Override
    public int hashCode() {
        // adapted from http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
        final int prime = 31;
        int result = 1;
        result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
        result = prime * result + (int) (offset ^ (offset >>> 32));
        return result;
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof CombineFileKey)
            return this.compareTo((CombineFileKey) o) == 0;
        return false;
    }
}
// RecordReader for one chunk of a CombineFileSplit: reads the chunk line by line,
// emitting (file name, offset) keys and line-text values.
package hgs.combinefileinputformat.test;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;

public class CombineFileReader extends RecordReader<CombineFileKey, Text> {

    private long startOffset;   // start offset of the chunk
    private long end;           // end offset of the chunk
    private long position;      // current position
    private FileSystem fs;
    private Path path;
    private CombineFileKey key;
    private Text value;
    private FSDataInputStream input;
    private LineReader reader;

    public CombineFileReader(CombineFileSplit split, TaskAttemptContext context,
            Integer index) throws IOException {
        // initialize path, fs, startOffset and end for the index-th chunk of the split
        this.path = split.getPath(index);
        this.fs = this.path.getFileSystem(context.getConfiguration());
        this.startOffset = split.getOffset(index);
        this.end = split.getLength(index) + this.startOffset;

        // if the chunk does not start at the beginning of the file, we may be in the
        // middle of a line; that partial line belongs to the previous chunk
        boolean skipFirstLine = false;

        // open the file
        this.input = fs.open(this.path);
        // a non-zero offset means we might be inside a line
        if (this.startOffset != 0) {
            skipFirstLine = true;
            --this.startOffset;
            // seek to the position where reading starts
            this.input.seek(this.startOffset);
        }
        // initialize the line reader
        this.reader = new LineReader(input);
        if (skipFirstLine) {
            // skip the first (partial) line and re-establish startOffset at the
            // beginning of the next full line; the skipped bytes are read by the
            // reader of the previous chunk, so nothing is lost
            this.startOffset += this.reader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, this.end - this.startOffset));
        }
        this.position = this.startOffset;
    }

    @Override
    public void close() throws IOException {}

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {}

    // return the current key
    @Override
    public CombineFileKey getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    // return the current value
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    // progress through this chunk, as a float in [0, 1]
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (this.startOffset == this.end) {
            return 0.0f;
        } else {
            return Math.min(1.0f,
                    (this.position - this.startOffset) / (float) (this.end - this.startOffset));
        }
    }

    // advance to the next key/value pair; returns false once the chunk is exhausted
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // lazily initialize key and value
        if (this.key == null) {
            this.key = new CombineFileKey();
            this.key.setFileName(this.path.getName());
        }
        this.key.setOffset(this.position);
        if (this.value == null) {
            this.value = new Text();
        }
        // read one line; newSize == 0 means this chunk has been fully consumed
        int newSize = 0;
        if (this.position < this.end) {
            newSize = this.reader.readLine(this.value);
            this.position += newSize;
        }
        if (newSize == 0) {
            this.key = null;
            this.value = null;
            return false;
        }
        return true;
    }
}
// The custom input format: packs small files into combined splits of at most 64 MB
// and uses CombineFileReader to read each chunk.
package hgs.combinefileinputformat.test;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CustCombineInputFormat extends CombineFileInputFormat<CombineFileKey, Text> {

    public CustCombineInputFormat() {
        super();
        // maximum size of a combined split: 64 MB
        this.setMaxSplitSize(67108864);
    }

    @Override
    public RecordReader<CombineFileKey, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader creates one CombineFileReader per chunk of the split
        return new CombineFileRecordReader<CombineFileKey, Text>(
                (CombineFileSplit) split, context, CombineFileReader.class);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // small files are never split; each file goes into a combined split whole
        return false;
    }
}

// Driver class.
package hgs.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import hgs.combinefileinputformat.test.CustCombineInputFormat;

public class LetterCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("mapreduce.map.log.level", "INFO");
        //conf.set("mapreduce.reduce.log.level", "INFO");
        Job job = Job.getInstance(conf, "LetterCount");
        job.setJarByClass(hgs.test.LetterCountDriver.class);
        // mapper
        job.setMapperClass(LetterCountMapper.class);
        // reducer
        job.setReducerClass(LetterReducer.class);
        // output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // pass "1" as the first argument to switch to the combining input format;
        // any other value keeps the default TextInputFormat
        if (args[0].equals("1"))
            job.setInputFormatClass(CustCombineInputFormat.class);
        // input and output directories (not files)
        FileInputFormat.setInputPaths(job, new Path("/words"));
        FileOutputFormat.setOutputPath(job, new Path("/result"));
        if (!job.waitForCompletion(true))
            return;
    }
}
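The driver above references LetterCountMapper and LetterReducer, which the original article does not show. Below is a minimal sketch of what they presumably look like; only the class names and the Text/IntWritable output types come from the driver, and the letter-splitting logic itself is an assumption. Note also what the 64 MB setMaxSplitSize buys you: roughly 1,000 one-megabyte files would be packed into about 16 combined splits, so about 16 map tasks instead of 1,000.

// Hypothetical LetterCountMapper.java (package hgs.test): counts every letter in each line.
// The input key type is declared as Object so the same mapper works both with the default
// TextInputFormat (LongWritable keys) and with CustCombineInputFormat (CombineFileKey keys).
package hgs.test;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LetterCountMapper extends Mapper<Object, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text letter = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        for (char c : value.toString().toCharArray()) {
            if (Character.isLetter(c)) {
                letter.set(String.valueOf(c));
                context.write(letter, ONE);
            }
        }
    }
}

// Hypothetical LetterReducer.java (package hgs.test): sums the counts per letter.
package hgs.test;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LetterReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}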

HDFS input files (screenshot not reproduced here):
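To reproduce a similar setup, a small generator along the following lines can fill the /words directory used by the driver with many tiny files. This class is not part of the original article; the class name, file names, and file contents are assumptions, and it expects the cluster's Hadoop configuration to be on the classpath.

// Hypothetical SmallFileGenerator.java: writes many tiny text files under /words so the
// effect of CombineFileInputFormat on the map-task count can be observed.
package hgs.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SmallFileGenerator {

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        for (int i = 0; i < 100; i++) {
            // each file holds a single short line, far below the HDFS block size
            try (FSDataOutputStream out = fs.create(new Path("/words/words-" + i + ".txt"))) {
                out.writeBytes("hello hadoop combine file input format\n");
            }
        }
        fs.close();
    }
}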

Run result without the custom CustCombineInputFormat (screenshot not reproduced here):

Run result with the custom CustCombineInputFormat (screenshot not reproduced here):
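The two screenshots presumably showed the number of map tasks launched with and without the custom format. If you want to compare the split counts without running the full job, a small check along the following lines can print them. The class name SplitCountCheck is not from the article; it assumes the cluster configuration is on the classpath and reuses the /words input directory from the driver.

// Hypothetical SplitCountCheck.java: prints how many input splits (and therefore map
// tasks) each input format would produce for the /words directory.
package hgs.test;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import hgs.combinefileinputformat.test.CustCombineInputFormat;

public class SplitCountCheck {

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "SplitCountCheck");
        FileInputFormat.setInputPaths(job, new Path("/words"));

        List<InputSplit> plain = new TextInputFormat().getSplits(job);
        List<InputSplit> combined = new CustCombineInputFormat().getSplits(job);

        // one split corresponds to one map task
        System.out.println("TextInputFormat splits:        " + plain.size());
        System.out.println("CustCombineInputFormat splits: " + combined.size());
    }
}

As for the original comparison, the letter-count job itself would be submitted twice: passing 1 as the only argument selects CustCombineInputFormat, and any other value (for example 0) keeps the default TextInputFormat.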

That is the full content of "How Hadoop merges small files with CombineFileInputFormat to reduce the number of map tasks". Thanks for reading, and I hope you found it helpful.
