HDFS源码分析心跳汇报之数据块汇报

        在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报。

        首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法:

  /**
   * Main loop for each BP thread. Run until shutdown,
   * forever calling remote NameNode functions.
   */
  private void offerService() throws Exception {

    //
    // Now loop for a long time....
    //
    while (shouldRun()) {// 又是一个利用shouldRun()判断的while循环
      try {
        // 省略部分代码
        ...

        // 调用blockReport()方法,进行数据块汇报,放返回来自名字节点NameNode的相关命令cmds
        List<DatanodeCommand> cmds = blockReport();

        // 调用processCommand()方法处理来自名字节点NameNode的相关命令cmds
        processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));

        // 省略部分代码

        //
        // There is no work to do;  sleep until hearbeat timer elapses,
        // or work arrives, and then iterate again.
        // 计算等待时间waitTime:心跳时间间隔减去上次心跳后截至到现在已过去的时间
        long waitTime = dnConf.heartBeatInterval -
        (Time.now() - lastHeartbeat);

        synchronized(pendingIncrementalBRperStorage) {
          if (waitTime > 0 && !sendImmediateIBR) {// 如果等待时间大于0,且不是立即发送数据块增量汇报
            try {

              // 利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步
              pendingIncrementalBRperStorage.wait(waitTime);
            } catch (InterruptedException ie) {
              LOG.warn("BPOfferService for " + this + " interrupted");
            }
          }
        } // synchronized

      } catch(RemoteException re) {
<pre name="code" class="java">       // 省略部分代码

} catch (IOException e) {

        // 省略部分代码

} } // while (shouldRun())


         可以看出,在BPServiceActor工作线程offerService()方法的while循环内,数据块汇报blockReport()方法执行时,仅有下面的waitTime的等待时间,其他情况下都是立即执行的。那么等待时间waitTime是如何计算的呢?它就是心跳时间间隔减去上次心跳后截至到现在已过去的时间,并且,如果等待时间waitTime大于0,且不是立即发送数据块增量汇报(标志位sendImmediateIBR为false),那么才会利用pendingIncrementalBRperStorage进行等待,并加synchronized关键字进行同步。在这里,我们就可以大胆猜测,数据块汇报的时间间隔应该是大于心跳时间间隔的,并且两者之间的距离肯定不小。

        那么,我们开始研究实现正常数据块汇报的blockReport()方法吧,代码如下:

  /**
   * Report the list blocks to the Namenode
   * @return DatanodeCommands returned by the NN. May be null.
   * @throws IOException
   */
  List<DatanodeCommand> blockReport() throws IOException {
    // send block report if timer has expired.
	// 到期就发送数据块汇报

	// 取当前开始时间startTime
    final long startTime = now();

    // 如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null,
    // 数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时
    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
      return null;
    }

    // 构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand
    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();

    // Flush any block information that precedes the block report. Otherwise
    // we have a chance that we will miss the delHint information
    // or we will report an RBW replica after the BlockReport already reports
    // a FINALIZED one.

    // 调用reportReceivedDeletedBlocks()方法发送数据块增量汇报
    reportReceivedDeletedBlocks();

    // 记录上次数据块增量汇报时间lastDeletedReport
    lastDeletedReport = startTime;

    // 设置数据块汇报起始时间brCreateStartTime为当前时间
    long brCreateStartTime = now();

    // 从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists,
    // key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs
    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());

    // Convert the reports to the format expected by the NN.
    int i = 0;
    int totalBlockCount = 0;

    // 创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小
    StorageBlockReport reports[] =
        new StorageBlockReport[perVolumeBlockLists.size()];

    // 遍历perVolumeBlockLists
    for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {

      // 取出value:BlockListAsLongs
      BlockListAsLongs blockList = kvPair.getValue();

      // 将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,
      // StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组
      reports[i++] = new StorageBlockReport(
          kvPair.getKey(), blockList.getBlockListAsLongs());

      // 累加数据块数目totalBlockCount
      totalBlockCount += blockList.getNumberOfBlocks();
    }

    // Send the reports to the NN.
    int numReportsSent;
    long brSendStartTime = now();

    // 根据数据块总数目判断是否需要多次发送消息
    if (totalBlockCount < dnConf.blockReportSplitThreshold) {// 如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送
      // split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000
      // Below split threshold, send all reports in a single message.

      // 发送的数据块汇报消息数numReportsSent设置为1
      numReportsSent = 1;

      // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息
      DatanodeCommand cmd =
          bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(), reports);

      // 将数据块汇报后返回的命令cmd加入到命令列表cmds
      if (cmd != null) {
        cmds.add(cmd);
      }
    } else {
      // Send one block report per message.

      // 发送的数据块汇报消息数numReportsSent设置为1
      numReportsSent = i;

      // 遍历reports,取出每个StorageBlockReport
      for (StorageBlockReport report : reports) {
        StorageBlockReport singleReport[] = { report };

        // 通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息
        DatanodeCommand cmd = bpNamenode.blockReport(
            bpRegistration, bpos.getBlockPoolId(), singleReport);

        // 将数据块汇报后返回的命令cmd加入到命令列表cmds
        if (cmd != null) {
          cmds.add(cmd);
        }
      }
    }

    // Log the block report processing stats from Datanode perspective

    // 计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中
    long brSendCost = now() - brSendStartTime;
    long brCreateCost = brSendStartTime - brCreateStartTime;
    dn.getMetrics().addBlockReport(brSendCost);
    LOG.info("Sent " + numReportsSent + " blockreports " + totalBlockCount +
        " blocks total. Took " + brCreateCost +
        " msec to generate and " + brSendCost +
        " msecs for RPC and NN processing. " +
        " Got back commands " +
            (cmds.size() == 0 ? "none" : Joiner.on("; ").join(cmds)));

    // 调用scheduleNextBlockReport()方法,调度下一次数据块汇报
    scheduleNextBlockReport(startTime);

    // 返回命令cmds
    return cmds.size() == 0 ? null : cmds;
  }

        数据块汇报的blockReport()方法处理流程大体如下:

        1、取当前开始时间startTime;

        2、如果当前时间startTime减去上次数据块汇报时间小于数据节点配置的数据块汇报时间间隔的话,直接返回null:

              数据节点配置的数据块汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时;

        3、构造数据节点命令ArrayList列表cmds,存储数据块汇报返回的命令DatanodeCommand;

        4、调用reportReceivedDeletedBlocks()方法发送数据块增量汇报;

        5、记录上次数据块增量汇报时间lastDeletedReport;

        6、设置数据块汇报起始时间brCreateStartTime为当前时间;

        7、从数据节点DataNode根据线程对应块池ID获取数据块汇报集合perVolumeBlockLists:

              key为数据节点存储DatanodeStorage,value为数据节点存储所包含的Long类数据块数组BlockListAsLongs;

        8、创建数据块汇报数组StorageBlockReport,大小为上述perVolumeBlockLists的大小;

        9、取出value:BlockListAsLongs:

              9.1、取出value:BlockListAsLongs;

              9.2、将BlockListAsLongs封装成StorageBlockReport加入数据块汇报数组reports,StorageBlockReport包含数据节点存储DatanodeStorage和其上数据块数组;

              9.3、累加数据块数目totalBlockCount;

        10、根据数据块总数目判断是否需要多次发送消息:

                10.1、如果数据块总数目在split阈值之下,则将所有的数据块汇报信息放在一个消息中发送(split阈值取参数dfs.blockreport.split.threshold,参数未配置的话默认为1000*1000):

                           10.1.1、发送的数据块汇报消息数numReportsSent设置为1;

                           10.1.2、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

                           10.1.3、将数据块汇报后返回的命令cmd加入到命令列表cmds;

                 10.2、如果数据块总数目在split阈值之上,将数据块汇报按照DatanodeStorage分多个消息来发送:

                            10.2.1、发送的数据块汇报消息数numReportsSent设置为i,即DatanodeStorage数目;

                            10.2.2、遍历reports,取出每个StorageBlockReport:

                                           10.2.2.1、通过NameNode代理bpNamenode的blockReport()方法向NameNode发送数据块汇报信息;

                                           10.2.2.2、将数据块汇报后返回的命令cmd加入到命令列表cmds;

        11、计算数据块汇报耗时并记录在日志Log、数据节点Metrics指标体系中;

        12、调用scheduleNextBlockReport()方法,调度下一次数据块汇报;

        13、返回命令cmds。

