HBase源码分析之HRegion上compact流程分析(三)

        在《HBase源码分析之HRegion上compact流程分析(二)》一文中,我们没有讲解真正执行合并的CompactionContext的compact()方法。现在我们来分析下它的具体实现。

        首先,CompactionContext表示合并的上下文信息,它只是一个抽象类,其compact()并没有实现,代码如下:

/**
   * Runs the compaction based on current selection. select/forceSelect must have been called.
   * @return The new file paths resulting from compaction.
   */
  public abstract List<Path> compact() throws IOException;

        那么,我们来找下它的实现类。它一共有两种实现类:DefaultCompactionContext和StripeCompaction,今天我们以DefaultCompactionContext为例来讲解。

        首先看下DefaultCompactionContext中compact()方法的实现:

    @Override
    public List<Path> compact() throws IOException {
      return compactor.compact(request);
    }

        这个compactor可以根据参数hbase.hstore.defaultengine.compactor.class配置,但是默认实现为DefaultCompactor。那么,接下来,我们看下它的实现:

  /**
   * Do a minor/major compaction on an explicit set of storefiles from a Store.
   * 在一个Store中明确的storefiles集合中执行一个minor或者major合并
   */
  public List<Path> compact(final CompactionRequest request) throws IOException {

	  // 从请求中获取文件详情fd,fd是FileDetails类型
	  FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());

	  // 构造合并过程追踪器CompactionProgress
	  this.progress = new CompactionProgress(fd.maxKeyCount);

    // Find the smallest read point across all the Scanners.
	  // 找到scanners中的最小的可读点,实际上就是找到最小能够读取数据的点
    long smallestReadPoint = getSmallestReadPoint();

    List<StoreFileScanner> scanners;
    Collection<StoreFile> readersToClose;

    // 根据参数hbase.regionserver.compaction.private.readers确定是否使用私有readers
    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {

      // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
      // HFileFiles, and their readers
      // 克隆所有的StoreFiles,以便我们将在StoreFiles、HFileFiles以及它们的readers等一个独立的副本上执行合并

      // 根据请求中待合并文件的数目创建一个StoreFile列表:readersToClose
      readersToClose = new ArrayList<StoreFile>(request.getFiles().size());

      // 将待合并文件复制一份加入readersToClose列表
      for (StoreFile f : request.getFiles()) {
        readersToClose.add(new StoreFile(f));
      }

      // 根据readersToClose列表,即待合并文件的副本创建文件浏览器FileScanners
      scanners = createFileScanners(readersToClose, smallestReadPoint);
    } else {
      // 创建空的列表readersToClose
      readersToClose = Collections.emptyList();

      // 根据实际请求中的待合并文件列表创建文件浏览器FileScanners
      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
    }

    StoreFile.Writer writer = null;
    List<Path> newFiles = new ArrayList<Path>();
    boolean cleanSeqId = false;
    IOException e = null;
    try {
      InternalScanner scanner = null;
      try {
        /* Include deletes, unless we are doing a compaction of all files */

    	// 确定scan类型scanType:
    	// 如果compact请求是MAJOR或ALL_FILES合并,则scanType为COMPACT_DROP_DELETES;
    	// 如果compact请求是MINOR合并,则scanType为COMPACT_RETAIN_DELETES。
    	ScanType scanType =
            request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;

    	// 如果有协处理器,调用协处理器的preCreateCoprocScanner()方法
    	scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
        if (scanner == null) {
          // 如果协处理器中未创建scanner,调用createScanner()方法创建一个
          scanner = createScanner(store, scanners, scanType, smallestReadPoint, fd.earliestPutTs);
        }

        // 如果有协处理器,调用协处理器的preCompact()方法
        scanner = postCreateCoprocScanner(request, scanType, scanner);
        if (scanner == null) {
          // NULL scanner returned from coprocessor hooks means skip normal processing.
          return newFiles;
        }
        // Create the writer even if no kv(Empty store file is also ok),
        // because we need record the max seq id for the store file, see HBASE-6059

        // 确定最小读取点smallestReadPoint
        if(fd.minSeqIdToKeep > 0) {
          smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
          cleanSeqId = true;
        }

        // When all MVCC readpoints are 0, don't write them.
        // See HBASE-8166, HBASE-12600, and HBASE-13389.
        // 调用HStore的createWriterInTmp()方法,获取writer
        writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
          fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);

        // 调用performCompaction()方法,执行合并
        boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);

        // 如果没有完成合并
        if (!finished) {
          // 关闭writer
          writer.close();

          // 删除writer中的临时文件
          store.getFileSystem().delete(writer.getPath(), false);
          writer = null;

          // 抛出异常
          throw new InterruptedIOException( "Aborting compaction of store " + store +
              " in region " + store.getRegionInfo().getRegionNameAsString() +
              " because it was interrupted.");
         }
       } finally {

    	 // 关闭scanner
         if (scanner != null) {
           scanner.close();
         }
      }
    } catch (IOException ioe) {
      e = ioe;
      // Throw the exception
      throw ioe;
    }
    finally {
      try {
        if (writer != null) {
          if (e != null) {
        	// 无异常的话,关闭writer
            writer.close();
          } else {

        	// 存在异常的话,写入元数据,关闭writer,并将写入地址加入newFiles
            writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
            writer.close();
            newFiles.add(writer.getPath());
          }
        }
      } finally {

    	// 依次关闭readersToClose中StoreFile的Reader
        for (StoreFile f : readersToClose) {
          try {
            f.closeReader(true);
          } catch (IOException ioe) {
            LOG.warn("Exception closing " + f, ioe);
          }
        }
      }
    }

    // 返回newFiles
    return newFiles;
  }

        总结下DefaultCompactor的compact()方法的处理流程,大体有如下几点:

        1、通过父类Compactor的getFileDetails()方法从请求中获取文件详情fd,fd是FileDetails类型,这个FileDetails类型的文件详情中主要包含如下信息:

              (1)合并之后总的keyvalue数目:maxKeyCount;

              (2)如果是major合并,最早的Put时间戳earliestPutTs;

              (3)合并时文件中最大的序列号maxSeqId;

              (4)相关文件中最新的MemStore数据读取点maxMVCCReadpoint;

              (5)最大的tag长度maxTagsLength;

              (6)在major合并期间需要保持的最小序列号minSeqIdToKeep。

        2、构造合并过程追踪器CompactionProgress,用于追踪合并过程;

        3、通过父类Compactor的getSmallestReadPoint()方法找到所有scanners中的最小的可读点,实际上就是找到最小能够读取数据的点smallestReadPoint;

        4、根据参数hbase.regionserver.compaction.private.readers确定是否使用私有readers,默认为false不使用:

              4.1、如果需要使用,即参数配置为true的话,克隆所有的StoreFiles,以便我们将在StoreFiles、HFileFiles以及它们的readers等一个独立的副本上执行合并;

                       4.1.1、根据请求中待合并文件的数目创建一个StoreFile列表:readersToClose;

                       4.1.2、将请求中待合并文件逐一复制加入readersToClose列表;

                       4.1.3、根据readersToClose列表,即待合并文件的副本创建文件浏览器FileScanners;

              4.2、如果不需要使用,即参数配置为false的话,使用请求中实际发送的文件列表;

                       4.2.1、创建空的列表readersToClose;

                       4.2.2、根据实际请求中的待合并文件列表创建文件浏览器FileScanners;

         5、根据compact请求类型确定scan类型scanType:

               如果compact请求是MAJOR或ALL_FILES合并,则scanType为COMPACT_DROP_DELETES;

               如果compact请求是MINOR合并,则scanType为COMPACT_RETAIN_DELETES。

         6、如果有协处理器,调用协处理器的preCreateCoprocScanner()方法,获得scanner,如果协处理器中未创建scanner,调用createScanner()方法创建一个;

         7、如果有协处理器,调用协处理器的preCompact()方法;

         8、根据之前获取的smallestReadPoint和文件详情fd中的minSeqIdToKeep确定最小读取点smallestReadPoint,并置状态位cleanSeqId;

         9、调用HStore的createWriterInTmp()方法,获取writer;

        10、调用父类Compactor的performCompaction()方法,利用scanner、writer、smallestReadPoint、cleanSeqId执行合并:

                实际上就是利用scanner读取旧文件数据,利用writer写入新文件数据。

        11、如果没有完成合并:关闭writer、删除writer中的临时文件并抛出异常;

        12、关闭scanner;

        13、无异常的话,关闭writer;存在异常的话,写入元数据,关闭writer,并将写入地址加入newFiles;

        14、依次关闭readersToClose中StoreFile的Reader;

        15、返回newFiles。

        大体流程就是如此。针对其中的某些细节,我们逐一进行分析。

        首先说下这个文件详情FileDetails,它是通过getFileDetails()方法获取的。文件详情FileDetails类定义如下:

  /** The sole reason this class exists is that java has no ref/out/pointer parameters. */
  protected static class FileDetails {
    /** Maximum key count after compaction (for blooms) */
	// 合并之后总的keyvalue数目
    public long maxKeyCount = 0;
    /** Earliest put timestamp if major compaction */
    // 如果是major合并,最早的Put时间戳earliestPutTs
    public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
    /** The last key in the files we're compacting. */
    // 合并时文件中最大的序列号
    public long maxSeqId = 0;
    /** Latest memstore read point found in any of the involved files */
    // 相关文件中最新的MemStore数据读取点maxMVCCReadpoint
    public long maxMVCCReadpoint = 0;
    /** Max tags length**/
    // 最大的tag长度maxTagsLength
    public int maxTagsLength = 0;
    /** Min SeqId to keep during a major compaction **/
    // 在major合并期间需要保持的最小序列号minSeqIdToKeep
    public long minSeqIdToKeep = 0;
  }

        而它的获取方法如下:

