hadoop中mapreduce如何自定义InputFormat
发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。首先我
千家信息网最后更新 2025年02月06日hadoop中mapreduce如何自定义InputFormat
这篇文章主要介绍了hadoop中mapreduce如何自定义InputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
首先我们要先定义一个类继承FileInputFormat,并重写createRecordReader方法返回RecordReader,然后定义一个类继承RecordReader,createRecordReader方法返回也就是我们定义的RecordReader的子类的对象。
代码如下
public class TrackInputFormat extends FileInputFormat{ @Override public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { // TODO Auto-generated method stub return new TrackRecordReader(); }}
package input;import java.io.IOException;import java.io.InputStream;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.log4j.Logger;/** * Treats keys as offset in file and value as line. * * @deprecated Use * {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader} * instead. */public class TrackRecordReader extends RecordReader{ Logger logger = Logger.getLogger(TrackRecordReader.class.getName()); private CompressionCodecFactory compressionCodecs = null; private long start; private long pos; private long end; private NewLineReader in; private int maxLineLength; private LongWritable key = null; private Text value = null; // ---------------------- // 行分隔符,即一条记录的分隔符 private byte[] separator = "]@\n".getBytes(); // -------------------- public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); //mapreduce.input.linerecordreader.line.maxlength this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE); start = split.getStart(); end = start + split.getLength(); final Path file = split.getPath(); //logger.info("path========================="+file.toString()); compressionCodecs = new CompressionCodecFactory(job); final CompressionCodec codec = compressionCodecs.getCodec(file); FileSystem fs = file.getFileSystem(job); FSDataInputStream fileIn = fs.open(split.getPath()); boolean skipFirstLine = false; //logger.info("codec========================="+codec); if (codec != null) { in = new NewLineReader(codec.createInputStream(fileIn), job); end = Long.MAX_VALUE; } else { if (start != 0) { skipFirstLine = true; this.start -= separator.length;// // --start; fileIn.seek(start); } in = new NewLineReader(fileIn, job); } if (skipFirstLine) { // skip first line and re-establish "start". start += in.readLine(new Text(), 0, (int) Math.min((long) Integer.MAX_VALUE, end - start)); } this.pos = start; /*if (skipFirstLine) { int newSize = in.readLine(new Text(), 0, (int) Math.min( (long) Integer.MAX_VALUE, end - start)); if(newSize > 0){ start += newSize; } }*/ } public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; while (pos < end) { newSize = in.readLine(value, maxLineLength, Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength)); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } } if (newSize == 0) { //读取下一个buffer key = null; value = null; return false; } else { //读同一个buffer的下一个记录 return true; } } @Override public LongWritable getCurrentKey() { return key; } @Override public Text getCurrentValue() { return value; } /** * Get the progress within the split */ public float getProgress() { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float) (end - start)); } } public synchronized void close() throws IOException { if (in != null) { in.close(); } } public class NewLineReader { private static final int DEFAULT_BUFFER_SIZE = 256 * 1024* 1024; private int bufferSize = DEFAULT_BUFFER_SIZE; private InputStream in; private byte[] buffer; private int bufferLength = 0; private int bufferPosn = 0; public NewLineReader(InputStream in) { this(in, DEFAULT_BUFFER_SIZE); } public NewLineReader(InputStream in, int bufferSize) { this.in = in; this.bufferSize = bufferSize; this.buffer = new byte[this.bufferSize]; } public NewLineReader(InputStream in, Configuration conf) throws IOException { this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); } public void close() throws IOException { in.close(); } public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { str.clear(); Text record = new Text(); int txtLength = 0; long bytesConsumed = 0L; boolean newline = false; int sepPosn = 0; do { // 已经读到buffer的末尾了,读下一个buffer if (this.bufferPosn >= this.bufferLength) { bufferPosn = 0; bufferLength = in.read(buffer); // 读到文件末尾了,则跳出,进行下一个文件的读取 if (bufferLength <= 0) { break; } } int startPosn = this.bufferPosn; for (; bufferPosn < bufferLength; bufferPosn++) { // 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题) if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) { sepPosn = 0; } // 遇到行分隔符的第一个字符 if (buffer[bufferPosn] == separator[sepPosn]) { bufferPosn++; int i = 0; // 判断接下来的字符是否也是行分隔符中的字符 for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) { // buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半 if (bufferPosn + i >= bufferLength) { bufferPosn += i - 1; break; } // 一旦其中有一个字符不相同,就判定为不是分隔符 if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) { sepPosn = 0; break; } } // 的确遇到了行分隔符 if (sepPosn == separator.length) { bufferPosn += i; newline = true; sepPosn = 0; break; } } } int readLength = this.bufferPosn - startPosn; bytesConsumed += readLength; // 行分隔符不放入块中 if (readLength > maxLineLength - txtLength) { readLength = maxLineLength - txtLength; } if (readLength > 0) { record.append(this.buffer, startPosn, readLength); txtLength += readLength; // 去掉记录的分隔符 if (newline) { str.set(record.getBytes(), 0, record.getLength() - separator.length); } } } while (!newline && (bytesConsumed < maxBytesToConsume)); if (bytesConsumed > (long) Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int) bytesConsumed; } public int readLine(Text str, int maxLineLength) throws IOException { return readLine(str, maxLineLength, Integer.MAX_VALUE); } public int readLine(Text str) throws IOException { return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); } }}
private byte[] separator = "]@\n".getBytes();
感谢你能够认真阅读完这篇文章,希望小编分享的"hadoop中mapreduce如何自定义InputFormat"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
分隔符
字符
篇文章
文件
方法
相同
接下来
也就是
代码
价值
兴趣
同时
子类
对象
更多
朋友
末尾
知识
编带
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器编码是多少
数据库中如何按照尾字母排序
on在数据库中是什么意思
桔子互联网科技有限公司
c控服务器
我国软件开发公司
hivemc服务器
7000计算机网络技术
网络安全教育培训机制
网络技术人员留学理由书
数据库 百分比
教育部大学生网络安全宣传周
大华网络安全研究院怎么样
数据库最大表行数据类型
东南大学考研网络安全专业
拓力软件开发
护苗 网络安全 标语
软件开发的阶段会计划分
南宁的互联网科技公司有哪些
软件开发教程自学
数据库系统怎么看表的分布
pop制作软件怎么连接数据库
央企网络安全大会 公安部
网络安全手抄报有关内容
自动化服务器主机维修
星际家园免费服务器商城
访问数据库常用工具
是不是软件开发坑很多
王者荣耀角色怎么更改服务器
芯片验证和芯片软件开发