How to Implement a RecordReader That Reads Line by Line
Published: 2025-02-02 | Last updated: 2025-02-02 | Author: 千家信息网 editor
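This article shows how to implement a Hadoop RecordReader that reads its input split line by line. Each record's key is the byte offset of the line in the file and its value is the text of the line. The complete class follows for reference.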

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class CustomLineRecordReader extends RecordReader<LongWritable, Text> {

    private static final Log LOG = LogFactory.getLog(CustomLineRecordReader.class);

    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private int maxLineLength;
    private LongWritable key = new LongWritable();
    private Text value = new Text();

    /**
     * From Design Pattern, O'Reilly...
     * This method takes as arguments the map task's assigned InputSplit and
     * TaskAttemptContext, and prepares the record reader. For file-based input
     * formats, this is a good place to seek to the byte position in the file to
     * begin reading.
     */
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException {

        // This InputSplit is a FileSplit
        FileSplit split = (FileSplit) genericSplit;

        // Retrieve the configuration and the maximum number of bytes
        // allowed for a single record
        Configuration job = context.getConfiguration();
        this.maxLineLength = job.getInt(
                "mapred.linerecordreader.maxlength", Integer.MAX_VALUE);

        // Split "S" is responsible for all records that start
        // between the "start" and "end" positions
        start = split.getStart();
        end = start + split.getLength();

        // Open the file containing Split "S"
        final Path file = split.getPath();
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);

        // If Split "S" starts at byte 0, the first line is processed here.
        // Otherwise the first line has already been processed by Split "S-1"
        // and must be silently skipped.
        boolean skipFirstLine = false;
        if (start != 0) {
            skipFirstLine = true;
            // Move the file pointer to "start - 1" so that no line is missed
            // when "start" falls exactly on the first byte after an EOL
            --start;
            fileIn.seek(start);
        }

        in = new LineReader(fileIn, job);

        // If the first line must be skipped, read it into a dummy Text
        if (skipFirstLine) {
            Text dummy = new Text();
            // Reset "start" to "start + line offset"
            start += in.readLine(dummy, 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }

        // Position is the actual start
        this.pos = start;
    }

    /**
     * From Design Pattern, O'Reilly...
     * Like the corresponding method of the InputFormat class, this reads a
     * single key/value pair and returns true until the data is consumed.
     */
    @Override
    public boolean nextKeyValue() throws IOException {

        // key and value are set to null at the end of the split; recreate them
        // so that a further call cannot throw a NullPointerException
        if (key == null) {
            key = new LongWritable();
        }
        if (value == null) {
            value = new Text();
        }

        // The current offset is the key
        key.set(pos);

        int newSize = 0;

        // Make sure we get at least one record that starts in this Split
        while (pos < end) {

            // Read the next line and store its content in "value"
            newSize = in.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                            maxLineLength));

            // No byte read: we have reached the end of the Split.
            // Break and return false (no key / value)
            if (newSize == 0) {
                break;
            }

            // A line was read, so advance the position
            pos += newSize;

            // The line is shorter than the maximum record size:
            // break and return true (found key / value)
            if (newSize < maxLineLength) {
                break;
            }

            // The line is too long: ignore it and try the next one,
            // with position = position + line offset
            // TODO: Shouldn't it be LOG.error instead ??
            LOG.info("Skipped line of size " + newSize + " at pos "
                    + (pos - newSize));
        }

        if (newSize == 0) {
            // We've reached the end of the Split
            key = null;
            value = null;
            return false;
        } else {
            // Tell Hadoop a new line has been found; the key / value are
            // retrieved through getCurrentKey / getCurrentValue
            return true;
        }
    }

    /**
     * From Design Pattern, O'Reilly...
     * These methods are used by the framework to give generated key/value pairs
     * to an implementation of Mapper. Be sure to reuse the objects returned by
     * these methods if at all possible!
     */
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * From Design Pattern, O'Reilly...
     * These methods are used by the framework to give generated key/value pairs
     * to an implementation of Mapper. Be sure to reuse the objects returned by
     * these methods if at all possible!
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * From Design Pattern, O'Reilly...
     * Like the corresponding method of the InputFormat class, this is an
     * optional method used by the framework for metrics gathering.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    /**
     * From Design Pattern, O'Reilly...
     * This method is used by the framework for cleanup after there are no more
     * key/value pairs to process.
     */
    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }
}

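The split-boundary rule used by initialize() and nextKeyValue() is easier to see without the Hadoop plumbing. The following is a minimal sketch in plain Java, not part of the class above: SplitLineDemo and readSplit are hypothetical names introduced only for illustration, and a byte array stands in for the file. It assumes "\n" line endings and ignores the maximum line length. A split that does not start at byte 0 backs up one byte and discards text up to the next newline, and every split keeps reading a line that starts before its end even when the line finishes beyond it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of the split-boundary rule; no Hadoop classes are used. */
class SplitLineDemo {

    /** Returns the lines owned by the split [start, start + length) of data. */
    static List<String> readSplit(byte[] data, long start, long length) {
        long end = start + length;
        int pos = (int) start;
        if (start != 0) {
            // Back up one byte and discard everything up to and including the
            // next '\n': that text belongs to the previous split.
            pos = (int) start - 1;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step past the newline
        }
        List<String> lines = new ArrayList<>();
        // A line belongs to this split if it STARTS before "end",
        // even when it finishes beyond it.
        while (pos < end && pos < data.length) {
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            lines.add(new String(data, lineStart, pos - lineStart,
                    StandardCharsets.UTF_8));
            pos++; // step past the newline (or past the end of the data)
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\n".getBytes(StandardCharsets.UTF_8);
        // Two splits of 10 bytes each; the boundary falls inside "bravo".
        System.out.println(readSplit(data, 0, 10));  // prints [alpha, bravo]
        System.out.println(readSplit(data, 10, 10)); // prints [charlie]
    }
}
```

The first split reads "bravo" to completion although it crosses byte 10, and the second split discards the tail of "bravo", so every line is emitted exactly once. Backing up by one byte matters when a split starts exactly on the first byte of a line: the byte before it is the newline, so only that newline is discarded and the line itself is kept.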
That is the complete implementation of a RecordReader that reads line by line. Hopefully the listing is a useful reference.