/**
   * Extracts some details about the files to compact that are commonly needed by compactors.
   * 提取文件合并的一些细节
   * @param filesToCompact Files.
   * @param allFiles Whether all files are included for compaction
   * @return The result.
   */
  protected FileDetails getFileDetails(
      Collection<StoreFile> filesToCompact, boolean allFiles) throws IOException {

	// 构造一个FileDetails实例fd
	FileDetails fd = new FileDetails();

	// 计算保持MVCC的最新HFile时间戳:当前时间-24小时 * keepSeqIdPeriod
	// keepSeqIdPeriod为一个参数,即被指定的在major合并期间MVCC值可以保持多少天
    long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
      (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);  

    // 遍历需要合并的文件
    for (StoreFile file : filesToCompact) {

      // 如果allFiles为true,即所有文件都需要检测,且文件的修改时间小于上述保持MVCC的最新HFile时间戳
      if(allFiles && (file.getModificationTimeStamp() < oldestHFileTimeStampToKeepMVCC)) {
        // when isAllFiles is true, all files are compacted so we can calculate the smallest
        // MVCC value to keep

    	// 如果文件细节中需要保持的最小序列号小于文件MemStore的时间戳
        if(fd.minSeqIdToKeep < file.getMaxMemstoreTS()) {
          // 将文件MemStore的时间戳赋值给fd的需要保持的最小序列号minSeqIdToKeep
          fd.minSeqIdToKeep = file.getMaxMemstoreTS();
        }
      }

      // 获取文件的最大序列号ID
      long seqNum = file.getMaxSequenceId();

      // 赋值给文件细节fd中的maxSeqId,记录待合并文件的最大序列号ID
      fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);

      // 获取Reader
      StoreFile.Reader r = file.getReader();
      if (r == null) {
        LOG.warn("Null reader for " + file.getPath());
        continue;
      }
      // NOTE: use getEntries when compacting instead of getFilterEntries, otherwise under-sized
      // blooms can cause progress to be miscalculated or if the user switches bloom
      // type (e.g. from ROW to ROWCOL)

      // 获取文件中的keyvalue数量,实际上就是列的数量,
      // HBase底层对每个列都是按照keyvalue格式存储的,key包含rowkey+column family+quality+tm等,value即列值
      long keyCount = r.getEntries();

      // 累加keyvalue数目maxKeyCount
      fd.maxKeyCount += keyCount;

      // calculate the latest MVCC readpoint in any of the involved store files
      // 计算所有相关存储文件的最新mvcc读取点maxMVCCReadpoint

      // 先加载文件信息fileInfo
      Map<byte[], byte[]> fileInfo = r.loadFileInfo();

      byte tmp[] = null;
      // Get and set the real MVCCReadpoint for bulk loaded files, which is the
      // SeqId number.

      // 如果是Bulk导入的,maxMVCCReadpoint为fd的maxMVCCReadpoint和文件SequenceID中较大者
      if (r.isBulkLoaded()) {
        fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
      }
      else {
    	// 否则,读取文件信息中最大的memstore时间戳MAX_MEMSTORE_TS_KEY
        tmp = fileInfo.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
        if (tmp != null) {
          // maxMVCCReadpoint就是fd的maxMVCCReadpoint和文件信息中最大的memstore时间戳MAX_MEMSTORE_TS_KEY中较大者
          fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
        }
      }

      // 更新最大标签长度maxTagsLength
      tmp = fileInfo.get(FileInfo.MAX_TAGS_LEN);
      if (tmp != null) {
        fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
      }
      // If required, calculate the earliest put timestamp of all involved storefiles.
      // This is used to remove family delete marker during compaction.
      long earliestPutTs = 0;

      // 获取最早的Put时间戳earliestPutTs
      if (allFiles) {
        tmp = fileInfo.get(StoreFile.EARLIEST_PUT_TS);
        if (tmp == null) {
          // There's a file with no information, must be an old one
          // assume we have very old puts
          fd.earliestPutTs = earliestPutTs = HConstants.OLDEST_TIMESTAMP;
        } else {
          earliestPutTs = Bytes.toLong(tmp);
          fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Compacting " + file +
          ", keycount=" + keyCount +
          ", bloomtype=" + r.getBloomFilterType().toString() +
          ", size=" + StringUtils.humanReadableInt(r.length()) +
          ", encoding=" + r.getHFileReader().getDataBlockEncoding() +
          ", seqNum=" + seqNum +
          (allFiles ? ", earliestPutTs=" + earliestPutTs: ""));
      }
    }

    // 返回合并细节fd
    return fd;
  }

        接下来再看下找到scanners中的最小的可读点,实际上就是找到最小能够读取数据的点,它是通过父类Compactor的getSmallestReadPoint()方法实现的,代码如下:

