MapReduce中如何处理跨行的Block和InputSplit

1 提出问题

Map最小输入数据单元是InputSplit。比如对于那么对于一个记录行形式的文本大于128M时,HDFS将会分成多块存储(block),同时分片并非到每行行尾。这样就会产生两个问题:
1、Hadoop的一个Block默认是128M,那么对于一个记录行形式的文本,会不会造成一行记录被分到两个Block当中?
2、在把文件从Block中读取出来进行切分时,会不会造成一行记录被分成两个InputSplit,如果被分成两个InputSplit,这样一个InputSplit里面就有一行不完整的数据,那么处理这个InputSplit的Map会不会得出不正确的结果?

对于上面的两个问题,必须明确两个概念:Block和InputSplit:
1、Block是HDFS存储文件的单位(默认是128M)
2、InputSplit是MapReduce对文件进行处理和运算的输入单位,只是一个逻辑概念,每个InputSplit并没有对文件实际的切割,只是记录了要处理的数据的位置(包括文件的path和hosts)和长度(由start和length决定)
因此以行记录形式的文本,可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。

2 分析源码

org.apache.hadoop.mapreduce.lib.input.FileInputFormat源码分析

  /**
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                                     blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

从上面的代码可以看出,对文件进行切分其实很简单:获取文件在HDFS上的路径和Block信息,然后根据splitSize对文件进行切分,splitSize = computeSplitSize(blockSize, minSize, maxSize);maxSize,minSize,blockSize都可以配置,默认splitSize 就等于blockSize的默认值(128m)。
   
FileInputFormat对文件的切分是严格按照偏移量来的,因此一行记录比较长的话,可能被切分到不同的InputSplit。但这并不会对Map造成影响,尽管一行记录可能被拆分到不同的InputSplit,但是与FileInputFormat关联的RecordReader被设计的足够健壮,当一行记录跨InputSplit时,其能够到读取不同的InputSplit,直到把这一行记录读取完成。
我们拿最常见的TextInputFormat源码分析如何处理跨行InputSplit的,TextInputFormat关联的是LineRecordReader,下面先看LineRecordReader的的nextKeyValue方法里读取文件的代码:

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " +
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

1、其读取文件是通过LineReader(in就是一个LineReader实例)的readLine方法完成的。关键的逻辑就在这个readLine方法里,这个方法主要的逻辑归纳起来是3点:
A 总是从buffer里读取数据,如果buffer里的数据读完了,先加载下一批数据到buffer。
B 在buffer中查找"行尾",将开始位置至行尾处的数据拷贝给str(也就是最后的Value)。若为遇到"行尾",继续加载新的数据到buffer进行查找。
C 关键点在于:给到buffer的数据是直接从文件中读取的,完全不会考虑是否超过了split的界限,而是一直读取到当前行结束为止

	  /**
	   * Read a line terminated by one of CR, LF, or CRLF.
	   */
	  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
	  throws IOException {
	    /* We're reading data from in, but the head of the stream may be
	     * already buffered in buffer, so we have several cases:
	     * 1. No newline characters are in the buffer, so we need to copy
	     *    everything and read another buffer from the stream.
	     * 2. An unambiguously terminated line is in buffer, so we just
	     *    copy to str.
	     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
	     *    in CR.  In this case we copy everything up to CR to str, but
	     *    we also need to see what follows CR: if it's LF, then we
	     *    need consume LF as well, so next call to readLine will read
	     *    from after that.
	     * We use a flag prevCharCR to signal if previous character was CR
	     * and, if it happens to be at the end of the buffer, delay
	     * consuming it until we have a chance to look at the char that
	     * follows.
	     */
	    str.clear();
	    int txtLength = 0; //tracks str.getLength(), as an optimization
	    int newlineLength = 0; //length of terminating newline
	    boolean prevCharCR = false; //true of prev char was CR
	    long bytesConsumed = 0;
	    do {
	      int startPosn = bufferPosn; //starting from where we left off the last time
	      //如果buffer中的数据读完了,先加载一批数据到buffer里
	      if (bufferPosn >= bufferLength) {
	        startPosn = bufferPosn = 0;
	        if (prevCharCR) {
	          ++bytesConsumed; //account for CR from previous read
	        }
	        bufferLength = in.read(buffer);
	        if (bufferLength <= 0) {
	          break; // EOF
	        }
	      }
	      //注意:由于不同操作系统对“行结束符“的定义不同:
	      //UNIX: '\n'  (LF)
	      //Mac:  '\r'  (CR)
	      //Windows: '\r\n'  (CR)(LF)
	      //为了准确判断一行的结尾,程序的判定逻辑是:
	      //1.如果当前符号是LF,可以确定一定是到了行尾,但是需要参考一下前一个
	      //字符,因为如果前一个字符是CR,那就是windows文件,“行结束符的长度”
	      //(即变量:newlineLength)应该是2,否则就是UNIX文件,“行结束符的长度”为1。
	      //2.如果当前符号不是LF,看一下前一个符号是不是CR,如果是也可以确定一定上个字符就是行尾了,这是一个mac文件。
	      //3.如果当前符号是CR的话,还需要根据下一个字符是不是LF判断“行结束符的长度”,所以只是标记一下prevCharCR=true,供读取下个字符时参考
	      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
	        if (buffer[bufferPosn] == LF) {//存在'\n'换行字符
	          newlineLength = (prevCharCR) ? 2 : 1;
	          ++bufferPosn; // at next invocation proceed from following byte
	          break;
	        }
	        if (prevCharCR) { //CR + notLF, we are at notLF
	          newlineLength = 1;
	          break;
	        }
	        prevCharCR = (buffer[bufferPosn] == CR);//存在'\r'回车字符
	      }
	      int readLength = bufferPosn - startPosn;
	      if (prevCharCR && newlineLength == 0) {
	        --readLength; //CR at the end of the buffer
	      }
	      bytesConsumed += readLength;
	      int appendLength = readLength - newlineLength;
	      if (appendLength > maxLineLength - txtLength) {
	        appendLength = maxLineLength - txtLength;
	      }
	      if (appendLength > 0) {
	        str.append(buffer, startPosn, appendLength);
	        txtLength += appendLength;
	      }
	      //newlineLength == 0 就意味着始终没有读到行尾,程序会继续通过文件输入流继续从文件里读取数据。
	      //这里有一个非常重要的地方:in的实例创建自构造函数
       //org.apache.hadoop.mapreduce.LineRecordReader.lib.input.LineRecordReader.initialize(InputSplit, TaskAttemptContext)
	      //方法内:FSDataInputStream fileIn = fs.open(split.getPath()); 我们以看到:
	      //对于LineRecordReader:当它对取一行时,一定是读取到完整的行,不会受filesplit的任何影响,
       //因为它读取是filesplit所在的文件,而不是限定在filesplit的界限范围内。
	      //所以不会出现“断行”的问题!
	    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

	    if (bytesConsumed > (long)Integer.MAX_VALUE) {
	      throw new IOException("Too many bytes before newline: " + bytesConsumed);
	    }
	    return (int)bytesConsumed;
	  }

