hive中对多字节的处理
现在有一批数据:
01||zs||1802||ls||1903||jj||10
每一行的数据的分割符是||,是一个多字节的分隔符,默认的hive只支持单字节的分隔符,上面的数据时||多字节,不支持。
解决方案:
method01:使用 RegexSerDe 通过正则表达式来抽取字段
#建表语句create table t_bi_reg(id string,name string,age string ) row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' with serdeproperties('input.regex'='((.\*)\\\\|\\\\|(.\*)\\\\|\\\\|(.\*))','output.format.string'='%1$s %2$s %3s')#input.regex:指定切分的正则#output.format.string:切分之后输出的字段#加载数据load data local inpath '/Hadoop/data/1.txt' into table t_bi_reg#查询select * from t_bi_reg;
method2:修改源码:
原理是在 inputformat 读取行的时候将数据中的"多字节分隔符"替换为 hive 默认的分隔 符(ctrl+A 亦即 \001)或用于替代的单字符分隔符,以便 hive 在 serde 操作时按照默认的 单字节分隔符进行字段抽取。
代码
com.zy.hive.delimit2.BiDelimiterInputFormat:
package com.zy.hive.delimit2;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.InputSplit;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.RecordReader;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextInputFormat;public class BiDelimiterInputFormat extends TextInputFormat {@Overridepublic RecordReader getRecordReader(InputSplit genericSplit,JobConf job, Reporter reporter)throws IOException {reporter.setStatus(genericSplit.toString());BiRecordReader reader = new BiRecordReader(job,(FileSplit)genericSplit);// MyRecordReader reader = new MyRecordReader(job,(FileSplit)genericSplit);return reader;}}
com.zy.hive.delimit2.BiRecordReader
package com.zy.hive.delimit2;import java.io.IOException;import java.io.InputStream;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.Seekable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.compress.CodecPool;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.io.compress.Decompressor;import org.apache.hadoop.io.compress.SplitCompressionInputStream;import org.apache.hadoop.io.compress.SplittableCompressionCodec;import org.apache.hadoop.mapred.FileSplit;import org.apache.hadoop.mapred.LineRecordReader;import org.apache.hadoop.mapred.RecordReader;public class BiRecordReader implements RecordReader {private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName());private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader in;int maxLineLength;private Seekable filePosition;private CompressionCodec codec;private Decompressor decompressor;/*** A class that provides a line reader from an input stream.* @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.*/@Deprecatedpublic static class LineReader extends org.apache.hadoop.util.LineReader {LineReader(InputStream in) {super(in);}LineReader(InputStream in, int bufferSize) {super(in, bufferSize);}public LineReader(InputStream in, Configuration conf)throws IOException {Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513super(in, conf);}}public BiRecordReader(Configuration job, FileSplit split) throws IOException {this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);start = split.getStart();end = start + split.getLength();final Path file = split.getPath();compressionCodecs = new CompressionCodecFactory(job);codec = compressionCodecs.getCodec(file);// open the file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(split.getPath());if (isCompressedInput()) {decompressor = CodecPool.getDecompressor(codec);if (codec instanceof SplittableCompressionCodec) {final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(fileIn, decompressor, start, end,SplittableCompressionCodec.READ_MODE.BYBLOCK);in = new LineReader(cIn, job);start = cIn.getAdjustedStart();end = cIn.getAdjustedEnd();filePosition = cIn; // take pos from compressed stream} else {in = new LineReader(codec.createInputStream(fileIn,decompressor), job);filePosition = fileIn;}} else {fileIn.seek(start);in = new LineReader(fileIn, job);filePosition = fileIn;}// If this is not the first split, we always throw away first record// because we always (except the last split) read one extra line in// next() method.if (start != 0) {start += in.readLine(new Text(), 0, maxBytesToConsume(start));}this.pos = start;Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513}private boolean isCompressedInput() {return (codec != null);}private int maxBytesToConsume(long pos) {return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);}private long getFilePosition() throws IOException {long retVal;if (isCompressedInput() && null != filePosition) {retVal = filePosition.getPos();} else {retVal = pos;}return retVal;}public BiRecordReader(InputStream in, long offset, long endOffset,int maxLineLength) {this.maxLineLength = maxLineLength;this.in = new LineReader(in);this.start = offset;this.pos = offset;this.end = endOffset;this.filePosition = null;}public BiRecordReader(InputStream in, long offset, long endOffset,Configuration job) throws IOException {this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);this.in = new LineReader(in, job);this.start = offset;this.pos = offset;this.end = endOffset;this.filePosition = null;}public LongWritable createKey() {return new LongWritable();Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513}public Text createValue() {return new Text();}/** Read a line. */public synchronized boolean next(LongWritable key, Text value)throws IOException {// We always read one extra line, which lies outside the upper// split limit i.e. (end - 1)while (getFilePosition() <= end) {key.set(pos);int newSize = in.readLine(value,maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));String str = value.toString().replaceAll("\\|\\|", "\\|");value.set(str);pos += newSize;if (newSize == 0) {return false;}if (newSize < maxLineLength) {return true;}// line too long. try againLOG.info("Skipped line of size " + newSize + " at pos "+ (pos - newSize));}return false;}/*** Get the progress within the split*/public float getProgress() throws IOException {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (getFilePosition() - start)Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513/ (float) (end - start));}}public synchronized long getPos() throws IOException {return pos;}public synchronized void close() throws IOException {try {if (in != null) {in.close();}} finally {if (decompressor != null) {CodecPool.returnDecompressor(decompressor);}}}}
注意:上述代码中的 api 全部使用 hadoop 的老 api 接口 org.apache.hadoop.mapred". 然后将工程打包,并拷贝至 hive 安装目录的 lib 文件夹中,并重启 hive,使用以下语句建表 即可
具体步骤:
hive> add jar /home/hadoop/apps/hive/lib/myinput.jar #将jar包导入hive的classpath中
hive> create table new_bi(id string,name string) #建表
row format delimited
fields terminated by '|'
stored as inputformat 'com.zy.hive.delimit2.BiDelimiterInputFormat' outputformat
outputformat
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
hive> load data local inpath '/Hadoop/data/1.txt' into table t_bi_reg #导入数据
hive>select * from t_bi_reg #查询