时间: 2024-09-20 08:43:23

HDFS源码分析心跳汇报之数据块汇报的相关文章

HDFS源码分析DataXceiver之读数据块

         在<HDFS源码分析DataXceiver之整体流程>一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver.并且,在线程DataXceiver处理请求的主方法run()方法内,会先读取操作符op,然后根据操作符op分别调用相应的方法进行请求的处理.而决定什么样的操作符op该调用何种方法的逻辑,则是在DataX

HDFS源码分析心跳汇报之数据块增量汇报

        在<HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程>一文中,我们详细了解了数据节点DataNode周期性发送心跳给名字节点NameNode的BPServiceActor工作线程,了解了它实现心跳的大体流程:         1.与NameNode握手:               1.1.第一阶段:获取命名空间信息并验证.设置:               1.2.第二阶段:DataNode注册:         2.周期性调用sendHeartBeat

HDFS源码分析心跳汇报之数据结构初始化

        在<HDFS源码分析心跳汇报之整体结构>一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager.BPOfferService和BPServiceActor三者之间的关系.那么,HDFS心跳相关的这些数据结构,都是如何被初始化的呢?本文,我们就开始研究HDFS心跳汇报之数据结构初始化.         首先,在DataNode节点启动时所必须执行的startDataNode()方法中,有如下代码: // DataNode启动时执行的startD

HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

        在<HDFS源码分析心跳汇报之数据结构初始化>一文中,我们了解到HDFS心跳相关的BlockPoolManager.BPOfferService.BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的.那么,这个BPServiceActor线程到底是如何工作的呢?本文,我们将继续HDFS心跳分析之BPServiceActor工作线程运行流程.         首先,我们先看下         那么,BPServiceA

HDFS源码分析心跳汇报之整体结构

        我们知道,HDFS全称是Hadoop Distribute FileSystem,即Hadoop分布式文件系统.既然它是一个分布式文件系统,那么肯定存在很多物理节点,而这其中,就会有主从节点之分.在HDFS中,主节点是名字节点NameNode,它负责存储整个HDFS中文件元数据信息,保存了名字节点第一关系和名字节点第二关系.名字节点第一关系是文件与数据块的对应关系,在HDFS正常运行期间,保存在NameNode内存和FSImage文件中,并且在NameNode启动时就由FSIma

HDFS源码分析数据块汇报之损坏数据块检测checkReplicaCorrupt()

        无论是第一次,还是之后的每次数据块汇报,名字名字节点都会对汇报上来的数据块进行检测,看看其是否为损坏的数据块.那么,损坏数据块是如何被检测的呢?本文,我们将研究下损坏数据块检测的checkReplicaCorrupt()方法.         关于数据块及其副本的状态,请阅读<HDFS源码分析之数据块及副本状态BlockUCState.ReplicaState>一文.         checkReplicaCorrupt()方法专门用于损坏数据块检测,代码如下: /** *

HDFS源码分析数据块校验之DataBlockScanner

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程.它为所有的块池管理块扫描.针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描.校验数据块.当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新.         我们先看下DataBlockScanner的成员变量,如下: // 所属数据节点DataNode实例 private

HDFS源码分析数据块复制监控线程ReplicationMonitor(一)

        ReplicationMonitor是HDFS中关于数据块复制的监控线程,它的主要作用就是计算DataNode工作,并将复制请求超时的块重新加入到待调度队列.其定义及作为线程核心的run()方法如下: /** * Periodically calls computeReplicationWork(). * 周期性调用computeReplicationWork()方法 */ private class ReplicationMonitor implements Runnable

HDFS源码分析之UnderReplicatedBlocks(一)

        UnderReplicatedBlocks是HDFS中关于块复制的一个重要数据结构.在HDFS的高性能.高容错性体系中,总有一些原因促使HDFS系统内进行块复制工作,比如基于高性能的负载均衡.基于容错性的数据块副本数恢复等.普遍的,任何工作都会有一个优先级的问题,特别是这里的数据块复制,不可能简单的按照先入先出或者其他简单策略,比方说,基于容错性的数据块副本数恢复,特别是数据块副本仅有一个的数据块副本数恢复,其优先级肯定要比基于高性能的负载均衡高,所以数据块复制要有个优先级的概念