HDFS源码分析数据块复制之PendingReplicationBlocks

        PendingReplicationBlocks实现了所有正在复制的数据块的记账工作。它实现以下三个主要功能:

        1、记录此时正在复制的块;

        2、一种对复制请求进行跟踪的粗粒度计时器;

        3、一个定期识别未执行复制请求的线程。

        我们先看下它内部有哪些成员变量,如下:

  // 块和正在进行的块复制信息的映射集合
  private final Map<Block, PendingBlockInfo> pendingReplications;

  // 复制请求超时的块列表
  private final ArrayList<Block> timedOutItems;

  // 后台工作线程
  Daemon timerThread = null;

  // 文件系统是否正在运行的标志位
  private volatile boolean fsRunning = true;

  //
  // It might take anywhere between 5 to 10 minutes before
  // a request is timed out.
  // 在一个请求超时之前可能需要5到10分钟

  // 请求超时阈值,默认为5分钟
  private long timeout = 5 * 60 * 1000;

  // 超时检查固定值:5分钟
  private final static long DEFAULT_RECHECK_INTERVAL = 5 * 60 * 1000;

        首先是pendingReplications,它是块和正在进行的块复制信息的映射集合,所有正在复制的数据块及其对应复制信息都会被加入到这个集合。数据块复制信息PendingBlockInfo是对数据块开始复制时间timeStamp、待复制的目标数据节点列表List<DatanodeDescriptor>实例targets的一个封装,代码如下:

  /**
   * An object that contains information about a block that
   * is being replicated. It records the timestamp when the
   * system started replicating the most recent copy of this
   * block. It also records the list of Datanodes where the
   * replication requests are in progress.
   *
   * 正在被复制的块信息。它记录系统开始复制块最新副本的时间,也记录复制请求正在执行的数据节点列表。
   */
  static class PendingBlockInfo {

	// 时间戳
    private long timeStamp;
    // 待复制的目标数据节点列表
    private final List<DatanodeDescriptor> targets;

    // 构造方法
    PendingBlockInfo(DatanodeDescriptor[] targets) {
      // 时间戳赋值为当前时间
      this.timeStamp = now();
      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
    }

    long getTimeStamp() {
      return timeStamp;
    }

    // 设置时间戳为当前时间
    void setTimeStamp() {
      timeStamp = now();
    }

    // 增加复制数量,即增加目标数据节点
    void incrementReplicas(DatanodeDescriptor... newTargets) {
      if (newTargets != null) {
        for (DatanodeDescriptor dn : newTargets) {
          targets.add(dn);
        }
      }
    }

    // 减少复制数量,即减少目标数据节点
    void decrementReplicas(DatanodeDescriptor dn) {
      targets.remove(dn);
    }

    // 获取复制数量,即或许待复制的数据节点数目
    int getNumReplicas() {
      return targets.size();
    }
  }

        它的构造方法中,即将时间戳timeStamp赋值为当前时间,并且提供了设置时间戳为当前时间的setTimeStamp()方法。同时提供了增加复制数量、减少复制数量、获取复制数量相关的三个方法,均是对待复制的目标数据节点列表的增加、减少与计数操作,上面注释很清楚,不再详述!

        另外两个比较重要的变量就是复制请求超时的块列表timedOutItems和后台工作线程timerThread。由后台工作线程周期性的检查pendingReplications列表中的待复制数据块,看看其是否超时,如果超时的话,将其加入timedOutItems列表。后台工作线程timerThread的初始化如下:

  // 启动块复制监控线程
  void start() {
    timerThread = new Daemon(new PendingReplicationMonitor());
    timerThread.start();
  }

        它实际上是借助PendingReplicationMonitor来完成的。PendingReplicationMonitor实现了Runnable接口,是一个周期性工作的线程,用于浏览从未完成它们复制请求的数据块,这个从未完成实际上就是在规定时间内还未完成的数据块复制信息。PendingReplicationMonitor的实现如下:

  /*
   * A periodic thread that scans for blocks that never finished
   * their replication request.
   * 一个周期性线程,用于浏览从未完成它们复制请求的数据块
   */
  class PendingReplicationMonitor implements Runnable {
    @Override
    public void run() {

      // 如果标志位fsRunning为true,即文件系统正常运行,则while循环一直进行
      while (fsRunning) {
    	// 检查周期:取timeout,最高为5分钟
        long period = Math.min(DEFAULT_RECHECK_INTERVAL, timeout);
        try {

          // 检查方法
          pendingReplicationCheck();

          // 线程休眠period
          Thread.sleep(period);
        } catch (InterruptedException ie) {
          if(LOG.isDebugEnabled()) {
            LOG.debug("PendingReplicationMonitor thread is interrupted.", ie);
          }
        }
      }
    }

    /**
     * Iterate through all items and detect timed-out items
     * 通过所有项目迭代检测超时项目
     */
    void pendingReplicationCheck() {

      // 使用synchronized关键字对pendingReplications进行同步
      synchronized (pendingReplications) {

    	// 获取集合pendingReplications的迭代器
        Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
                                    pendingReplications.entrySet().iterator();

        // 记录当前时间now
        long now = now();
        if(LOG.isDebugEnabled()) {
          LOG.debug("PendingReplicationMonitor checking Q");
        }

        // 遍历pendingReplications集合中的每个元素
        while (iter.hasNext()) {

          // 取出每个<Block, PendingBlockInfo>条目
          Map.Entry<Block, PendingBlockInfo> entry = iter.next();

          // 取出Block对应的PendingBlockInfo实例pendingBlock
          PendingBlockInfo pendingBlock = entry.getValue();

          // 判断pendingBlock自其生成时的timeStamp以来到现在,是否已超过timeout时间
          if (now > pendingBlock.getTimeStamp() + timeout) {

        	// 超过的话,
        	// 取出timeout实例block
            Block block = entry.getKey();

            // 使用synchronized关键字对timedOutItems进行同步
            synchronized (timedOutItems) {

              // 将block添加入复制请求超时的块列表timedOutItems
              timedOutItems.add(block);
            }
            LOG.warn("PendingReplicationMonitor timed out " + block);

            // 从迭代器中移除该条目
            iter.remove();
          }
        }
      }
    }
  }

        在它的run()方法内,如果标志位fsRunning为true,即文件系统正常运行,则while循环一直进行,然后在while循环内:

        1、先取检查周期period:取timeout,最高为5分钟;

        2、调用pendingReplicationCheck()方法进行检查;

        3、线程休眠period时间,再次进入while循环。

        pendingReplicationCheck的实现逻辑也很简单,如下:

        使用synchronized关键字对pendingReplications进行同步:

        1、获取集合pendingReplications的迭代器iter;

        2、记录当前时间now;

        3、遍历pendingReplications集合中的每个元素:

              3.1、取出每个<Block, PendingBlockInfo>条目;

              3.2、取出Block对应的PendingBlockInfo实例pendingBlock;

              3.3、判断pendingBlock自其生成时的timeStamp以来到现在,是否已超过timeout时间,超过的话:

                       3.3.1、取出timeout实例block;

                       3.3.2、使用synchronized关键字对timedOutItems进行同步,使用synchronized关键字对timedOutItems进行同步;

                       3.3.3、从迭代器中移除该条目。

        PendingReplicationBlocks还提供了获取复制超时块数组的getTimedOutBlocks()方法,代码如下:

  /**
   * Returns a list of blocks that have timed out their
   * replication requests. Returns null if no blocks have
   * timed out.
   * 返回一个其复制请求已超时的数据块列表,如果没有则返回null
   */
  Block[] getTimedOutBlocks() {

	// 使用synchronized关键字对timedOutItems进行同步
    synchronized (timedOutItems) {

      // 如果timedOutItems中没有数据,则直接返回null
      if (timedOutItems.size() <= 0) {
        return null;
      }

      // 将Block列表timedOutItems转换成Block数组
      Block[] blockList = timedOutItems.toArray(
          new Block[timedOutItems.size()]);

      // 清空Block列表timedOutItems
      timedOutItems.clear();

      // 返回Block数组
      return blockList;
    }
  }

        PendingReplicationBlocks另外还提供了增加一个块到正在进行的块复制信息列表中的increment()方法和减少正在复制请求的数量的decrement()方法,代码如下:

  /**
   * Add a block to the list of pending Replications
   * 增加一个块到正在进行的块复制信息列表中
   *
   * @param block The corresponding block
   * @param targets The DataNodes where replicas of the block should be placed
   */
  void increment(Block block, DatanodeDescriptor[] targets) {

	// 使用synchronized关键字对pendingReplications进行同步
	synchronized (pendingReplications) {

	  // 根据Block实例block先从集合pendingReplications中查找
      PendingBlockInfo found = pendingReplications.get(block);
      if (found == null) {
    	// 如果没有找到,直接put进去,利用DatanodeDescriptor[]的实例targets构造PendingBlockInfo对象
        pendingReplications.put(block, new PendingBlockInfo(targets));
      } else {
    	// 如果之前存在,增加复制数量,即增加目标数据节点
        found.incrementReplicas(targets);
        // 设置时间戳为当前时间
        found.setTimeStamp();
      }
    }
  }

  /**
   * One replication request for this block has finished.
   * Decrement the number of pending replication requests
   * for this block.
   * 针对给定数据块的一个复制请求已完成。针对该数据块,减少正在复制请求的数量。
   *
   * @param The DataNode that finishes the replication
   */
  void decrement(Block block, DatanodeDescriptor dn) {

	// 使用synchronized关键字对pendingReplications进行同步
	synchronized (pendingReplications) {

	  // 根据Block实例block先从集合pendingReplications中查找
      PendingBlockInfo found = pendingReplications.get(block);
      if (found != null) {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Removing pending replication for " + block);
        }

        // 减少复制数量,即减少目标数据节点
        found.decrementReplicas(dn);

        // 如果数据块对应的复制数量总数小于等于0,复制工作完成,
        // 直接从pendingReplications集合中移除该数据块及其对应信息
        if (found.getNumReplicas() <= 0) {
          pendingReplications.remove(block);
        }
      }
    }
  }

        以及统计块数量和块复制数量的方法,如下:

  /**
   * The total number of blocks that are undergoing replication
   * 正在被复制的块的总数
   */
  int size() {
    return pendingReplications.size();
  } 

  /**
   * How many copies of this block is pending replication?
   * 块复制的总量
   */
  int getNumReplicas(Block block) {
    synchronized (pendingReplications) {
      PendingBlockInfo found = pendingReplications.get(block);
      if (found != null) {
        return found.getNumReplicas();
      }
    }
    return 0;
  }

        上述方法代码逻辑都很简单,而且注释也很详细,此处不再过多赘述!

