HDFS源码分析DataXceiver之整体流程

        在《HDFS源码分析之DataXceiverServer》一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer。它被用于接收来自客户端或其他数据节点的数据读写请求,为每个数据读写请求创建一个单独的线程去处理。而处理每次读写请求时所创建的线程,就是本文要讲的DataXceiver。本文,我们来看下DataXceiver的具体实现,着重讲解下它得到数据读写请求后的整体处理流程。

        首先,我们先看下DataXceiver的成员变量,具体如下:

  // 封装了Socket、输入流、输出流的Peer,是DataXceiver线程工作的主要依托者
  private Peer peer;

  // 通讯两端地址:远端地址remoteAddress、本地端地址localAddress,均是从peer(即socket)中获得的
  private final String remoteAddress; // address of remote side
  private final String localAddress;  // local address of this daemon

  // DataNode节点进程实例datanode
  private final DataNode datanode;

  // DataNode节点配置信息dnConf
  private final DNConf dnConf;

  // DataXceiverServer线程实例dataXceiverServer
  private final DataXceiverServer dataXceiverServer;

  // 连接DataNode是否使用主机名,取参数dfs.datanode.use.datanode.hostname,参数未配置的话默认为false,不使用
  private final boolean connectToDnViaHostname;

  // 接收到一个操作op的开始时间
  private long opStartTime; //the start time of receiving an Op

  // InputStream输入流socketIn
  private final InputStream socketIn;

  // OutputStream输出流socketOut
  private OutputStream socketOut;

  // 数据块接收器BlockReceiver对象blockReceiver
  private BlockReceiver blockReceiver = null;

  /**
   * Client Name used in previous operation. Not available on first request
   * on the socket.
   * previousOpClientName为之前操作的客户端名字,它对于socket上的第一个请求不可用
   */
  private String previousOpClientName;

        既然DataXceiver是为处理数据读写请求而创建的线程,那么Socket、输入流、输出流就是必不可少的成员。而首当其冲的Peer,便封装了Socket、输入流、输出流的Peer,是DataXceiver线程工作的主要依托者,而接下来的输入流socketIn、输出流socketOut都是来自peer的socket。另外,DataXceiver还提供了通讯两端地址:远端地址remoteAddress、本地端地址localAddress,均是从peer(即socket)中获得的。

        既然是由DataNode上的DataXceiverServer线程创建的,那么自然少不了datanode、dataXceiverServer、dnConf等变量,并且,它是专门用来处理数据读写请求的,自然也需要像数据块接收器BlockReceiver对象blockReceiver这种成员变量。dnConf是DNConf类型的数据节点DataNode上的配置信息。

        剩下的几个,便是在处理具体的数据读写请求时用到的connectToDnViaHostname、opStartTime、previousOpClientName等变量。其中,connectToDnViaHostname标识连接DataNode是否使用主机名,取参数dfs.datanode.use.datanode.hostname,参数未配置的话默认为false,不使用,opStartTime为接收到一个操作op的开始时间,最后的previousOpClientName为之前操作的客户端名字,它对于socket上的第一个请求不可用。

        下面我们再看下它的构造方法,只有一个private的,如下:

  /**
   * 私有构造函数,需要Peer、DataNode、DataXceiverServer三个参数
   */
  private DataXceiver(Peer peer, DataNode datanode,
      DataXceiverServer dataXceiverServer) throws IOException {

	// peer、datanode、dataXceiverServer等成员变量直接赋值
    this.peer = peer;
    this.dnConf = datanode.getDnConf();

    // 输入流socketIn、输出流socketOut来自peer的socket
    this.socketIn = peer.getInputStream();
    this.socketOut = peer.getOutputStream();

    this.datanode = datanode;
    this.dataXceiverServer = dataXceiverServer;

    // connectToDnViaHostname取自数据节点配置信息dnConf
    this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;

    // 远端remoteAddress和本地localAddress地址取自Peer
    remoteAddress = peer.getRemoteAddressString();
    localAddress = peer.getLocalAddressString();

    if (LOG.isDebugEnabled()) {
      LOG.debug("Number of active connections is: "
          + datanode.getXceiverCount());
    }
  }

        但是,它提供了一个类的静态create()方法,用于DataXceiver对象的构造,代码如下:

  /**
   * 提供了一个静态方法create(),调用私有构造函数构造DataXceiver对象
   */
  public static DataXceiver create(Peer peer, DataNode dn,
      DataXceiverServer dataXceiverServer) throws IOException {
    return new DataXceiver(peer, dn, dataXceiverServer);
  }

        上述构造方法及静态create()方法都很简单,不再赘述。

        接下来,我们再着重分析下,DataXceiver线程在启动后,是如何处理来自客户端或者其他数据节点发送的数据读写请求的。既然是线程,那么就不得不看看它的run()方法,代码如下:

  /**
   * Read/write data from/to the DataXceiverServer.
   * 从DataXceiverServer中读取或者往DataXceiverServer中写入数据
   */
  @Override
  public void run() {
    int opsProcessed = 0;
    Op op = null;

    try {
      // 在dataXceiverServer中增加peer与该DataXceiver实例所在线程和DataXceiver实例的映射关系
      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);

      // peer中设置socket写入超时时间
      peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);

      InputStream input = socketIn;
      try {

    	// IOStreamPair为一个输入输出流对,既包含输入流,也包含输出流
        IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
          socketIn, datanode.getXferAddress().getPort(),
          datanode.getDatanodeId());

        // 包装saslStreams的输入流in为BufferedInputStream,得到输入流input,其缓冲区大小取参数io.file.buffer.size的一半,
        // 参数未配置的话默认为512,且最大也不能超过512
        input = new BufferedInputStream(saslStreams.in,
          HdfsConstants.SMALL_BUFFER_SIZE);

        // 从saslStreams中获取输出流socketOut
        socketOut = saslStreams.out;
      } catch (InvalidMagicNumberException imne) {
        LOG.info("Failed to read expected encryption handshake from client " +
            "at " + peer.getRemoteAddressString() + ". Perhaps the client " +
            "is running an older version of Hadoop which does not support " +
            "encryption");
        return;
      }

      // 调用父类initialize()方法,完成初始化,实际上就是设置父类的输入流in
      super.initialize(new DataInputStream(input));

      // We process requests in a loop, and stay around for a short timeout.
      // This optimistic behaviour allows the other end to reuse connections.
      // Setting keepalive timeout to 0 disable this behavior.

      // 在一个do...while循环内完成请求的处理。
      do {

    	// 更新当前线程名称,通过线程名标识进度的一种手段,不错
        updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

        try {

          // 由于第一次是创建一个新的socket使用,连接的时间可能会很长,所以连接超时时间设置的比较大,
          // 而后续使用的话,是复用socket,连接的超时时间限制就没必要设置那么大了
          if (opsProcessed != 0) {

        	// 如果不是第一次出来请求,确保dnConf的socketKeepaliveTimeout大于0,
        	// 将其设置为设置peer(即socket)的读超时时间,
        	// 取参数dfs.datanode.socket.reuse.keepalive,参数为配置的话,默认为4s
            assert dnConf.socketKeepaliveTimeout > 0;
            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
          } else {

        	// 最开始第一次处理请求时,设置peer(即socket)读超时时间为dnConf的socketTimeout
        	// 即取参数dfs.client.socket-timeout,参数未配置的话默认为60s
            peer.setReadTimeout(dnConf.socketTimeout);
          }

          // 通过readOp()方法读取操作符op
          op = readOp();
        } catch (InterruptedIOException ignored) {
          // Time out while we wait for client rpc
          // 如果是InterruptedIOException异常,跳出循环
          break;
        } catch (IOException err) {
          // Since we optimistically expect the next op, it's quite normal to get EOF here.
          if (opsProcessed > 0 &&
              (err instanceof EOFException || err instanceof ClosedChannelException)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
            }
          } else {
            throw err;
          }
          break;
        }

        // restore normal timeout
        // 重新存储正常的超时时间,即dnConf的socketTimeout
        if (opsProcessed != 0) {
          peer.setReadTimeout(dnConf.socketTimeout);
        }

        // 设置操作的起始时间opStartTime
        opStartTime = now();

        // 通过processOp()方法根据操作符op调用相应的方法处理操作符op
        processOp(op);

        // 累加操作数
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
      // 循环的条件便是:peer未关闭且复用超时时间socketKeepaliveTimeout大于0

    } catch (Throwable t) {
      String s = datanode.getDisplayName() + ":DataXceiver error processing "
          + ((op == null) ? "unknown" : op.name()) + " operation "
          + " src: " + remoteAddress + " dst: " + localAddress;
      if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
        // For WRITE_BLOCK, it is okay if the replica already exists since
        // client and replication may write the same block to the same datanode
        // at the same time.
        if (LOG.isTraceEnabled()) {
          LOG.trace(s, t);
        } else {
          LOG.info(s + "; " + t);
        }
      } else {
        LOG.error(s, t);
      }
    } finally {

      if (LOG.isDebugEnabled()) {
        LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
            + datanode.getXceiverCount());
      }

      // 更新当前线程名称
      updateCurrentThreadName("Cleaning up");

      // 关闭peer(socket)、输入流等资源
      if (peer != null) {
        dataXceiverServer.closePeer(peer);
        IOUtils.closeStream(in);
      }
    }
  }

        run()方法的处理流程逻辑十分清晰,概括如下:

        1、在dataXceiverServer中增加peer与该DataXceiver实例所在线程和DataXceiver实例的映射关系;

        2、peer中设置socket写入超时时间,取参数dfs.datanode.socket.write.timeout,参数未配置的话默认为8分钟;

        3、获取IOStreamPair类型的saslStreams,其为一个输入输出流对,既包含输入流,也包含输出流;

        4、包装saslStreams的输入流in为BufferedInputStream,得到输入流input,其缓冲区大小取参数io.file.buffer.size的一半,参数未配置的话默认为512,且最大也不能超过512;

        5、从saslStreams中获取输出流socketOut;

        6、调用父类initialize()方法,完成初始化,实际上就是设置父类的输入流in;

        7、在一个do...while循环内完成请求的处理,循环的条件便是--peer未关闭且复用超时时间socketKeepaliveTimeout大于0:

              7.1、更新当前线程名称,通过线程名标识进度的一种手段,不错,线程名此时为Waiting for operation #100(100为操作处理次数累加器的下一个值);

              7.2、处理读超时时间设置:由于第一次是创建一个新的socket使用,连接的时间可能会很长,所以连接超时时间设置的比较大,而后续使用的话,是复用socket,连接的超时时间限制就没必要设置那么大了。所以,最开始第一次处理请求时,设置peer(即socket)读超时时间为dnConf的socketTimeout,即取参数dfs.client.socket-timeout,参数未配置的话默认为60s;如果不是第一次出来请求,确保dnConf的socketKeepaliveTimeout大于0,将其设置为设置peer(即socket)的读超时时间,取参数dfs.datanode.socket.reuse.keepalive,参数为配置的话,默认为4s;

              7.3、通过readOp()方法读取操作符op;

              7.4、重新存储正常的超时时间,即dnConf的socketTimeout;

              7.5、设置操作的起始时间opStartTime,为当前时间;

              7.6、通过processOp()方法根据操作符op调用相应的方法处理操作符op;

              7.7、累加操作数opsProcessed;

        8、更新当前线程名称:Cleaning up;

        9、关闭peer(socket)、输入流等资源。

        实际上,对于读写请求的处理的一个主线,便是在socket未关闭的情况下,不停的读取操作符,然后调用相应的方法处理,也就是do...while循环内的op = readOp()-----processOp(op)这一处理主线。

        下面,我们来看下读取操作符的readOp()方法,它位于DataXceiver的父类Receiver中。代码如下:

  /** Read an Op.  It also checks protocol version. */
  protected final Op readOp() throws IOException {

	// 首先从输入流in中读入版本号version,short类型,占2个字节
	final short version = in.readShort();

	// 校验版本号version是否与DataTransferProtocol中的DATA_TRANSFER_VERSION相等,该版本中为28
    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
      throw new IOException( "Version Mismatch (Expected: " +
          DataTransferProtocol.DATA_TRANSFER_VERSION  +
          ", Received: " +  version + " )");
    }

    // 调用Op的read()方法,从输入流in中获取操作符op
    return Op.read(in);
  }

        代码中有详细注释,不再解释。继续追踪Op的read()方法,代码如下:

  private static final int FIRST_CODE = values()[0].code;
  /** Return the object represented by the code. */
  private static Op valueOf(byte code) {
    final int i = (code & 0xff) - FIRST_CODE;
    return i < 0 || i >= values().length? null: values()[i];
  }

  /** Read from in */
  public static Op read(DataInput in) throws IOException {
    return valueOf(in.readByte());
  }

        很简单,通过read()方法从输入流读取byte,并通过valueOf()方法,首先将byte转化为int,然后减去Op操作符枚举类型的第一个值:WRITE_BLOCK,即80,得到i。如果i小于0或者大于枚举中操作符的个数,说明输入流中传入的操作符不在枚举范围内,否则利用i作为索引取出相应的操作符。枚举类型如下:

  WRITE_BLOCK((byte)80),
  READ_BLOCK((byte)81),
  READ_METADATA((byte)82),
  REPLACE_BLOCK((byte)83),
  COPY_BLOCK((byte)84),
  BLOCK_CHECKSUM((byte)85),
  TRANSFER_BLOCK((byte)86),
  REQUEST_SHORT_CIRCUIT_FDS((byte)87),
  RELEASE_SHORT_CIRCUIT_FDS((byte)88),
  REQUEST_SHORT_CIRCUIT_SHM((byte)89);

        比较简单,写数据块为80,读数据块为81等,不再一一介绍。操作符为int类型,也就意味着它占4个字节。

        接下来,我们再看下处理操作符的processOp()方法,同样在DataXceiver的父类Receiver中。代码如下:

  /** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {

	// 通过调用相应的方法处理操作符
    switch(op) {
    case READ_BLOCK:// 读数据块调用opReadBlock()方法
      opReadBlock();
      break;
    case WRITE_BLOCK:// 写数据块调用opWriteBlock()方法
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:// 替换数据块调用opReplaceBlock()方法
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:// 复制数据块调用REPLACE()方法
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:// 数据块检验调用opBlockChecksum()方法
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:// 移动数据块调用opTransferBlock()方法
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }

        一目了然,根据操作符的不同,调用不同的方法去处理。比如读数据块调用opReadBlock()方法,写数据块调用opWriteBlock()方法,替换数据块调用opReplaceBlock()方法等等,读者可自行阅读。
        至此,HDFS源码分析DataXceiver之整体流程全部叙述完毕。后续文章会陆续推出对于写数据块、读数数据块、替换数据块、移动数据块等的详细操作,以及DataXceiver线程中用到的数据块发送器BlockSender、数据块接收器BlockReceiver的详细分析,敬请期待!

        

        

时间: 2025-01-07 10:35:17

HDFS源码分析DataXceiver之整体流程的相关文章

HDFS源码分析DataXceiver之读数据块

         在<HDFS源码分析DataXceiver之整体流程>一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver.并且,在线程DataXceiver处理请求的主方法run()方法内,会先读取操作符op,然后根据操作符op分别调用相应的方法进行请求的处理.而决定什么样的操作符op该调用何种方法的逻辑,则是在DataX

HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程

        在<HDFS源码分析心跳汇报之数据结构初始化>一文中,我们了解到HDFS心跳相关的BlockPoolManager.BPOfferService.BPServiceActor三者之间的关系,并且知道最终HDFS的心跳是通过BPServiceActor线程实现的.那么,这个BPServiceActor线程到底是如何工作的呢?本文,我们将继续HDFS心跳分析之BPServiceActor工作线程运行流程.         首先,我们先看下         那么,BPServiceA

HDFS源码分析EditLog之获取编辑日志输入流

        在<HDFS源码分析之EditLogTailer>一文中,我们详细了解了编辑日志跟踪器EditLogTailer的实现,介绍了其内部编辑日志追踪线程EditLogTailerThread的实现,及其线程完成编辑日志跟踪所依赖的最重要的方法,执行日志追踪的doTailEdits()方法.在该方法的处理流程中,首先需要从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据.那么这个编辑日志输入流集合streams是如何获取的呢?本文

HDFS源码分析EditLog之读取操作符

        在<HDFS源码分析EditLog之获取编辑日志输入流>一文中,我们详细了解了如何获取编辑日志输入流EditLogInputStream.在我们得到编辑日志输入流后,是不是就该从输入流中获取数据来处理呢?答案是显而易见的!在<HDFS源码分析之EditLogTailer>一文中,我们在讲编辑日志追踪同步时,也讲到了如下两个连续的处理流程:         4.从编辑日志editLog中获取编辑日志输入流集合streams,获取的输入流为最新事务ID加1之后的数据  

HDFS源码分析数据块校验之DataBlockScanner

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程.它为所有的块池管理块扫描.针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描.校验数据块.当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新.         我们先看下DataBlockScanner的成员变量,如下: // 所属数据节点DataNode实例 private

HDFS源码分析心跳汇报之数据块汇报

        在<HDFS源码分析心跳汇报之数据块增量汇报>一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode.那么,时间间隔更长的正常数据块汇报都做了些什么呢?本文,我们将开始研究下时间间隔更长的正常数据块汇报.         首先,看下正常数据块汇报是如何发起的?我们先看下BPServiceActor工作线程的offerService()方法: /*

HDFS源码分析心跳汇报之数据块增量汇报

        在<HDFS源码分析心跳汇报之BPServiceActor工作线程运行流程>一文中,我们详细了解了数据节点DataNode周期性发送心跳给名字节点NameNode的BPServiceActor工作线程,了解了它实现心跳的大体流程:         1.与NameNode握手:               1.1.第一阶段:获取命名空间信息并验证.设置:               1.2.第二阶段:DataNode注册:         2.周期性调用sendHeartBeat

HDFS源码分析之UnderReplicatedBlocks(一)

        UnderReplicatedBlocks是HDFS中关于块复制的一个重要数据结构.在HDFS的高性能.高容错性体系中,总有一些原因促使HDFS系统内进行块复制工作,比如基于高性能的负载均衡.基于容错性的数据块副本数恢复等.普遍的,任何工作都会有一个优先级的问题,特别是这里的数据块复制,不可能简单的按照先入先出或者其他简单策略,比方说,基于容错性的数据块副本数恢复,特别是数据块副本仅有一个的数据块副本数恢复,其优先级肯定要比基于高性能的负载均衡高,所以数据块复制要有个优先级的概念

HDFS源码分析心跳汇报之数据结构初始化

        在<HDFS源码分析心跳汇报之整体结构>一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager.BPOfferService和BPServiceActor三者之间的关系.那么,HDFS心跳相关的这些数据结构,都是如何被初始化的呢?本文,我们就开始研究HDFS心跳汇报之数据结构初始化.         首先,在DataNode节点启动时所必须执行的startDataNode()方法中,有如下代码: // DataNode启动时执行的startD