
How to Implement a RecordReader That Reads Line by Line

Published: 2025-02-02  Author: 千家信息网 editorial staff

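This article shows how to implement a Hadoop RecordReader that reads its input line by line. Many readers are not yet familiar with the details, so the full implementation is shared below for reference.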

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class CustomLineRecordReader
        extends RecordReader<LongWritable, Text> {

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    private static final Log LOG = LogFactory.getLog(
            CustomLineRecordReader.class);

    /**
     * From Design Pattern, O'Reilly...
     * This method takes as arguments the map task's assigned InputSplit and
     * TaskAttemptContext, and prepares the record reader. For file-based input
     * formats, this is a good place to seek to the byte position in the file to
     * begin reading.
     */
    @Override
    public void initialize(
            InputSplit genericSplit,
            TaskAttemptContext context)
            throws IOException {

        // This InputSplit is a FileSplit
        FileSplit split = (FileSplit) genericSplit;

        // Retrieve the configuration and the maximum number of
        // bytes allowed for a single record
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(
                "mapred.linerecordreader.maxlength",
                Integer.MAX_VALUE);

        // Split "S" is responsible for all records
        // that start between the "start" and "end" positions
        start = split.getStart();
        end = start + split.getLength();

        // Retrieve the file containing Split "S"
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());

        // If Split "S" starts at byte 0, the first line will be processed.
        // If Split "S" does not start at byte 0, the first line has already
        // been processed by "S-1" and therefore needs to be silently ignored
        boolean skipFirstLine = false;
        if (start != 0) {
            skipFirstLine = true;
            // Set the file pointer to the "start - 1" position.
            // This makes sure we won't miss any line;
            // it could happen if "start" is located on an EOL
            --start;
            fileIn.seek(start);
        }

        in = new LineReader(fileIn, job);

        // If the first line needs to be skipped, read it
        // and store its content in a dummy Text
        if (skipFirstLine) {
            Text dummy = new Text();
            // Reset "start" to "start + line offset"
            start += in.readLine(dummy, 0,
                    (int) Math.min(
                            (long) Integer.MAX_VALUE,
                            end - start));
        }

        // Position is the actual start
        this.pos = start;
    }

    /**
     * From Design Pattern, O'Reilly...
     * Like the corresponding method of the InputFormat class, this reads a
     * single key/value pair and returns true until the data is consumed.
     */
    @Override
    public boolean nextKeyValue() throws IOException {

        // Current offset is the key
        key.set(pos);

        int newSize = 0;

        // Make sure we get at least one record that starts in this Split
        while (pos < end) {

            // Read the next line and store its content in "value"
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(
                            Integer.MAX_VALUE, end - pos),
                            maxLineLength));

            // No byte read, so we seem to have reached the end of the Split.
            // Break and return false (no key / value)
            if (newSize == 0) {
                break;
            }

            // Line is read, new position is set
            pos += newSize;

            // Line is shorter than the maximum record line size:
            // break and return true (found key / value)
            if (newSize < maxLineLength) {
                break;
            }

            // Line is too long.
            // Try again with position = position + line offset,
            // i.e. ignore the line and go to the next one
            // TODO: Shouldn't it be LOG.error instead ??
            LOG.info("Skipped line of size " +
                    newSize + " at pos "
                    + (pos - newSize));
        }

        if (newSize == 0) {
            // We've reached the end of the Split
            key = null;
            value = null;
            return false;
        } else {
            // Tell Hadoop a new line has been found;
            // key / value will be retrieved by the
            // getCurrentKey and getCurrentValue methods
            return true;
        }
    }

    /**
     * From Design Pattern, O'Reilly...
     * These methods are used by the framework to give generated key/value pairs
     * to an implementation of Mapper. Be sure to reuse the objects returned by
     * these methods if at all possible!
     */
    @Override
    public LongWritable getCurrentKey() throws IOException,
            InterruptedException {
        return key;
    }

    /**
     * From Design Pattern, O'Reilly...
     * These methods are used by the framework to give generated key/value pairs
     * to an implementation of Mapper. Be sure to reuse the objects returned by
     * these methods if at all possible!
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * From Design Pattern, O'Reilly...
     * Like the corresponding method of the InputFormat class, this is an
     * optional method used by the framework for metrics gathering.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    /**
     * From Design Pattern, O'Reilly...
     * This method is used by the framework for cleanup after there are no more
     * key/value pairs to process.
     */
    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}
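
The split-boundary rule used by initialize and nextKeyValue (back up one byte and discard the first partial line when the split does not start at byte 0, then keep reading while the current position is before the end of the split) can be hard to follow inside the Hadoop plumbing. The following is a minimal sketch of the same rule in plain Java, with no Hadoop dependency. The names SplitLineDemo, readSplit and endOfLine are hypothetical and exist only for this illustration; the sketch also assumes '\n' is the only line terminator and leaves out the maximum line length check.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: simulates the split-boundary logic on a byte array.
final class SplitLineDemo {

    /** Returns the index just past the next '\n' at or after pos, or data.length if none. */
    static int endOfLine(byte[] data, int pos) {
        int i = pos;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }

    /** Returns the lines owned by the split [start, start + length). */
    static List<String> readSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // Back up one byte, then discard everything up to and including
            // the next EOL: that line belongs to the previous split.
            pos = endOfLine(data, start - 1);
        }
        List<String> lines = new ArrayList<>();
        // A line is owned by this split if it STARTS before "end",
        // even when its last bytes lie beyond "end".
        while (pos < end && pos < data.length) {
            int next = endOfLine(data, pos);
            int textEnd = (data[next - 1] == '\n') ? next - 1 : next;
            lines.add(new String(data, pos, textEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbeta\ngamma\ndelta\n".getBytes(StandardCharsets.UTF_8);
        int splitSize = 8; // arbitrary split size chosen for the demo
        for (int start = 0; start < data.length; start += splitSize) {
            int length = Math.min(splitSize, data.length - start);
            System.out.println("split@" + start + " -> " + readSplit(data, start, length));
        }
    }
}
```

Running main prints "split@0 -> [alpha, beta]", "split@8 -> [gamma]" and "split@16 -> [delta]": the line "beta" crosses the boundary at byte 8 but is read only by the first split, and every line is returned exactly once.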

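That is the complete implementation of a RecordReader that reads line by line. Thank you for reading; hopefully it gives you a working understanding of the approach. For more articles like this one, follow the industry news channel.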