protected long getSmallestReadPoint() {
	// 获取的是HStore中的SmallestReadPoint
    return store.getSmallestReadPoint();
  }

        可以看出,父类的该方法实际上还是通过HStore中的getSmallestReadPoint()方法实现的,如下:

@Override
  public long getSmallestReadPoint() {
	// 获取的是Region中的SmallestReadPoint,因为HBase是行级事务,SmallestReadPoint应该也是行级的
    return this.region.getSmallestReadPoint();
  }

        而HStore实际上最终获取的是Region中的SmallestReadPoint,这也从侧面反映了那个我们熟知的问题:因为HBase是行级事务,SmallestReadPoint应该也是行级的。而具体的SmallestReadPoint该如何获取,我们在以后的多版本控制协议MVCC中再细讲。

        接下来,我们再看下如何创建文件浏览器FileScanners,它是通过父类Compactor的createFileScanners()方法来构造的,代码如下:

  /**
   * Creates file scanners for compaction.
   * @param filesToCompact Files.
   * @return Scanners.
   */
  protected List<StoreFileScanner> createFileScanners(
      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
      smallestReadPoint);
  }

        它是一个专门为合并创建scanner的方法,这个scanner区别于客户端的scanner,我们继续看StoreFileScanner的getScannersForStoreFiles()方法,如下:

