千家信息网

hadoop中的recordreader和split以及block的关系是怎样的

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,这篇文章主要讲解了"hadoop中的recordreader和split以及block的关系是怎样的",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"ha
千家信息网最后更新 2025年02月07日hadoop中的recordreader和split以及block的关系是怎样的

这篇文章主要讲解了"hadoop中的recordreader和split以及block的关系是怎样的",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"hadoop中的recordreader和split以及block的关系是怎样的"吧!

recordreader的作用不言而喻。

通常来讲,Inputformat会为没有一个split产生一个recordreader来提供给maptask使用,进而,MapTask能够读取属于自己管辖处理的那部分split。

这里面,我们以linerecordreader为例子进行讲解:

几个核心的方法

定义了linerecordreader基本的作用,即,是否有下一对kv,获得下一个key,获得下一个value。

而这个三个方法的使用地方如下。

暂时忽略...


因为文件在hdfs上分块存放的,那么split和block什么鬼?为啥不直接按照block去处理就行了呗。原因呢,是block中的数据可能不是连续的。可能某个重要的信息被两个block分隔了。因此,我们使用逻辑上的概念,即split来处理。

而split并不是真的将文件split了...而是逻辑上的标记下start,length,filepath等即可。

根据Path,可以过得到FileSystem

final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);


每个maptask呢,都会使用使用一个linerecordreader,处理对应的split,中间通过了

private FSDataInputStream fileIn;

来维护一个流。

切记:这里的流并不是只针对这个split的,我们之前说过,split只是标记而已,没有分隔。

因此,这个流fileIn其实是指向整个文件的。

并且呢,这个流呢,会实现jdk中标准的方法,啥read啊之类的。读取到缓冲区中,但是如果涉及到不同的block呢,这个流会自动帮我们去找对应的block的,这个太复杂。反正记住fileIn屏蔽了顶层的不同block之前的切换,对我们来讲就像处理一个大的文件一样。


既然是流,那么就能够定位了,因此,不同的maptask就可以根据自己的split中的start位置,通过fileIn流直接定位到要处理文件的那个地方。

fileIn.seek(start);in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes);filePosition = fileIn;



可以看到其中的in对象,是借助fileIn生成,相比,in内部一定借助了这个fileIn流来实现某个功能。


典型的,readLine,

in对象负责一行的读取逻辑,,而fileIn则负责从文件读取字符到byte缓冲区。

readline函数,最终会有一个这样的抵用,可以看到

bufferLength = fillBuffer(in, buffer, prevCharCR);

调用fillbuffer函数,从in.read()中读取东西到buffer中,

private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)  throws IOException {    /* We're reading data from in, but the head of the stream may be     * already buffered in buffer, so we have several cases:     * 1. No newline characters are in the buffer, so we need to copy     *    everything and read another buffer from the stream.     * 2. An unambiguously terminated line is in buffer, so we just     *    copy to str.     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends     *    in CR.  In this case we copy everything up to CR to str, but     *    we also need to see what follows CR: if it's LF, then we     *    need consume LF as well, so next call to readLine will read     *    from after that.     * We use a flag prevCharCR to signal if previous character was CR     * and, if it happens to be at the end of the buffer, delay     * consuming it until we have a chance to look at the char that     * follows.     */    str.clear();    int txtLength = 0; //tracks str.getLength(), as an optimization    int newlineLength = 0; //length of terminating newline    boolean prevCharCR = false; //true of prev char was CR    long bytesConsumed = 0;    do {      int startPosn = bufferPosn; //starting from where we left off the last time      if (bufferPosn >= bufferLength) {        startPosn = bufferPosn = 0;        if (prevCharCR) {          ++bytesConsumed; //account for CR from previous read        }        bufferLength = fillBuffer(in, buffer, prevCharCR);        if (bufferLength <= 0) {          break; // EOF        }      }      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline        if (buffer[bufferPosn] == LF) {          newlineLength = (prevCharCR) ? 2 : 1;          ++bufferPosn; // at next invocation proceed from following byte          break;        }        if (prevCharCR) { //CR + notLF, we are at notLF          newlineLength = 1;          break;        }        prevCharCR = (buffer[bufferPosn] == CR);      }      int readLength = bufferPosn - startPosn;      if (prevCharCR && newlineLength == 0) {        --readLength; //CR at the end of the buffer      }      bytesConsumed += readLength;      int appendLength = readLength - newlineLength;      if (appendLength > maxLineLength - txtLength) {        appendLength = maxLineLength - txtLength;      }      if (appendLength > 0) {        str.append(buffer, startPosn, appendLength);        txtLength += appendLength;      }    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);    if (bytesConsumed > Integer.MAX_VALUE) {      throw new IOException("Too many bytes before newline: " + bytesConsumed);    }    return (int)bytesConsumed;  }



OK,那之后的linerecordreader三个主要的方法就简答了,读取就行了。略屌。

但是,有一个问题还没说。即一行信息如果被某个block分隔了咋办。

或者这个问题,这样说,我们知道Inputformat中的getSplit方法呢,就是根据文件的length等属性直接划分split的。

参照FileInputformat的getSplits方法


那么一行数据,可能在不同的splits中,也可能在不同的block中。

在不同的block中呢,这个有fileIn对象帮我们处理的了,主要是读取read到缓冲区,属于物理上的问题,不是考虑的地方。

处于不同的split呢?这个情况有些问题,因为不同的split就是不同的划分,并且由不同的map task执行。

那么我们recordreader如何解决这个问题呢?

解决办法便是,突破split的start和end限制。

linerecordreader的解决办法:

只不start指向的位置不是文件的第一行,则默认的过滤掉一行(start位置可能是一行中的某一个位置)。

initialize()方法

if (start != 0) {      start += in.readLine(new Text(), 0, maxBytesToConsume(start));    }this.pos = start;


在nextKeyvalue方法中,多读取一些数据,补充完整的一行。

while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {      if (pos == 0) {        newSize = skipUtfByteOrderMark();      } else {        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));        pos += newSize;      }      if ((newSize == 0) || (newSize < maxLineLength)) {        break;      }      // line too long. try again      LOG.info("Skipped line of size " + newSize + " at pos " +                (pos - newSize));    }

OK,通过过滤掉一行,和多读取一行,就能保证被split分隔的一行,能够完成的读取,同时也不会重复处理一些数据。因为,所有的mapTask的linerecordreader都遵循这个方法。

感谢各位的阅读,以上就是"hadoop中的recordreader和split以及block的关系是怎样的"的内容了,经过本文的学习后,相信大家对hadoop中的recordreader和split以及block的关系是怎样的这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0