
A Detailed Look at the readLine Function of Hadoop's LineReader

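Hadoop's LineReader.readLine is a very well-written function. Below I have annotated it in detail, based on my own understanding. Its most impressive feature is that reading a split never produces a broken line. Whatever line-ending convention the text uses (Unix, Windows or classic Mac), it correctly determines where each line ends, and so it reads exactly one line of text.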

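1. If the current character is '\r', we cannot yet tell how the line is terminated, because a '\n' may follow. The content of the line, however, is already known; only the length of the terminator is still undetermined. The code records this by setting prevCharCR = true, meaning that a '\r' has just been read, and then examines the next character. If it is '\n', the flag shows that this is a Windows file and the terminator is 2 bytes long. If it is anything else, this is a (classic) Mac file and the terminator is 1 byte long. A '\n' that is not preceded by '\r' is a Unix terminator, also 1 byte long.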
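The rule in step 1 can be illustrated with a small standalone sketch. It assumes the whole input is already in memory, so it can simply peek at the byte after a '\r' instead of carrying a flag; the class `LineEndings` and its nested `Line` type are hypothetical names introduced here for illustration, not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: splits an in-memory byte array into
// lines using the same terminator rules as readLine (LF -> 1, CR LF -> 2, lone CR -> 1).
class LineEndings {
    private static final byte CR = '\r';
    private static final byte LF = '\n';

    // One line's content (without terminator) and its terminator length in bytes.
    static final class Line {
        final String content;
        final int terminatorLength;   // 1 = LF or lone CR, 2 = CR LF, 0 = ended by EOF

        Line(String content, int terminatorLength) {
            this.content = content;
            this.terminatorLength = terminatorLength;
        }
    }

    static List<Line> split(byte[] data) {
        List<Line> lines = new ArrayList<>();
        int start = 0;                // first byte of the current line
        int i = 0;
        while (i < data.length) {
            if (data[i] == LF) {
                // '\n' not preceded by '\r': Unix ending, 1 byte
                lines.add(new Line(text(data, start, i), 1));
                i += 1;
                start = i;
            } else if (data[i] == CR) {
                // The content ends here; the next byte decides the terminator length
                boolean crlf = i + 1 < data.length && data[i + 1] == LF;
                lines.add(new Line(text(data, start, i), crlf ? 2 : 1));
                i += crlf ? 2 : 1;
                start = i;
            } else {
                i++;
            }
        }
        if (start < data.length) {
            // EOF terminates an otherwise unterminated last line
            lines.add(new Line(text(data, start, data.length), 0));
        }
        return lines;
    }

    private static String text(byte[] data, int from, int to) {
        return new String(data, from, to - from, StandardCharsets.US_ASCII);
    }
}
```

For the input "unix\nwin\r\nmac\rlast" this yields four lines whose terminator lengths are 1, 2, 1 and 0 (the last line is ended by EOF).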

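There is one special case: if the '\r' sits in the last position of the buffer, we cannot tell whether the next buffer will start with '\n'. More data has to be loaded into the buffer (possibly already from the next split) before the terminator length can be decided.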
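The deferred decision can be sketched independently of Hadoop by feeding the input in separate chunks, each standing in for one buffer refill. `ChunkedTerminatorScanner` is a hypothetical class for illustration only: it keeps a prevCharCR flag across calls to `feed`, so a '\r' that ends one chunk is classified only once the first byte of the next chunk (or EOF) has been seen.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each call to feed() plays the role of one buffer refill.
// A CR that is the last byte of a chunk cannot be classified yet, so the decision
// is deferred until the next byte arrives, mirroring readLine's prevCharCR flag.
class ChunkedTerminatorScanner {
    private boolean prevCharCR = false;            // last byte seen was an unresolved CR
    private final List<Integer> terminators = new ArrayList<>();

    void feed(byte[] chunk) {
        for (byte b : chunk) {
            if (prevCharCR) {
                prevCharCR = false;
                if (b == '\n') {
                    terminators.add(2);            // CR LF: Windows ending, 2 bytes
                    continue;
                }
                terminators.add(1);                // CR + notLF: classic Mac ending, 1 byte
                // fall through: b itself still has to be examined
            }
            if (b == '\n') {
                terminators.add(1);                // lone LF: Unix ending, 1 byte
            } else if (b == '\r') {
                prevCharCR = true;                 // may be the last byte of this chunk
            }
        }
    }

    // Called at EOF: a trailing CR with nothing after it is a 1-byte terminator.
    List<Integer> finish() {
        if (prevCharCR) {
            terminators.add(1);
            prevCharCR = false;
        }
        return terminators;
    }
}
```

For example, feeding "a\r" and then "\nb" records a single 2-byte terminator, even though the '\r' and the '\n' arrived in different chunks.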

// Some of the field definitions in the LineReader class
/*
  private InputStream in;
  private byte[] buffer;
  // the number of bytes of real data in the buffer
  private int bufferLength = 0;
  // the current position in the buffer
  private int bufferPosn = 0;
  private static final byte CR = '\r';
  private static final byte LF = '\n';
*/
/**
   * Read one line from the InputStream into the given Text.  A line
   * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
   * or '\r\n' (CR+LF).  EOF also terminates an otherwise unterminated
   * line.
   *
   * @param str the object to store the given line (without newline)
   * @param maxLineLength the maximum number of bytes to store into str;
   *  the rest of the line is silently discarded.
   * @param maxBytesToConsume the maximum number of bytes to consume
   *  in this call.  This is only a hint, because if the line cross
   *  this threshold, we allow it to happen.  It can overshoot
   *  potentially by as much as one buffer length.
   *
   * @return the number of bytes read including the (longest) newline
   * found.
   *
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     *  No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     *  An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     *  Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true if prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; // the field bufferPosn records the current read position in buffer
      if (bufferPosn >= bufferLength) { // the previous buffer is used up (bufferPosn == bufferLength), or this is the first read (both are 0)
        startPosn = bufferPosn = 0; // refill the buffer from the beginning
        if (prevCharCR) // the special case above: the last byte of the previous buffer was '\r'
          ++bytesConsumed; //account for CR from previous read; it was not yet counted in bytesConsumed (the bytes actually consumed to read this line), so count it now
        bufferLength = in.read(buffer); // read the next chunk of the input stream into buffer
        if (bufferLength <= 0)
          break; // EOF
      }
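      for (; bufferPosn < bufferLength; ++bufferPosn) { // search the buffered data for a line terminator, covering the cases listed above
        if (buffer[bufferPosn] == LF) { // '\n' definitely ends the line; the terminator is 2 bytes if the previous char was '\r', otherwise 1
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte; the byte now at bufferPosn has not been examined yet
          break; // the end of the line was found, leave the loop
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          // the previous byte was '\r' and the current one is not '\n', so the line has ended too;
          // leave the loop without advancing: the byte at bufferPosn has not been examined yet
          newlineLength = 1;
          break; // as soon as the terminator length is known, newlineLength (initially 0) holds it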
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) // true only if the whole buffer was scanned without fixing the terminator length (no '\n' or '\r\n' before its end) and its last byte was '\r'
        --readLength; //CR at the end of the buffer; whether '\n' follows is known only after reading the next buffer, so the CR is counted later
      bytesConsumed += readLength; // in every case, add the bytes read in this pass to bytesConsumed; the content itself is appended to str below
      int appendLength = readLength - newlineLength; // length of the content to append: bytes read minus the terminator length
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);


    if (bytesConsumed > (long)Integer.MAX_VALUE) // the line is too long to report as an int, so throw
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    return (int)bytesConsumed; // number of bytes consumed, including the terminator
  }
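To experiment with the function without a Hadoop dependency, the loop can be ported almost line for line. This is a sketch under stated assumptions: `MiniLineReader` is a hypothetical class, Hadoop's `Text` is replaced by a `java.io.ByteArrayOutputStream`, and the buffer size is a constructor parameter so that a tiny buffer can force the '\r' and '\n' of a Windows line ending into different refills.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical, self-contained port of the readLine loop (not Hadoop code).
// Text is replaced by ByteArrayOutputStream; the buffer size is configurable.
class MiniLineReader {
    private static final byte CR = '\r';
    private static final byte LF = '\n';

    private final InputStream in;
    private final byte[] buffer;
    private int bufferLength = 0;   // number of bytes of real data in buffer
    private int bufferPosn = 0;     // current position in buffer

    MiniLineReader(InputStream in, int bufferSize) {
        this.in = in;
        this.buffer = new byte[bufferSize];
    }

    // Returns the bytes consumed (content plus terminator); 0 means EOF.
    int readLine(ByteArrayOutputStream str, int maxLineLength, int maxBytesToConsume)
            throws IOException {
        str.reset();
        int txtLength = 0;
        int newlineLength = 0;
        boolean prevCharCR = false;
        long bytesConsumed = 0;
        do {
            int startPosn = bufferPosn;
            if (bufferPosn >= bufferLength) {
                startPosn = bufferPosn = 0;
                if (prevCharCR) {
                    ++bytesConsumed;          // CR left at the end of the previous buffer
                }
                bufferLength = in.read(buffer);
                if (bufferLength <= 0) {
                    break;                    // EOF
                }
            }
            for (; bufferPosn < bufferLength; ++bufferPosn) {
                if (buffer[bufferPosn] == LF) {
                    newlineLength = prevCharCR ? 2 : 1;
                    ++bufferPosn;
                    break;
                }
                if (prevCharCR) {             // CR followed by something other than LF
                    newlineLength = 1;
                    break;
                }
                prevCharCR = (buffer[bufferPosn] == CR);
            }
            int readLength = bufferPosn - startPosn;
            if (prevCharCR && newlineLength == 0) {
                --readLength;                 // trailing CR is counted after the next refill
            }
            bytesConsumed += readLength;
            int appendLength = readLength - newlineLength;
            if (appendLength > maxLineLength - txtLength) {
                appendLength = maxLineLength - txtLength;
            }
            if (appendLength > 0) {
                str.write(buffer, startPosn, appendLength);
                txtLength += appendLength;
            }
        } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
        if (bytesConsumed > Integer.MAX_VALUE) {
            throw new IOException("Too many bytes before newline: " + bytesConsumed);
        }
        return (int) bytesConsumed;
    }
}
```

With a 2-byte buffer and the input "a\r\nb", the first call returns 3 (the byte 'a' plus the 2-byte terminator, even though '\r' and '\n' land in different buffers) and leaves "a" in the output stream; the second call returns 1 with "b"; the third returns 0 at EOF. Passing maxLineLength = 3 for the input "hello\n" returns 6 but keeps only "hel", since the rest of the line is consumed and discarded.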
           