2、按照readLine的上述行为,在遇到跨split的行时,会将下一个split开始行数据读取出来构成一行完整的数据,那么下一个split怎么判定开头的一行有没有被上一个split的LineRecordReader读取过从而避免漏读或重复读取开头一行呢?这方面LineRecordReader使用了一个简单而巧妙的方法:既然无法断定每一个split开始的一行是独立的一行还是被切断的一行的一部分,那就跳过每个split的开始一行(当然要除第一个split之外),从第二行开始读取,然后在到达split的结尾端时总是再多读一行,这样数据既能接续起来又避开了断行带来的麻烦.以下是相关的源码:

// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {//非第一个InputSplit忽略掉第一行
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;

3、相应地,在LineRecordReader判断是否还有下一行的方法:org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue()中,while使用的判定条件保证了InputSplit读取跨界的问题:当前位置小于或等于split的结尾位置,也就说:当前已处于split的结尾位置上时,while依然会执行一次,这一次读到显然已经是下一个split的开始行了。

  public boolean nextKeyValue() throws IOException {
	    if (key == null) {
	      key = new LongWritable();
	    }
	    key.set(pos);
	    if (value == null) {
	      value = new Text();
	    }
	    int newSize = 0;
	    // We always read one extra line, which lies outside the upper
	    // split limit i.e. (end - 1)
	    while (getFilePosition() <= end) {//保证InputSplit读取边界的问题
	      newSize = in.readLine(value, maxLineLength,
	          Math.max(maxBytesToConsume(pos), maxLineLength));
	      pos += newSize;
	      if (newSize < maxLineLength) {
	        break;
	      }

	      // line too long. try again
	      LOG.info("Skipped line of size " + newSize + " at pos " +
	               (pos - newSize));
	    }
	    if (newSize == 0) {
	      key = null;
	      value = null;
	      return false;
	    } else {
	      return true;
	    }
	  }

至此通过上面的源码分析我们清楚了解到TextInputFormat是如何解决跨行Block和InputSplit的,因此当我们需要实现自己的InputFormat时,也会面临在切分数据时的连续性解析问题。

原贴地址:http://liangjf85-163-com.iteye.com/blog/2122583

时间: 2024-09-23 04:39:38

MapReduce中如何处理跨行的Block和InputSplit的相关文章

Asp.net中如何处理一个站点不同Web应用通用Session的问题