/**
   * Return an array of scanners corresponding to the given set of store files,
   * And set the ScanQueryMatcher for each store file scanner for further
   * optimization
   */
  public static List<StoreFileScanner> getScannersForStoreFiles(
      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
      boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
    List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
        files.size());

    // 遍历StoreFile文件files
    for (StoreFile file : files) {
      // 获取每个文件的Reader
      StoreFile.Reader r = file.createReader();

      // 根据Reader获取StoreFileScanner类型的scanner,这个scanner专门用于读取StoreFile
      StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
          isCompaction, readPt);
      scanner.setScanQueryMatcher(matcher);

      // 加入scanner列表scanners
      scanners.add(scanner);
    }

    // 返回scanner列表
    return scanners;
  }

        很简单,不再赘述,读者可以自己阅读源码。

        继续,我们再看下如果获取一个内部InternalScanner类型的scanner,它是通过createScanner()来获取的,代码如下:

  /**
   * 创建一个scanner
   *
   * @param store store
   * @param scanners Store file scanners.
   * @param scanType Scan type.
   * @param smallestReadPoint Smallest MVCC read point.
   * @param earliestPutTs Earliest put across all files.
   * @return A compaction scanner.
   */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {

	// 构造一个Scan实例scan
	Scan scan = new Scan();

    // 设置最大版本号,即列簇被设置的最大版本号(是不是从这里就能看出,compact时会做数据清理工作呢,O(∩_∩)O)
	scan.setMaxVersions(store.getFamily().getMaxVersions());

	// 返回一个StoreScanner实例
    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
        scanType, smallestReadPoint, earliestPutTs);
  }

        这里的scanner,实际上是StoreScanner类型的实例,它是针对Store的内部Scanner,而且,这里有一个重点,创建scan时会设置最大版本号,即列簇被设置的最大版本号,那么我们是不是从这里就能看出,compact时会做数据清理工作呢,答案当然是肯定的。所以HBase在数据修改时,并不是简单的删除,而是增加一个版本,而过期数据则会在compact过程中,通过scanner设置最大版本号的方式来过滤掉,这种处理方式是很高效的,它体现了HBase低延迟的特点。

        有了读数据的scanner,我们接着来看下写数据的writer。毕竟数据得有读有写,才能将旧文件合并成新文件,而writer是通过HStore的createWriterInTmp()方法来创建的,如下:

