HDFS源码分析心跳汇报之数据结构初始化

        在《HDFS源码分析心跳汇报之整体结构》一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager、BPOfferService和BPServiceActor三者之间的关系。那么,HDFS心跳相关的这些数据结构,都是如何被初始化的呢?本文,我们就开始研究HDFS心跳汇报之数据结构初始化。

        首先,在DataNode节点启动时所必须执行的startDataNode()方法中,有如下代码:

    // DataNode启动时执行的startDataNode()方法
    // 构造一个BlockPoolManager实例
    // 调用其refreshNamenodes()方法
    blockPoolManager = new BlockPoolManager(this);
    blockPoolManager.refreshNamenodes(conf);

        它构造了一个BlockPoolManager实例,并调用其refreshNamenodes()方法,完成NameNodes的刷新。我们来看下这个方法:

  void refreshNamenodes(Configuration conf)
      throws IOException {
    LOG.info("Refresh request received for nameservices: " + conf.get
            (DFSConfigKeys.DFS_NAMESERVICES));

    // 从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap
    Map<String, Map<String, InetSocketAddress>> newAddressMap = DFSUtil
            .getNNServiceRpcAddressesForCluster(conf);

    // 需要通过使用synchronized关键字在refreshNamenodesLock上加互斥锁
    synchronized (refreshNamenodesLock) {
      // 调用doRefreshNamenodes()方法执行集合newAddressMap中的刷新
      doRefreshNamenodes(newAddressMap);
    }
  }

        很简单,两大步骤:第一步,从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap,第二步调用doRefreshNamenodes()方法执行集合newAddressMap中NameNodes的刷新。

        首先,我们看下如何从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap,相关代码如下:

  /**
   * Returns list of InetSocketAddresses corresponding to the namenode
   * that manages this cluster. Note this is to be used by datanodes to get
   * the list of namenode addresses to talk to.
   *
   * Returns namenode address specifically configured for datanodes (using
   * service ports), if found. If not, regular RPC address configured for other
   * clients is returned.
   *
   * @param conf configuration
   * @return list of InetSocketAddress
   * @throws IOException on error
   */
  public static Map<String, Map<String, InetSocketAddress>>
    getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {

	// Use default address as fall back
    String defaultAddress;
    try {
      // 获取默认地址defaultAddress
      defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf));
    } catch (IllegalArgumentException e) {
      defaultAddress = null;
    }

    // 获取hdfs的内部命名服务:dfs.internal.nameservices,得到集合parentNameServices
    Collection<String> parentNameServices = conf.getTrimmedStringCollection
            (DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);

    if (parentNameServices.isEmpty()) {// 如果没有配置dfs.internal.nameservices
      // 获取dfs.nameservices,赋值给集合parentNameServices
      parentNameServices = conf.getTrimmedStringCollection
              (DFSConfigKeys.DFS_NAMESERVICES);
    } else {
      // Ensure that the internal service is ineed in the list of all available
      // nameservices.

      // 获取dfs.nameservices
      Set<String> availableNameServices = Sets.newHashSet(conf
              .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));

      // 验证parentNameServices中的每个nsId在dfs.nameservices中是否都存在
      // 即参数dfs.internal.nameservices包含在参数dfs.nameservices范围内
      for (String nsId : parentNameServices) {
        if (!availableNameServices.contains(nsId)) {
          throw new IOException("Unknown nameservice: " + nsId);
        }
      }
    }

    // 调用getAddressesForNsIds()方法,获取nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合
    // dfs.namenode.servicerpc-address
    // dfs.namenode.rpc-address
    Map<String, Map<String, InetSocketAddress>> addressList =
            getAddressesForNsIds(conf, parentNameServices, defaultAddress,
                    DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
    if (addressList.isEmpty()) {
      throw new IOException("Incorrect configuration: namenode address "
              + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
              + DFS_NAMENODE_RPC_ADDRESS_KEY
              + " is not configured.");
    }
    return addressList;
  }

        这个方法的处理逻辑如下:

        1、首先,根据NameNode类的静态方法getAddress()从配置信息中获取默认地址defaultAddress;

        2、然后,获取hdfs的内部命名服务:dfs.internal.nameservices,得到集合parentNameServices:

              2.1、如果没有配置dfs.internal.nameservices,获取dfs.nameservices,赋值给集合parentNameServices;

              2.2、如果配置了dfs.internal.nameservices,再获取获取dfs.nameservices,得到availableNameServices,验证parentNameServices中的每个nsId在availableNameServices中是否都存在,即参数dfs.internal.nameservices包含在参数dfs.nameservices范围内;

        3、调用getAddressesForNsIds()方法,利用conf、parentNameServices、defaultAddress等获取nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合addressList,并返回。

        下面,我们再看下getAddressesForNsIds()方法,代码如下:

  /**
   * Returns the configured address for all NameNodes in the cluster.
   * @param conf configuration
   * @param nsIds
   *@param defaultAddress default address to return in case key is not found.
   * @param keys Set of keys to look for in the order of preference   @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
   */
  private static Map<String, Map<String, InetSocketAddress>>
    getAddressesForNsIds(Configuration conf, Collection<String> nsIds,
                         String defaultAddress, String... keys) {
    // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
    // across all of the configured nameservices and namenodes.

	// dfs.namenode.servicerpc-address
	// dfs.namenode.rpc-address
    Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();

    // 遍历每个nameserviceId,做以下处理:
    for (String nsId : emptyAsSingletonNull(nsIds)) {

      // 通过getAddressesForNameserviceId()方法获取nameNodeId->InetSocketAddress的对应关系,nameNodeId来自参数dfs.ha.namenodes.nsId
      Map<String, InetSocketAddress> isas =
        getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
      if (!isas.isEmpty()) {

    	// 将nameserviceId->{nameNodeId->InetSocketAddress}的对应关系放入集合ret
        ret.put(nsId, isas);
      }
    }

    // 返回nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合ret
    return ret;
  }

        非常简单,遍历每个nameserviceId,做以下处理:

        1、通过getAddressesForNameserviceId()方法获取nameNodeId->InetSocketAddress的对应关系,nameNodeId来自参数dfs.ha.namenodes.nsId;

        2、将nameserviceId->{nameNodeId->InetSocketAddress}的对应关系放入集合ret;

        3、最后返回nameserviceId->{nameNodeId->InetSocketAddress}对应关系的集合ret。

        继续看getAddressesForNameserviceId()方法,如下:

  private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
      Configuration conf, String nsId, String defaultValue,
      String... keys) {
	// keys
	// dfs.namenode.servicerpc-address
	// dfs.namenode.rpc-address

	// 获取dfs.ha.namenodes.nsId
    Collection<String> nnIds = getNameNodeIds(conf, nsId);
    Map<String, InetSocketAddress> ret = Maps.newHashMap();
    for (String nnId : emptyAsSingletonNull(nnIds)) {
      String suffix = concatSuffixes(nsId, nnId);

      // 根据keys获取address
      String address = getConfValue(defaultValue, suffix, conf, keys);
      if (address != null) {

    	// 将address封装成InetSocketAddress,得到isa
        InetSocketAddress isa = NetUtils.createSocketAddr(address);
        if (isa.isUnresolved()) {
          LOG.warn("Namenode for " + nsId +
                   " remains unresolved for ID " + nnId +
                   ".  Check your hdfs-site.xml file to " +
                   "ensure namenodes are configured properly.");
        }

        // 将nnId->InetSocketAddress的对应关系放入到Map中
        ret.put(nnId, isa);
      }
    }
    return ret;
  }

        它通过参数获取dfs.ha.namenodes.nsId获取到NameNodeId的集合nnIds,然后针对每个NameNode,根据keys获取address,这keys传递进来的就是dfs.namenode.servicerpc-address、dfs.namenode.rpc-address,也就是优先取前一个参数,前一个取不到的话,再取第二个参数,然后将address封装成InetSocketAddress,得到isa,将nnId->InetSocketAddress的对应关系放入到Map中,最终返回给上层应用。

        至此,从配置信息conf中获取nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap就分析完了。下面,我们再看下初始化的重点:调用doRefreshNamenodes()方法执行集合newAddressMap中的刷新。代码如下:

  private void doRefreshNamenodes(
      Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {

	// 确保当前线程在refreshNamenodesLock上拥有互斥锁
	assert Thread.holdsLock(refreshNamenodesLock);

	// 定义三个集合,分别为待刷新的toRefresh、待添加的toAdd和待移除的toRemove
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;

    // 使用synchronized关键字在当前对象上获得互斥锁
    synchronized (this) {
      // Step 1. For each of the new nameservices, figure out whether
      // it's an update of the set of NNs for an existing NS,
      // or an entirely new nameservice.
      // 第一步,针对所有新的nameservices中的每个nameservice,
      // 确认它是一个已存在nameservice中的被更新了的NN集合,还是完全的一个新的nameservice
      // 判断的依据就是对应nameserviceId是否在bpByNameserviceId结合中存在

      // 循环addrMap,放入待添加或者待刷新集合
      for (String nameserviceId : addrMap.keySet()) {

    	// 如果bpByNameserviceId结合中存在nameserviceId,加入待刷新集合toRefresh,否则加入到待添加集合toAdd
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          toRefresh.add(nameserviceId);
        } else {
          toAdd.add(nameserviceId);
        }
      }

      // Step 2. Any nameservices we currently have but are no longer present
      // need to be removed.
      // 第二步,删除所有我们目前拥有但是现在不再需要的,也就是bpByNameserviceId中存在,而配置信息addrMap中没有的

      // 加入到待删除集合toRemove
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));

      // 验证,待刷新集合toRefresh的大小与待添加集合toAdd的大小必须等于配置信息addrMap中的大小
      assert toRefresh.size() + toAdd.size() ==
        addrMap.size() :
          "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd) +
          "  toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove) +
          "  toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

      // Step 3. Start new nameservices
      // 第三步,启动所有新的nameservices
      if (!toAdd.isEmpty()) {// 待添加集合toAdd不为空

        LOG.info("Starting BPOfferServices for nameservices: " +
            Joiner.on(",").useForNull("<default>").join(toAdd));

        // 针对待添加集合toAdd中的每个nameserviceId,做以下处理:
        for (String nsToAdd : toAdd) {

          // 从addrMap中根据nameserviceId获取对应Socket地址InetSocketAddress,创建集合addrs
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());

          // 根据addrs创建BPOfferService
          BPOfferService bpos = createBPOS(addrs);

          // 将nameserviceId->BPOfferService的对应关系添加到集合bpByNameserviceId中
          bpByNameserviceId.put(nsToAdd, bpos);

          // 将BPOfferService添加到集合offerServices中
          offerServices.add(bpos);
        }
      }

      // 启动所有BPOfferService,实际上是通过调用它的start()方法启动
      startAll();
    }

    // Step 4. Shut down old nameservices. This happens outside
    // of the synchronized(this) lock since they need to call
    // back to .remove() from another thread
    // 第4步,停止所有旧的nameservices。这个是发生在synchronized代码块外面的,是因为它们需要回调另外一个线程的remove()方法

    if (!toRemove.isEmpty()) {
      LOG.info("Stopping BPOfferServices for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRemove));

      // 遍历待删除集合toRemove中的每个nameserviceId
      for (String nsToRemove : toRemove) {

    	// 根据nameserviceId从集合bpByNameserviceId中获取BPOfferService
    	BPOfferService bpos = bpByNameserviceId.get(nsToRemove);

    	// 调用BPOfferService的stop()和join()方法停止服务
        bpos.stop();
        bpos.join();
        // they will call remove on their own
        // 它们会调用本身的remove()方法
      }
    }

    // Step 5. Update nameservices whose NN list has changed
    // 第5步,更新NN列表已变化的nameservices
    if (!toRefresh.isEmpty()) {// 待更新集合toRefresh不为空时
      LOG.info("Refreshing list of NNs for nameservices: " +
          Joiner.on(",").useForNull("<default>").join(toRefresh));

      // 遍历待更新集合toRefresh中的每个nameserviceId
      for (String nsToRefresh : toRefresh) {

    	// 根据nameserviceId从集合bpByNameserviceId中取出对应的BPOfferService
        BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);

        // 根据BPOfferService从配置信息addrMap中取出NN的Socket地址InetSocketAddress,形成列表addrs
        ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());

        // 调用BPOfferService的refreshNNList()方法根据addrs刷新NN列表
        bpos.refreshNNList(addrs);
      }
    }
  }

        整个doRefreshNamenodes()方法比较长,但是主体逻辑很清晰,主要分五大步骤,分别如下:

        1、第一步,针对nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap中每个nameserviceid,确认它是一个完全新加的nameservice,还是一个其NameNode列表被更新的nameservice,分别加入待添加toAdd和待刷新toRefresh集合;

        2、第二步,针对newAddressMap中没有,而目前DataNode内存bpByNameserviceId中存在的nameservice,需要删除,添加到待删除toRemove集合;

        3、第三步,处理待添加toAdd集合,启动所有新的nameservices:根据addrs创建BPOfferService,维护BPOfferService相关映射集合,然后启动所有的BPOfferService;

        4、第四步,处理待删除toRemove集合,停止所有旧的nameservices;

        5、第五步,处理待刷新toRefresh集合,更新NN列表已变化的nameservices。

        对,就是这么简单,将需要处理的nameservice分别加入到不同的集合,然后按照添加、删除、更新的顺序针对处理类型相同的nameservice一并处理即可。

        接下来,我们分别研究下每一步的细节:

        1、第一步,针对nameserviceid->{namenode名称->InetSocketAddress}的映射集合newAddressMap中每个nameserviceid,确认它是一个完全新加的nameservice,还是一个其NameNode列表被更新的nameservice,分别加入待添加toAdd和待刷新toRefresh集合;

        它的处理思路是,循环addrMap中每个nameserviceid,放入待添加toAdd或者待刷新toRefresh集合;如果bpByNameserviceId结合中存在nameserviceId,加入待刷新集合toRefresh,否则加入到待添加集合toAdd。

        2、第二步,针对newAddressMap中没有,而目前DataNode内存bpByNameserviceId中存在的nameservice,需要删除,添加到待删除toRemove集合;

        它的处理思路是:利用Sets的difference()方法,比较bpByNameserviceId和addrMap两个集合的keySet,找出bpByNameserviceId中存在,但是addrMap中不存在的nameserviceid,生成待删除集合toRemove。

        3、第三步,处理待添加toAdd集合,启动所有新的nameservices:根据addrs创建BPOfferService,维护BPOfferService相关映射集合,然后启动所有的BPOfferService;

        这一步针对待添加集合toAdd中的每个nameserviceId,做以下处理:

              3.1、从addrMap中根据nameserviceId获取对应Socket地址InetSocketAddress,创建集合addrs;

              3.2、根据addrs创建BPOfferService实例bpos;

              3.3、将nameserviceId->BPOfferService的对应关系添加到集合bpByNameserviceId中

              3.4、将BPOfferService添加到集合offerServices中;

        最后,调用startAll()方法启动所有BPOfferService,实际上是通过调用它的start()方法启动。

        其中,创建BPOfferService实例bpos时,BPOfferService的构造方法如下:

  // 构造方法
  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;

    // 遍历nnAddrs,为每个namenode添加一个构造的BPServiceActor线城实例,加入到bpServices列表
    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }

        它实际上是遍历nnAddrs,为每个namenode添加一个构造的BPServiceActor线城实例,加入到bpServices列表。
        而调用startAll()方法启动所有BPOfferService时,执行的代码如下:

  synchronized void startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {

              // 遍历offerServices,启动所有的BPOfferService
              for (BPOfferService bpos : offerServices) {
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
  }

        它会遍历offerServices,启动所有的BPOfferService,而BPOfferService的启动,实际上就是将其所持有的每个NameNode对应的BPServiceActor线程启动,代码如下:

  //This must be called only by blockPoolManager
  void start() {
    for (BPServiceActor actor : bpServices) {
      actor.start();
    }
  }

        4、第四步,处理待删除toRemove集合,停止所有旧的nameservices;

        在这一步中,遍历待删除集合toRemove中的每个nameserviceId:

               4.1、根据nameserviceId从集合bpByNameserviceId中获取BPOfferService;

               4.2、调用BPOfferService的stop()和join()方法停止服务,它们会调用本身的remove()方法;

        而BPOfferService的stop()和join()方法,则是依次调用BPOfferService所包含的所有BPServiceActor线程的stop()和join()方法,代码如下:

  //This must be called only by blockPoolManager.
  void stop() {
    for (BPServiceActor actor : bpServices) {
      actor.stop();
    }
  }

  //This must be called only by blockPoolManager
  void join() {
    for (BPServiceActor actor : bpServices) {
      actor.join();
    }
  }

        5、第五步,处理待刷新toRefresh集合,更新NN列表已变化的nameservices;

        在最后一步中,遍历待更新集合toRefresh中的每个nameserviceId:

               5.1、根据nameserviceId从集合bpByNameserviceId中取出对应的BPOfferService;

               5.2、根据BPOfferService从配置信息addrMap中取出NN的Socket地址InetSocketAddress,形成列表addrs;

               5.3、调用BPOfferService的refreshNNList()方法根据addrs刷新NN列表。

        好了,HDFS心跳相关数据结构的初始化已分析完毕,至此,涉及到每个命名空间服务中每个NameNode相关的BPServiceActor线程均已启动,它是真正干活的苦力,真正的底层劳动人民啊!至于它是怎么运行来完成HDFS心跳的,我们下一节再分析吧!

时间: 2024-10-26 05:43:58

HDFS源码分析心跳汇报之数据结构初始化的相关文章

HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

        在<HDFS源码分析心跳汇报之数据结构初始化>一文中,我们了解到HDFS心跳相关的BlockPoolManager.BPOfferService.BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的.那么,这个BPServiceActor线程到底是如何工作的呢?本文,我们将继续HDFS心跳分析之BPServiceActor工作线程运行流程.         首先,我们先看下         那么,BPServiceA

HDFS源码分析心跳汇报之数据块增量汇报

        在<HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程>一文中,我们详细了解了数据节点DataNode周期性发送心跳给名字节点NameNode的BPServiceActor工作线程,了解了它实现心跳的大体流程:         1.与NameNode握手:               1.1.第一阶段:获取命名空间信息并验证.设置:               1.2.第二阶段:DataNode注册:         2.周期性调用sendHeartBeat

HDFS源码分析心跳汇报之数据块汇报

        在<HDFS源码分析心跳汇报之数据块增量汇报>一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode.那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报.         首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法: /*

HDFS源码分析心跳汇报之整体结构

        我们知道,HDFS全称是Hadoop Distribute FileSystem,即Hadoop分布式文件系统.既然它是一个分布式文件系统,那么肯定存在很多物理节点,而这其中,就会有主从节点之分.在HDFS中,主节点是名字节点NameNode,它负责存储整个HDFS中文件元数据信息,保存了名字节点第一关系和名字节点第二关系.名字节点第一关系是文件与数据块的对应关系,在HDFS正常运行期间,保存在NameNode内存和FSImage文件中,并且在NameNode启动时就由FSIma

HDFS源码分析数据块汇报之损坏数据块检测checkReplicaCorrupt()

        无论是第一次,还是之后的每次数据块汇报,名字名字节点都会对汇报上来的数据块进行检测,看看其是否为损坏的数据块.那么,损坏数据块是如何被检测的呢?本文,我们将研究下损坏数据块检测的checkReplicaCorrupt()方法.         关于数据块及其副本的状态,请阅读<HDFS源码分析之数据块及副本状态BlockUCState.ReplicaState>一文.         checkReplicaCorrupt()方法专门用于损坏数据块检测,代码如下: /** *

HDFS源码分析数据块校验之DataBlockScanner

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程.它为所有的块池管理块扫描.针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描.校验数据块.当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新.         我们先看下DataBlockScanner的成员变量,如下: // 所属数据节点DataNode实例 private

HDFS源码分析之UnderReplicatedBlocks(一)

        UnderReplicatedBlocks是HDFS中关于块复制的一个重要数据结构.在HDFS的高性能.高容错性体系中,总有一些原因促使HDFS系统内进行块复制工作,比如基于高性能的负载均衡.基于容错性的数据块副本数恢复等.普遍的,任何工作都会有一个优先级的问题,特别是这里的数据块复制,不可能简单的按照先入先出或者其他简单策略,比方说,基于容错性的数据块副本数恢复,特别是数据块副本仅有一个的数据块副本数恢复,其优先级肯定要比基于高性能的负载均衡高,所以数据块复制要有个优先级的概念

HDFS源码分析数据块复制监控线程ReplicationMonitor(一)

        ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列.其定义及作为线程核心的run()方法如下: /** * Periodically calls computeReplicationWork(). * 周期性调用computeReplicationWork()方法 */ private class ReplicationMonitor implements Runnable

HDFS源码分析之UnderReplicatedBlocks(二)

        UnderReplicatedBlocks还提供了一个数据块迭代器BlockIterator,用于遍历其中的数据块.它是UnderReplicatedBlocks的内部类,有三个成员变量,如下: // 当前迭代级别 private int level; // 标志位:是否为特定复制优先级的迭代器 private boolean isIteratorForLevel = false; // 数据块Block迭代器Iterator列表,存储各级别数据块迭代器 private fina