asp.net|session|web|问题|站点 Asp.net中如何处理一个站点不同Web应用通用Session的问题 1.问题描述: 系统S中有M1,M2,M3,M4四个模块,每个模块都是一个web应用.其中一个模块中设置Session后在其他模块中无法读取. 2.问题原因: 一个WEB应用相当于一个站点,应用与应用之间不可能共享Session. 3.解决方法: 1) 将四个web应用包含在同一个解决方案中,如图所示: 实际目录存储结构如下: (注:调整.webinfo文件使解决方案能构正

Windows phone 8.1开发中如何处理摄像头翻转的问题

模拟器就像我们儿时的梦境,在其上运行应用程序时,一切总是那么美好的:而真机测试如同我们这个纷乱无章的现实世界,你会遇到各种小人和畜生,常常会遭受莫名的挫折.面对挫折,有人迎难而上,或不予理采,走自己的路:有的人则打退堂鼓. 面对摄像头翻转的问题,有些人也会选择逃避.我为什么不喜欢现在的某些程序员,就是因为这些人只会逃避和制造问题,遇到问题不是去寻找解决方案,而是坐在那里喊爹骂娘.虽然不可能所有问题都可以解决,但是,有许多问题是可以解决的,而这些人总心浮气躁,不愿意静下心来好好思考. N+6年前我

在xp系统中如何处理隐藏硬盘分区的方法

在xp系统中如何处理隐藏硬盘分区的方法         在xp隐藏硬盘分区的问题上,其实许多的网友们都提到或者是遇到了的.那么xp隐藏硬盘分区应该怎么做呢?接下来来介绍一下xp隐藏硬盘分区的方法,一起去看看吧! 1. 点击开始→运行,输入gpedit.msc,打开组策略. 2. 依次展开本地计算机策略→用户配置→管理模板→windows组件→windows资源管理器,打开右侧窗口的"隐藏我的电脑中的这些指定的驱动器".   3. 选择已启用,然后选择一个组合即可.

js中如何处理数据,使其在页面中换行显示

问题描述 js中如何处理数据,使其在页面中换行显示 查出数据是一段string类型的数字,如(26,110,9745,1964,8357,2654,45698,45698,12564,45894,45698,56455,45698,51236,21598,45875,54687,456987),显示在表格中的一列中,显然是显示不下的,现在判断长度大于6时,在表格的这列中换行显示,这个操作写在js中,怎么实现?(如果有不清楚的地方,请指出,,,) 解决方案 你是想在表格的当前列中换行显示吗? 可以

vbnet-VB.NET中如何处理串口通信接收的数据?

问题描述 VB.NET中如何处理串口通信接收的数据? 现在上位机的代码如下: Private Sub comm_DataReceived(ByVal sender As Object, ByVal e As SerialDataReceivedEventArgs) Dim n As Integer = comm.BytesToRead Dim buf(n) As Byte comm.Read(buf, 0, n) builder.Length = 0 Invoke(Sub() If checkB

mongodb-MongoDB中的mapReduce中reduce文件只作用在一个文档上

问题描述 MongoDB中的mapReduce中reduce文件只作用在一个文档上 小白刚接触MongoDB,看到mapReduce这里不是很清楚,自己编了一段代码,想做个计数,然后发觉map方法出来的value如果只有一个元素的时候,好像不参与到reduce函数的计算中.如: var map=function(){ emit(this.name{count:1})}var reduce=function(keyvalues){ var count=0; for(var i in values)

sed-shell脚本中,如何处理一个文件

问题描述 shell脚本中,如何处理一个文件 从数据库中unload出来一些记录. 放在了hello.txt文件中. 里面的记录如下 123456|20130520|-1 123456|20130521|-1 123456|20130522|-1 123456|20130523|-1 123456|20130524|-1 123456|20130525|-1 之后,解析hello.txt文件,分别取出里面的时间.也就是2013052x..放在下面的ftp的传文件的记录中. ftp -nv 1.1

线程-C#用Task代替Thread查找文本文件,在Task中如何处理文件锁定的问题呢?

问题描述 C#用Task代替Thread查找文本文件,在Task中如何处理文件锁定的问题呢? C#用Task代替Thread查找文本文件,在Task中如何处理文件锁定的问题呢? 解决方案 对文件读取做同步,一次性读取一块到内存,各自在内存中查找. 解决方案二: 锁定了等待处理完毕啊!你的锁定是什么意思

hadoop map-reduce中的文件并发操作_数据库其它

这样的操作在map端或者reduce端均可.下面以一个实际业务场景中的例子来简要说明. 问题简要描述: 假如reduce输入的key是Text(String),value是BytesWritable(byte[]),不同key的种类为100万个,value的大小平均为30k左右,每个key大概对应 100个value,要求对每一个key建立两个文件,一个用来不断添加value中的二进制数据,一个用来记录各个value在文件中的位置索引.(大量的小文件会影响HDFS的性能,所以最好对这些小文件进行