千家信息网

hadoop中mapreduce如何自定义InputFormat

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。首先我
千家信息网最后更新 2025年02月06日 hadoop中mapreduce如何自定义InputFormat

这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

首先定义一个继承FileInputFormat的类,并重写它的createRecordReader方法;然后定义一个继承RecordReader的类。createRecordReader方法返回的,正是这个RecordReader子类的对象。

代码如下

public class TrackInputFormat extends FileInputFormat {        @Override        public RecordReader createRecordReader(InputSplit arg0,                        TaskAttemptContext arg1) throws IOException, InterruptedException {                // TODO Auto-generated method stub                return new TrackRecordReader();                        }}



package input;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;

/**
 * Reads a file split as records terminated by the multi-byte separator {@code "]@\n"} instead of
 * a single newline. Keys are the byte offset of each record in the file. Values are the record
 * contents with the separator removed. A final record that has no trailing separator is still
 * returned.
 *
 * <p>For an uncompressed file, a split that does not start at offset 0 backs up by the separator
 * length and discards everything up to and including the first separator it finds. The record
 * that straddles the split boundary is left to the preceding split. A compressed file is read
 * from its beginning, with no upper bound on the end offset.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
    // Not referenced by the code below.
    Logger logger = Logger.getLogger(TrackRecordReader.class.getName());
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private NewLineReader in;
    private int maxLineLength;
    private LongWritable key = null;
    private Text value = null;
    // Record separator: every record ends with these bytes. The charset is explicit so the
    // bytes do not depend on the JVM's default encoding.
    private final byte[] separator = "]@\n".getBytes(StandardCharsets.UTF_8);

    /**
     * Opens the split's file and positions the reader at the first record that belongs to this
     * split.
     *
     * @param genericSplit the split to read; must be a {@link FileSplit}
     * @param context      task context supplying the job configuration
     * @throws IOException if the file cannot be opened or read
     */
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        // Deprecated name of mapreduce.input.linerecordreader.line.maxlength.
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstRecord = false;
        if (codec != null) {
            // Compressed streams cannot seek: read the whole file from the start.
            in = new NewLineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstRecord = true;
                // Back up by the separator length. If the previous split ended exactly on a
                // record boundary, the separator ending there is what gets skipped, and the
                // record starting at the split offset is kept. Clamp at 0 for tiny offsets.
                start = Math.max(0L, start - separator.length);
                fileIn.seek(start);
            }
            in = new NewLineReader(fileIn, job);
        }
        if (skipFirstRecord) {
            // The partial record at the start belongs to the previous split; discard it.
            start += in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    /**
     * Advances to the next record of this split.
     *
     * @return {@code true} if a record was read, {@code false} at the end of the split
     * @throws IOException if reading fails
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (key == null) {
            key = new LongWritable();
        }
        if (value == null) {
            value = new Text();
        }
        int newSize = 0;
        while (pos < end) {
            // Set the key before every read so it matches the record actually returned, even
            // after an over-long record has been skipped.
            key.set(pos);
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                break; // end of stream
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                break;
            }
            // The record reached maxLineLength: move on to the next one.
        }
        if (newSize == 0) {
            key = null;
            value = null;
            return false;
        }
        return true;
    }

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Returns the fraction of the split consumed so far, in [0, 1].
     */
    @Override
    public float getProgress() {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    @Override
    public synchronized void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    /**
     * Builds the KMP fallback table: entry {@code i} is the length of the longest proper prefix
     * of {@code pattern[0..i]} that is also a suffix of it.
     */
    private static int[] buildFailureTable(byte[] pattern) {
        int[] table = new int[pattern.length];
        int k = 0;
        for (int i = 1; i < pattern.length; i++) {
            while (k > 0 && pattern[i] != pattern[k]) {
                k = table[k - 1];
            }
            if (pattern[i] == pattern[k]) {
                k++;
            }
            table[i] = k;
        }
        return table;
    }

    /**
     * Buffered reader that splits an input stream into records terminated by the enclosing
     * reader's {@code separator}. A separator may span two buffer fills.
     */
    public class NewLineReader {
        // 64 KB, the same default as Hadoop's LineReader; used when io.file.buffer.size is unset.
        private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
        private int bufferSize = DEFAULT_BUFFER_SIZE;
        private InputStream in;
        private byte[] buffer;
        private int bufferLength = 0;
        private int bufferPosn = 0;
        // KMP fallback table for the separator. It lets a failed partial match fall back
        // without skipping bytes, so inputs such as "]]@\n" are still recognized.
        private final int[] failure;

        public NewLineReader(InputStream in) {
            this(in, DEFAULT_BUFFER_SIZE);
        }

        public NewLineReader(InputStream in, int bufferSize) {
            this.in = in;
            this.bufferSize = bufferSize;
            this.buffer = new byte[this.bufferSize];
            this.failure = buildFailureTable(separator);
        }

        public NewLineReader(InputStream in, Configuration conf) throws IOException {
            this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
        }

        public void close() throws IOException {
            in.close();
        }

        /**
         * Reads one record into {@code str}, without its separator.
         *
         * @param str               receives at most {@code maxLineLength} bytes of record
         *                          content
         * @param maxLineLength     maximum number of content bytes stored in {@code str}
         * @param maxBytesToConsume stop reading further buffers once this many bytes have been
         *                          consumed without finding a separator
         * @return number of bytes consumed, including the separator if one was found; 0 at end
         *         of stream
         * @throws IOException if reading fails or more than {@link Integer#MAX_VALUE} bytes
         *                     were consumed
         */
        public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
                throws IOException {
            str.clear();
            // Holds the first min(consumed, maxLineLength) bytes, separator bytes included.
            Text record = new Text();
            long bytesConsumed = 0L;
            boolean newline = false;
            // Number of separator bytes matched so far; carried across buffer refills.
            int matched = 0;
            do {
                if (bufferPosn >= bufferLength) {
                    bufferPosn = 0;
                    bufferLength = in.read(buffer);
                    if (bufferLength <= 0) {
                        break; // end of stream
                    }
                }
                int startPosn = bufferPosn;
                for (; bufferPosn < bufferLength; bufferPosn++) {
                    byte b = buffer[bufferPosn];
                    while (matched > 0 && b != separator[matched]) {
                        matched = failure[matched - 1];
                    }
                    if (b == separator[matched]) {
                        matched++;
                        if (matched == separator.length) {
                            bufferPosn++; // consume the separator's last byte
                            newline = true;
                            break;
                        }
                    }
                }
                int readLength = bufferPosn - startPosn;
                bytesConsumed += readLength;
                int appendLength = Math.min(readLength, maxLineLength - record.getLength());
                if (appendLength > 0) {
                    record.append(buffer, startPosn, appendLength);
                }
            } while (!newline && bytesConsumed < maxBytesToConsume);
            if (bytesConsumed > (long) Integer.MAX_VALUE) {
                throw new IOException("Too many bytes before newline: " + bytesConsumed);
            }
            // Strip the separator (always the last separator.length consumed bytes) and
            // truncate to maxLineLength. A record without a separator (end of file) is kept.
            long contentLength = newline ? bytesConsumed - separator.length : bytesConsumed;
            str.set(record.getBytes(), 0, (int) Math.min(contentLength, record.getLength()));
            return (int) bytesConsumed;
        }

        public int readLine(Text str, int maxLineLength) throws IOException {
            return readLine(str, maxLineLength, Integer.MAX_VALUE);
        }

        public int readLine(Text str) throws IOException {
            return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
        }
    }
}


// Record separator: each record ends with the bytes ']', '@', '\n'. Encoding explicitly keeps
// the bytes independent of the JVM's default charset.
private byte[] separator = "]@\n".getBytes(java.nio.charset.StandardCharsets.UTF_8);

感谢你能够认真阅读完这篇文章,希望小编分享的"hadoop中mapreduce如何自定义InputFormat"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

0