/*
   * @param maxKeyCount
   * @param compression Compression algorithm to use
   * @param isCompaction whether we are creating a new file in a compaction
   * @param includesMVCCReadPoint - whether to include MVCC or not
   * @param includesTag - includesTag or not
   * @return Writer for a new StoreFile in the tmp dir.
   */
  @Override
  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
  throws IOException {
    final CacheConfig writerCacheConf;

    // 是否为合并
    if (isCompaction) {// 如果是合并,不在writerCacheConf上缓存数据
      // Don't cache data on write on compactions.
      writerCacheConf = new CacheConfig(cacheConf);
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConf;
    }

    InetSocketAddress[] favoredNodes = null;

    // 获取有利节点
    if (region.getRegionServerServices() != null) {
      favoredNodes = region.getRegionServerServices().getFavoredNodesForRegion(
          region.getRegionInfo().getEncodedName());
    }

    // 创建HFile上下文HFileContext
    HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
      cryptoContext);

    // 创建StoreFile的StoreFile,需要使用上述信息,比如文件系统、文件路径、合并器、最大keyvalue数目、有利节点等
    StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
        this.getFileSystem())// 文件系统
            .withFilePath(fs.createTempName())// 文件路径
            .withComparator(comparator)// 合并器
            .withBloomType(family.getBloomFilterType())
            .withMaxKeyCount(maxKeyCount)// 最大keyvalue数目
            .withFavoredNodes(favoredNodes)// 有利节点
            .withFileContext(hFileContext)// HFile上下文信息
            .build();
    return w;
  }

        这个writer本质上是StoreFile的Writer,它是针对存储文件的写入者,其中包含很多关键信息,比如文件系统、文件路径、合并器、最大keyvalue数目、有利节点、HFile上下文信息等。

        有了scanner,可以读数据了,又有了writer,也可以写数据了,那么我们就可以开始合并了:由旧文件读取数据,往新文件写入数据。我们看下Compactor的performCompaction()方法,代码如下:

  /**
   * Performs the compaction.
   * 执行合并
   *
   * @param scanner Where to read from.
   * @param writer Where to write to.
   * @param smallestReadPoint Smallest read point.
   * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint
   * @return Whether compaction ended; false if it was interrupted for some reason.
   */
  protected boolean performCompaction(InternalScanner scanner,
      CellSink writer, long smallestReadPoint, boolean cleanSeqId) throws IOException {

	// 已写字节数
	long bytesWritten = 0;

	// 处于写过程的字节数
    long bytesWrittenProgress = 0;
    // Since scanner.next() can return 'false' but still be delivering data,
    // we have to use a do/while loop.

    // Cell列表
    List<Cell> cells = new ArrayList<Cell>();

    // 周期性检测的阈值:合并已被处理的数据量大小,取参数hbase.hstore.close.check.interval,默认为10M
    long closeCheckInterval = HStore.getCloseCheckInterval();

    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = EnvironmentEdgeManager.currentTime();
    }
    long now = 0;

    // 进入一个do...while循环,一直循环的条件是hasMore为true,即scanner中还有数据
    boolean hasMore;
    do {
      // scanner中是否还存在数据,取出到cells中
      hasMore = scanner.next(cells, compactionKVMax);
      if (LOG.isDebugEnabled()) {
        now = EnvironmentEdgeManager.currentTime();
      }
      // output to writer:
      // 遍历cells,写入writer
      for (Cell c : cells) {
        if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
          CellUtil.setSequenceId(c, 0);
        }

        // 写入writer
        writer.append(c);
        // keyvalue大小
        int len = KeyValueUtil.length(c);

        // 计数器累加:kv累计数目和累计大小
        ++progress.currentCompactedKVs;
        progress.totalCompactedSize += len;

        if (LOG.isDebugEnabled()) {
          bytesWrittenProgress += len;
        }

        // check periodically to see if a system stop is requested
        // 周期性检测是否一个系统停止被请求
        if (closeCheckInterval > 0) {

          // 累加已写字节数bytesWritten
          bytesWritten += len;

          // 如果已写字节数bytesWritten大于closeCheckInterval
          if (bytesWritten > closeCheckInterval) {

        	// 重置已写字节数bytesWritten
        	bytesWritten = 0;

        	// 判断HStore是否可写,不可写的话,说明一个system stop请求已发起,则通过progress取消合并
            if (!store.areWritesEnabled()) {
              progress.cancel();
              return false;
            }
          }
        }
      }
      // Log the progress of long running compactions every minute if
      // logging at DEBUG level
      if (LOG.isDebugEnabled()) {
        if ((now - lastMillis) >= 60 * 1000) {
          LOG.debug("Compaction progress: " + progress + String.format(", rate=%.2f kB/sec",
            (bytesWrittenProgress / 1024.0) / ((now - lastMillis) / 1000.0)));
          lastMillis = now;
          bytesWrittenProgress = 0;
        }
      }

      // 情况cell列表
      cells.clear();
    } while (hasMore);

    // 合并过程progress标记已完成
    progress.complete();
    return true;
  }

        这个合并执行的过程还是比较简单的,它通过一个do...while循环,不断的从scanner中读取数据,放入cell列表,然后遍历cells,将Cell依次写入writer,并累加kv数目和大小,直到scanner中数据被处理完。如此,旧文件数据不断的被读取出来,然后将其不断的写入新文件,最好通过合并过程progress标记合并已完成。大致就是这个流程。

        这里有个需要特别说明的地方,在数据合并过程中,还需要周期性的检测是否有外部发起系统关系的请求,如果是的话,则需要取消合并。这个周期性不是针对时间的,而是针对一个已合并数据量的阈值closeCheckInterval,这个closeCheckInterval取自参数hbase.hstore.close.check.interval,默认为10M。在合并过程中,被合并数据大小bytesWritten不断的被累加,直到超过阈值closeCheckInterval,清空,并且根据HStore的可写状态来判断是否有外部发起系统停止的请求,如果有的话,通过progress取消合并,否则继续进入下一个累加至阈值再进行判断的周期。

        接下来,根据上述合并的结果finished,来判断后续处理步骤:如果没有完成合并:关闭writer、删除writer中的临时文件并抛出异常。

        最好,如果存在异常e,写入元数据,关闭writer,并将写入地址加入newFiles;如果不存在异常e,则关闭writer,返回合并后的文件列表newFiles。不管结果如何,最终依次关闭readersToClose中StoreFile的Reader。

        至此,整个HRegion中精确到HStore上的compact流程就分析完毕了。限于篇幅的原因,可能部分细节简单掠过或者没有提及,留待以后再慢慢分析吧!

        

