Alluxio源码分析定位策略:循环遍历策略RoundRobinPolicy

        循环遍历策略RoundRobinPolicy是一种通过循环遍历方式并且跳过没有足够空间workers的为下一个数据块选择worker的策略。如果没有worker被找到,该策略会返回null。在RoundRobinPolicy内部,有三个十分重要的成员变量,如下:

  // 初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo
  private List<BlockWorkerInfo> mWorkerInfoList;

  // mWorkerInfoList列表当前遍历到的索引
  private int mIndex;

  // 是否初始化的标志位mInitialized,默认为未初始化false
  private boolean mInitialized = false;

        mWorkerInfoList为初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo,当核心方法getWorkerForNextBlock()第一次被调用进行初始化时,由传入的BlockWorkerInfo列表进行赋值;

        mIndex为mWorkerInfoList列表当前遍历到的索引;

        mInitialized为是否初始化的标志位,默认为未初始化false。

        再看核心方法getWorkerForNextBlock(),代码如下:

  /**
   * The policy uses the first fetch of worker info list as the base, and visits each of them in a
   * round-robin manner in the subsequent calls. The policy doesn't assume the list of worker info
   * in the subsequent calls has the same order from the first, and it will skip the workers that
   * are no longer active.
   *
   * @param workerInfoList the info of the active workers
   * @param blockSizeBytes the size of the block in bytes
   * @return the address of the worker to write to
   */
  @Override
  public WorkerNetAddress getWorkerForNextBlock(List<BlockWorkerInfo> workerInfoList,
      long blockSizeBytes) {

	// 第一次执行该方法时,未初始化的话,先进行初始化
	if (!mInitialized) {

      // 将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList
      mWorkerInfoList = workerInfoList;

      // 对mWorkerInfoList进行shuffle,避免热点问题
      Collections.shuffle(mWorkerInfoList);

      // mWorkerInfoList列表当前遍历到的索引初始化为0,默认从第一个开始
      mIndex = 0;

      // 是否初始化的标志位mInitialized设置为true
      mInitialized = true;
    }

    // at most try all the workers

	// 遍历成员变量BlockWorkerInfo列表mWorkerInfoList:
    for (int i = 0; i < mWorkerInfoList.size(); i++) {

      // 取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate
      WorkerNetAddress candidate = mWorkerInfoList.get(mIndex).getNetAddress();

      // 调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,
      // 根据上述地址candidate找到对应的BlockWorkerInfo
      BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate);

      // 索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,
      // 也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历
      mIndex = (mIndex + 1) % mWorkerInfoList.size();

      // 如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate
      if (workerInfo != null && workerInfo.getCapacityBytes() >= blockSizeBytes) {
        return candidate;
      }
    }

    // 选择不到的话,返回null
    return null;
  }

        当第一次进入getWorkerForNextBlock()方法时,由于标志位mInitialized默认为false,那么我们需要先做初始化工作,如下:

        1、将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList;

        2、对mWorkerInfoList进行shuffle,避免热点问题;

        3、mWorkerInfoList列表当前遍历到的索引mIndex初始化为0,默认从第一个开始;

        4、是否初始化的标志位mInitialized设置为true。

        初始化后,连同后续每次进入getWorkerForNextBlock()方法时,我们就可以开始选择worker的操作了,大体流程如下:

        1、遍历成员变量BlockWorkerInfo列表mWorkerInfoList:

              1.1、取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate;

              1.2、调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,根据上述地址candidate找到对应的BlockWorkerInfo;

              1.3、索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历;

              1.4、如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate,否则继续遍历下一个位置;

        2、选择不到的话,返回null。

        很简单,上述流程描述的应该很详细。下面我们再看下findBlockWorkerInfo()方法,代码如下:

  /**
   * 根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo
   *
   * @param workerInfoList the list of worker info
   * @param address the address to look for
   * @return the worker info in the list that matches the host name, null if not found
   */
  private BlockWorkerInfo findBlockWorkerInfo(List<BlockWorkerInfo> workerInfoList,
      WorkerNetAddress address) {

	// 遍历workerInfoList中的每个BlockWorkerInfo:
    for (BlockWorkerInfo info : workerInfoList) {

      // 判断地址是否相同,相同的话直接返回,否则继续遍历下一个
      if (info.getNetAddress().equals(address)) {
        return info;
      }
    }

    // 列表中不存在的话,最终返回null
    return null;
  }

        它负责根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo,遍历workerInfoList中的每个BlockWorkerInfo,判断地址是否相同,相同的话直接返回,否则继续遍历下一个,列表中不存在的话,最终返回null。

        循环遍历策略RoundRobinPolicy有一个显著的问题,既然初始化时mWorkerInfoList已经确定了,那么当worker增减时它是如何处理的呢?还是说根本无法应对这种情况?留待以后慢慢分析吧!