时间: 2024-09-25 21:25:10

HDFS源码分析数据块复制之PendingReplicationBlocks的相关文章

HDFS源码分析数据块复制监控线程ReplicationMonitor(一)

        ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列.其定义及作为线程核心的run()方法如下: /** * Periodically calls computeReplicationWork(). * 周期性调用computeReplicationWork()方法 */ private class ReplicationMonitor implements Runnable

HDFS源码分析数据块复制选取复制源节点

        数据块的复制当然需要一个源数据节点,从其上拷贝数据块至目标数据节点.那么数据块复制是如何选取复制源节点的呢?本文我们将针对这一问题进行研究.         在BlockManager中,chooseSourceDatanode()方法就是用来选取数据块复制时的源节点的,它负责解析数据块所属数据节点列表,并选择一个,用它作为数据块的复制源.其核心逻辑如下:         我们优先选择正处于退役过程中的数据节点而不是其他节点,因为前者没有写数据传输量因此相对不是很繁忙.我们不使用

HDFS源码分析数据块校验之DataBlockScanner

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程.它为所有的块池管理块扫描.针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描.校验数据块.当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新.         我们先看下DataBlockScanner的成员变量,如下: // 所属数据节点DataNode实例 private

HDFS源码分析数据块汇报之损坏数据块检测checkReplicaCorrupt()

        无论是第一次,还是之后的每次数据块汇报,名字名字节点都会对汇报上来的数据块进行检测,看看其是否为损坏的数据块.那么,损坏数据块是如何被检测的呢?本文,我们将研究下损坏数据块检测的checkReplicaCorrupt()方法.         关于数据块及其副本的状态,请阅读<HDFS源码分析之数据块及副本状态BlockUCState.ReplicaState>一文.         checkReplicaCorrupt()方法专门用于损坏数据块检测,代码如下: /** *

HDFS源码分析数据块之CorruptReplicasMap

        CorruptReplicasMap用于存储文件系统中所有损坏数据块的信息.仅当它的所有副本损坏时一个数据块才被认定为损坏.当汇报数据块的副本时,我们隐藏所有损坏副本.一旦一个数据块被发现完好副本达到预期,它将从CorruptReplicasMap中被移除.         我们先看下CorruptReplicasMap都有哪些成员变量,如下所示: // 存储损坏数据块Block与它对应每个数据节点与损坏原因集合映射关系的集合 private final SortedMap<Bl

HDFS源码分析DataXceiver之读数据块

         在<HDFS源码分析DataXceiver之整体流程>一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver.并且,在线程DataXceiver处理请求的主方法run()方法内,会先读取操作符op,然后根据操作符op分别调用相应的方法进行请求的处理.而决定什么样的操作符op该调用何种方法的逻辑,则是在DataX

HDFS源码分析心跳汇报之数据块汇报

        在<HDFS源码分析心跳汇报之数据块增量汇报>一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode.那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报.         首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法: /*

HDFS源码分析心跳汇报之数据块增量汇报

        在<HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程>一文中,我们详细了解了数据节点DataNode周期性发送心跳给名字节点NameNode的BPServiceActor工作线程,了解了它实现心跳的大体流程:         1.与NameNode握手:               1.1.第一阶段:获取命名空间信息并验证.设置:               1.2.第二阶段:DataNode注册:         2.周期性调用sendHeartBeat

HDFS源码分析之UnderReplicatedBlocks(一)

        UnderReplicatedBlocks是HDFS中关于块复制的一个重要数据结构.在HDFS的高性能.高容错性体系中,总有一些原因促使HDFS系统内进行块复制工作,比如基于高性能的负载均衡.基于容错性的数据块副本数恢复等.普遍的,任何工作都会有一个优先级的问题,特别是这里的数据块复制,不可能简单的按照先入先出或者其他简单策略,比方说,基于容错性的数据块副本数恢复,特别是数据块副本仅有一个的数据块副本数恢复,其优先级肯定要比基于高性能的负载均衡高,所以数据块复制要有个优先级的概念