Hadoop 2.6.0 DFSClient Hedged Read Implementation Analysis

1. Introduction

DFSClient Hedged Read is a feature introduced in Hadoop 2.4.0. If a read against one block replica is slow, the client spawns a second, "hedged" read against a different replica; whichever read finishes first is used and the other is cancelled. This helps rein in outlier reads, for example reads that take a long time because they happen to hit a bad disk.

2. Enabling the Feature

The DFSClient hedged read feature is off by default. To enable it, configure the following two properties:

1. dfs.client.hedged.read.threadpool.size

The size of the thread pool that serves concurrent hedged reads; a positive value turns the feature on.

2. dfs.client.hedged.read.threshold.millis

How long, in milliseconds, to wait for the first read before spawning a hedged read.
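These are normally set in the client's hdfs-site.xml, but they can also be set programmatically. A minimal sketch (the values 20 and 500 are arbitrary illustrations, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // A positive pool size turns hedged reads on.
    conf.setInt("dfs.client.hedged.read.threadpool.size", 20);
    // Spawn a hedged read if the first replica has not answered within 500 ms.
    conf.setLong("dfs.client.hedged.read.threshold.millis", 500);
    FileSystem fs = FileSystem.get(conf);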

3. Implementation Analysis

1. DFSClient implementation

DFSClient defines a static thread pool:

    private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL;

The DFSClient constructor reads the relevant configuration:

    this.hedgedReadThresholdMillis = conf.getLong(
        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS,
        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS);
    int numThreads = conf.getInt(
        DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE,
        DFSConfigKeys.DEFAULT_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE);
    if (numThreads > 0) {
      this.initThreadsNumForHedgedReads(numThreads);
    }

So the pool is only created when dfs.client.hedged.read.threadpool.size is set to a positive value. initThreadsNumForHedgedReads() looks like this:

  /**
   * Create hedged reads thread pool, HEDGED_READ_THREAD_POOL, if
   * it does not already exist.
   * @param num Number of threads for hedged reads thread pool.
   * If zero, skip hedged reads thread pool creation.
   */
  private synchronized void initThreadsNumForHedgedReads(int num) {
    if (num <= 0 || HEDGED_READ_THREAD_POOL != null) return;
    HEDGED_READ_THREAD_POOL = new ThreadPoolExecutor(1, num, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
        new Daemon.DaemonFactory() {
          private final AtomicInteger threadIndex =
            new AtomicInteger(0);
          @Override
          public Thread newThread(Runnable r) {
            Thread t = super.newThread(r);
            t.setName("hedgedRead-" +
              threadIndex.getAndIncrement());
            return t;
          }
        },
        new ThreadPoolExecutor.CallerRunsPolicy() {

      @Override
      public void rejectedExecution(Runnable runnable,
          ThreadPoolExecutor e) {
        LOG.info("Execution rejected, Executing in current thread");
        HEDGED_READ_METRIC.incHedgedReadOpsInCurThread();
        // will run in the current thread
        super.rejectedExecution(runnable, e);
      }
    });
    HEDGED_READ_THREAD_POOL.allowCoreThreadTimeOut(true);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Using hedged reads; pool threads=" + num);
    }
  }

This creates a ThreadPoolExecutor whose corePoolSize is 1 and whose maximumPoolSize is the configured value. The work queue is a SynchronousQueue, a blocking queue with no internal capacity; the ThreadFactory is Hadoop's own daemon-thread factory; and the RejectedExecutionHandler is a customized CallerRunsPolicy. When a submission is rejected (i.e., all pool threads are busy), the handler calls HEDGED_READ_METRIC.incHedgedReadOpsInCurThread() to increment the counter of hedged reads executed in the calling thread, and then the inherited CallerRunsPolicy runs the task in the submitting thread. Finally, allowCoreThreadTimeOut(true) lets even the core thread exit after 60 seconds of idleness.
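The combination of SynchronousQueue and CallerRunsPolicy means tasks are never queued: each submission must be handed to a thread immediately, and once maximumPoolSize threads are busy the submission is rejected and runs in the caller. A standalone sketch of that behavior (my own demo, not Hadoop code):

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class CallerRunsDemo {
      public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60,
            TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 4; i++) {
          final int id = i;
          // Once both pool threads are busy, the next submission is
          // rejected and therefore runs synchronously on the main thread.
          pool.execute(new Runnable() {
            public void run() {
              System.out.println("task " + id + " on "
                  + Thread.currentThread().getName());
              try { Thread.sleep(1000); } catch (InterruptedException e) { }
            }
          });
        }
        pool.shutdown();
      }
    }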
Finally, DFSClient provides a few getters and setters for the input stream to call:

  long getHedgedReadTimeout() {
    return this.hedgedReadThresholdMillis;
  }

  @VisibleForTesting
  void setHedgedReadTimeout(long timeoutMillis) {
    this.hedgedReadThresholdMillis = timeoutMillis;
  }

  ThreadPoolExecutor getHedgedReadsThreadPool() {
    return HEDGED_READ_THREAD_POOL;
  }

  boolean isHedgedReadsEnabled() {
    return (HEDGED_READ_THREAD_POOL != null) &&
      HEDGED_READ_THREAD_POOL.getMaximumPoolSize() > 0;
  }

  DFSHedgedReadMetrics getHedgedReadMetrics() {
    return HEDGED_READ_METRIC;
  }

2. DFSInputStream implementation

In DFSInputStream's positional read (pread) path, dfsClient.isHedgedReadsEnabled() checks whether hedged reads are enabled; if they are, hedgedFetchBlockByteRange() is called to fetch the data:

        if (dfsClient.isHedgedReadsEnabled()) {
          hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
              - 1, buffer, offset, corruptedBlockMap);
        } else {
          fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
              buffer, offset, corruptedBlockMap);
        }
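For context, this dispatch sits on the positional-read path, so a pread issued by client code like the sketch below reaches hedgedFetchBlockByteRange() when the feature is on (the file path and buffer size are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PreadExample {
      public static void main(String[] args) throws Exception {
        // Assumes hedged reads are enabled as shown in section 2.
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/tmp/example.dat"))) {
          byte[] buf = new byte[4096];
          // Positional read: goes through DFSInputStream#read(long, byte[],
          // int, int), which contains the dispatch shown above.
          int n = in.read(0L, buf, 0, buf.length);
          System.out.println("read " + n + " bytes");
        }
      }
    }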

hedgedFetchBlockByteRange() implements the hedged read with an ExecutorCompletionService and a list of Futures. Step by step:

1. Build a list of futures:

ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();

2. Build an ExecutorCompletionService on top of the hedged read thread pool:

    CompletionService<ByteBuffer> hedgedService =
        new ExecutorCompletionService<ByteBuffer>(
        dfsClient.getHedgedReadsThreadPool());

3. Compute the read length and refresh the block location:

    ByteBuffer bb = null;
    int len = (int) (end - start + 1);
    block = getBlockAt(block.getStartOffset(), false);

4. Loop in a while (true) block, distinguishing two cases:

1) No request in flight yet (the first pass): pick a DataNode via chooseDataNode, wrap the read in a Callable, and submit it to hedgedService, obtaining Future<ByteBuffer> firstRequest. Then poll for the result with a timeout of hedgedReadThresholdMillis. If the future completes within the threshold, return; otherwise add this DataNode to the ignored set, bump the metric with incHedgedReadOps, and continue the loop.

2) A request is already in flight: pick a second DataNode via getBestNodeDNAddrPair (falling back to chooseDataNode), build another Callable, and submit it to hedgedService as well. getFirstToComplete then waits for whichever request finishes first; on success, cancelAll cancels the rest and the appropriate metric is incremented (incHedgedReadWins when the hedged request beat the original). On failure, the chosen DataNode is added to the ignored set before the next pass.

getFirstToComplete waits by calling the blocking hedgedService.take().

The full code:

    while (true) {
      // see HDFS-6591, this metric is used to verify/catch unnecessary loops
      hedgedReadOpsLoopNumForTesting++;
      DNAddrPair chosenNode = null;
      // there is no request already executing.
      if (futures.isEmpty()) {
        // chooseDataNode is a commitment. If no node, we go to
        // the NN to reget block locations. Only go here on first read.
        chosenNode = chooseDataNode(block, ignored);
        bb = ByteBuffer.wrap(buf, offset, len);
        Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
            chosenNode, block, start, end, bb, corruptedBlockMap);
        Future<ByteBuffer> firstRequest = hedgedService
            .submit(getFromDataNodeCallable);
        futures.add(firstRequest);
        try {
          Future<ByteBuffer> future = hedgedService.poll(
              dfsClient.getHedgedReadTimeout(), TimeUnit.MILLISECONDS);
          if (future != null) {
            future.get();
            return;
          }
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("Waited " + dfsClient.getHedgedReadTimeout()
                + "ms to read from " + chosenNode.info
                + "; spawning hedged read");
          }
          // Ignore this node on next go around.
          ignored.add(chosenNode.info);
          dfsClient.getHedgedReadMetrics().incHedgedReadOps();
          continue; // no need to refresh block locations
        } catch (InterruptedException e) {
          // Ignore
        } catch (ExecutionException e) {
          // Ignore already logged in the call.
        }
      } else {
        // We are starting up a 'hedged' read. We have a read already
        // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
        // If no nodes to do hedged reads against, pass.
        try {
          try {
            chosenNode = getBestNodeDNAddrPair(block, ignored);
          } catch (IOException ioe) {
            chosenNode = chooseDataNode(block, ignored);
          }
          bb = ByteBuffer.allocate(len);
          Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
              chosenNode, block, start, end, bb, corruptedBlockMap);
          Future<ByteBuffer> oneMoreRequest = hedgedService
              .submit(getFromDataNodeCallable);
          futures.add(oneMoreRequest);
        } catch (IOException ioe) {
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("Failed getting node for hedged read: "
                + ioe.getMessage());
          }
        }
        // if not succeeded. Submit callables for each datanode in a loop, wait
        // for a fixed interval and get the result from the fastest one.
        try {
          ByteBuffer result = getFirstToComplete(hedgedService, futures);
          // cancel the rest.
          cancelAll(futures);
          if (result.array() != buf) { // compare the array pointers
            dfsClient.getHedgedReadMetrics().incHedgedReadWins();
            System.arraycopy(result.array(), result.position(), buf, offset,
                len);
          } else {
            dfsClient.getHedgedReadMetrics().incHedgedReadOps();
          }
          return;
        } catch (InterruptedException ie) {
          // Ignore and retry
        }
        // We got here if exception. Ignore this node on next go around IFF
        // we found a chosenNode to hedge read against.
        if (chosenNode != null && chosenNode.info != null) {
          ignored.add(chosenNode.info);
        }
      }
    }

getFirstToComplete is implemented as follows:

  private ByteBuffer getFirstToComplete(
      CompletionService<ByteBuffer> hedgedService,
      ArrayList<Future<ByteBuffer>> futures) throws InterruptedException {
    if (futures.isEmpty()) {
      throw new InterruptedException("let's retry");
    }
    Future<ByteBuffer> future = null;
    try {
      future = hedgedService.take();
      ByteBuffer bb = future.get();
      futures.remove(future);
      return bb;
    } catch (ExecutionException e) {
      // already logged in the Callable
      futures.remove(future);
    } catch (CancellationException ce) {
      // already logged in the Callable
      futures.remove(future);
    }

    throw new InterruptedException("let's retry");
  }
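Stripped of the HDFS specifics, this is the classic hedged-request pattern: submit the same work against several replicas, take the first result from the CompletionService, and cancel the losers. A minimal generic sketch (class and method names are mine, not Hadoop's; unlike the real code it does not retry on failure):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    public class HedgedPattern {
      // Run the same work against several replicas, return the first
      // result to arrive, and cancel the rest.
      static <T> T firstOf(ExecutorService pool, List<Callable<T>> attempts)
          throws InterruptedException, ExecutionException {
        CompletionService<T> cs = new ExecutorCompletionService<T>(pool);
        List<Future<T>> futures = new ArrayList<Future<T>>();
        for (Callable<T> c : attempts) {
          futures.add(cs.submit(c));
        }
        try {
          // take() blocks until the fastest attempt completes, just like
          // getFirstToComplete() above.
          return cs.take().get();
        } finally {
          for (Future<T> f : futures) {
            f.cancel(true); // a no-op for the already-finished winner
          }
        }
      }
    }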

        over...