时间: 2024-09-08 20:52:14

Alluxio源码分析定位策略:循环遍历策略RoundRobinPolicy的相关文章

Alluxio源码分析定位策略:指定主机策略SpecificHostPolicy

        指定主机策略SpecificHostPolicy是一种总是返回一个指定主机名的worker的定位策略.如果在那个主机名对应机器上没有活跃worker的话则返回null.在SpecificHostPolicy内部,封装了一个成员变量,如下: // 主机名 private final String mHostname;         这个mHostname就是SpecificHostPolicy策略实现所依赖的主机名,在SpecificHostPolicy构造方法中,就会根据外部传

Alluxio源码分析定位策略:最大可用容量优先策略MostAvailableFirstPolicy

        最大可用容量优先策略MostAvailableFirstPolicy是一种worker可用容量最大的定位策略.如果没有worker合格的话该策略返回null.它的核心方法getWorkerForNextBlock()实现如下: /** * A policy that returns the worker with the most available bytes. The policy returns null if no * worker is qualified. * 一种w

Alluxio源码分析:RPC框架浅析(三)

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS.HBase等一样,也是由主从节点构成的.而节点之间的通信,一般都是采用的RPC通讯模型.Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答.         继<Alluxio源码分析:RPC框架浅析(二)>一文后,本文继续讲解Alluxio中RPC实现.         4.Client端实现        以FileSystemM

Alluxio源码分析:RPC框架浅析(一)

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS.HBase等一样,也是由主从节点构成的.而节点之间的通信,一般都是采用的RPC通讯模型.Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答.         一.Alluxio中RPC实现技术支持         Alluxio中的RPC是依靠Thrift实现的,Apache Thrift 是 Facebook 实现的一种高效的.支持多

Alluxio源码分析:RPC框架浅析(二)

        Alluxio源码分析是一个基于内存的分布式文件系统,和HDFS.HBase等一样,也是由主从节点构成的.而节点之间的通信,一般都是采用的RPC通讯模型.Alluxio中RPC是基于何种技术如何实现的呢?它对于RPC请求是如何处理的?都涉及到哪些组件?本文将针对这些问题,为您一一解答.         继<Alluxio源码分析:RPC框架浅析(一)>一文后,本文继续讲解Alluxio中RPC实现.         3.Server端实现:RPC Server端口绑定.传输协议

OkHttp 3.7源码分析(四)——缓存策略

OkHttp3.7源码分析文章列表如下: OkHttp源码分析--整体架构 OkHttp源码分析--拦截器 OkHttp源码分析--任务队列 OkHttp源码分析--缓存策略 OkHttp源码分析--多路复用 合理地利用本地缓存可以有效地减少网络开销,减少响应延迟.HTTP报头也定义了很多与缓存有关的域来控制缓存.今天就来讲讲OkHttp中关于缓存部分的实现细节. 1. HTTP缓存策略 首先来了解下HTTP协议中缓存部分的相关域. 1.1 Expires 超时时间,一般用在服务器的respon

Alluxio源码分析读数据:打开文件选项OpenFileOptions

        OpenFileOptions是为读数据打开一个文件方法的选项,提供了打开文件的多种选择.在OpenFileOptions内部,封装了两个重要的成员变量,如下: // 定位策略 private FileWriteLocationPolicy mLocationPolicy; // 读取类型 private ReadType mReadType;         其中mLocationPolicy为FileWriteLocationPolicy类型的定位策略,而mReadType为

Spark源码分析之六:Task调度(二)

        话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: // Make fake resource offers on all executors     // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的)     private def makeOffers() {  

Spark Catalyst 源码分析

Architecture Ø 把输入的SQL,parse成unresolved logical plan,这一步参考SqlParser的实现 Ø 把unresolved logical plan转化成resolved logical plan,这一步参考analysis的实现 Ø 把resolved logical plan转化成optimized logical plan,这一步参考optimize的实现 Ø 把optimized logical plan转化成physical plan,这一