时间: 2025-01-24 06:59:51

HBase源码分析之HRegion上compact流程分析(三)的相关文章

HBase源码分析之HRegionServer上compact流程分析

        前面三篇文章中,我们详细叙述了compact流程是如何在HRegion上进行的,了解了它的很多细节方面的问题.但是,这个compact在HRegionServer上是如何进行的?合并时文件是如何选择的呢?在这篇文章中,你将找到答案!         首先,在HRegionServer内部,我们发现,它定义了一个CompactSplitThread类型的成员变量compactSplitThread,单看字面意思,这就是一个合并分裂线程,那么它会不会就是HRegionServer上具

HBase源码分析之HRegion上MemStore的flsuh流程(二)

        继上篇<HBase源码分析之HRegion上MemStore的flsuh流程(一)>之后,我们继续分析下HRegion上MemStore flush的核心方法internalFlushcache(),它的主要流程如图所示:         其中,internalFlushcache()方法的代码如下: /** * Flush the memstore. Flushing the memstore is a little tricky. We have a lot of upda

HBase源码分析之compact请求发起时机、判断条件等详情(一)

        一般说来,任何一个比较复杂的分布式系统,针对能够使得其性能得到大幅提升的某一内部处理流程,必然有一个定期检查机制,使得该流程在满足一定条件的情况下,能够自发的进行,这样才能够很好的体现出复杂系统的自我适应与自我调节能力.我们知道,HBase内部的compact处理流程是为了解决MemStore Flush之后,文件数目太多,导致读数据性能大大下降的一种自我调节手段,它会将文件按照某种策略进行合并,大大提升HBase的数据读性能.那么,基于我刚才的陈述,compact流程是否有一个

HBase源码分析之HRegionServer上MemStore的flush处理流程(二)

        继上篇文章<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>遗留的问题之后,本文我们接着研究HRegionServer上MemStore的flush处理流程,重点讲述下如何选择一个HRegion进行flush以缓解MemStore压力,还有HRegion的flush是如何发起的.         我们先来看下第一个问题:如何选择一个HRegion进行flush以缓解MemStore压力.上文中我们讲到过flush处理线程如果从flus

HBase源码分析之HRegionServer上MemStore的flush处理流程(一)

        在<HBase源码分析之HRegion上MemStore的flsuh流程(一)>.<HBase源码分析之HRegion上MemStore的flsuh流程(二)>等文中,我们介绍了HRegion上Memstore flush的主体流程和主要细节.但是,HRegion只是HBase表中按照行的方向对一片连续的数据区域的抽象,它并不能对外提供单独的服务,供客户端或者HBase其它实体调用.而HRegion上MemStore的flush还是要通过HRegionServer来

HBase源码分析之MemStore的flush发起时机、判断条件等详情(二)

        在<HBase源码分析之MemStore的flush发起时机.判断条件等详情>一文中,我们详细介绍了MemStore flush的发起时机.判断条件等详情,主要是两类操作,一是会引起MemStore数据大小变化的Put.Delete.Append.Increment等操作,二是会引起HRegion变化的诸如Regin的分裂.合并以及做快照时的复制拷贝等,同样会触发MemStore的flush流程.同时,在<HBase源码分析之compact请求发起时机.判断条件等详情(一

HBase源码分析之MemStore的flush发起时机、判断条件等详情

        前面的几篇文章,我们详细介绍了HBase中HRegion上MemStore的flsuh流程,以及HRegionServer上MemStore的flush处理流程.那么,flush到底是在什么情况下触发的呢?本文我们将详细探究下HBase中MemStore的flush流程的发起时机,看看到底都有哪些操作,或者哪些后台服务进程会触发MemStore的flush.         首先,在<HBase源码分析之HRegionServer上MemStore的flush处理流程(一)>和

写在HBase源码分析前面的话

       本版块全部为HBase1.0.2相关源码分析文章,系个人研究源码原创写成,除对部分引用标示外,其余均为原创,或翻译源码注释.        该系列文章为与网友交流HBase学习,不做任何其他商业用途~~O(∩_∩)O哈哈~         由于水平有限,文章基本上是边读源码,边翻译注释,边分析源码写成,没有较强的前后逻辑性,我会在写完全部文章后再回头整理.         由于水平有限,文章中可能存在理解错误的地方,欢迎各位针对文章中的错误.问题或者其他指导建议,踊跃评论,多多指点

hbase源码系列(十二)Get、Scan在服务端是如何处理?

继上一篇讲了Put和Delete之后,这一篇我们讲Get和Scan, 因为我发现这两个操作几乎是一样的过程,就像之前的Put和Delete一样,上一篇我本来只打算写Put的,结果发现Delete也可以走这个过程,所以就一起写了. Get 我们打开HRegionServer找到get方法.Get的方法处理分两种,设置了ClosestRowBefore和没有设置的,一般来讲,我们都是知道了明确的rowkey,不太会设置这个参数,它默认是false的. if (get.hasClosestRowBef