循环遍历策略RoundRobinPolicy是一种通过循环遍历方式并且跳过没有足够空间workers的为下一个数据块选择worker的策略。如果没有worker被找到,该策略会返回null。在RoundRobinPolicy内部,有三个十分重要的成员变量,如下:
// 初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo private List<BlockWorkerInfo> mWorkerInfoList; // mWorkerInfoList列表当前遍历到的索引 private int mIndex; // 是否初始化的标志位mInitialized,默认为未初始化false private boolean mInitialized = false;
mWorkerInfoList为初始化的BlockWorkerInfo列表,每次都从这个列表中选择BlockWorkerInfo,当核心方法getWorkerForNextBlock()第一次被调用进行初始化时,由传入的BlockWorkerInfo列表进行赋值;
mIndex为mWorkerInfoList列表当前遍历到的索引;
mInitialized为是否初始化的标志位,默认为未初始化false。
再看核心方法getWorkerForNextBlock(),代码如下:
/** * The policy uses the first fetch of worker info list as the base, and visits each of them in a * round-robin manner in the subsequent calls. The policy doesn't assume the list of worker info * in the subsequent calls has the same order from the first, and it will skip the workers that * are no longer active. * * @param workerInfoList the info of the active workers * @param blockSizeBytes the size of the block in bytes * @return the address of the worker to write to */ @Override public WorkerNetAddress getWorkerForNextBlock(List<BlockWorkerInfo> workerInfoList, long blockSizeBytes) { // 第一次执行该方法时,未初始化的话,先进行初始化 if (!mInitialized) { // 将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList mWorkerInfoList = workerInfoList; // 对mWorkerInfoList进行shuffle,避免热点问题 Collections.shuffle(mWorkerInfoList); // mWorkerInfoList列表当前遍历到的索引初始化为0,默认从第一个开始 mIndex = 0; // 是否初始化的标志位mInitialized设置为true mInitialized = true; } // at most try all the workers // 遍历成员变量BlockWorkerInfo列表mWorkerInfoList: for (int i = 0; i < mWorkerInfoList.size(); i++) { // 取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate WorkerNetAddress candidate = mWorkerInfoList.get(mIndex).getNetAddress(); // 调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中, // 根据上述地址candidate找到对应的BlockWorkerInfo BlockWorkerInfo workerInfo = findBlockWorkerInfo(workerInfoList, candidate); // 索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余, // 也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历 mIndex = (mIndex + 1) % mWorkerInfoList.size(); // 如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate if (workerInfo != null && workerInfo.getCapacityBytes() >= blockSizeBytes) { return candidate; } } // 选择不到的话,返回null return null; }
当第一次进入getWorkerForNextBlock()方法时,由于标志位mInitialized默认为false,那么我们需要先做初始化工作,如下:
1、将第一次传入的workerInfoList赋值给成员变量mWorkerInfoList;
2、对mWorkerInfoList进行shuffle,避免热点问题;
3、mWorkerInfoList列表当前遍历到的索引mIndex初始化为0,默认从第一个开始;
4、是否初始化的标志位mInitialized设置为true。
初始化后,连同后续每次进入getWorkerForNextBlock()方法时,我们就可以开始选择worker的操作了,大体流程如下:
1、遍历成员变量BlockWorkerInfo列表mWorkerInfoList:
1.1、取出索引为mIndex的worker的网络地址WorkerNetAddress,即candidate;
1.2、调用findBlockWorkerInfo()方法,从入参BlockWorkerInfo列表workerInfoList中,根据上述地址candidate找到对应的BlockWorkerInfo;
1.3、索引mIndex重置:当前mIndex加1对mWorkerInfoList列表大小取余,也就是mWorkerInfoList列表取下一个,达到最后一个的话,再折回列表头部,实现循环遍历;
1.4、如果获取的workerInfo不为null,可总容量大于要求的块大小,直接返回选中的worker地址candidate,否则继续遍历下一个位置;
2、选择不到的话,返回null。
很简单,上述流程描述的应该很详细。下面我们再看下findBlockWorkerInfo()方法,代码如下:
/** * 根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo * * @param workerInfoList the list of worker info * @param address the address to look for * @return the worker info in the list that matches the host name, null if not found */ private BlockWorkerInfo findBlockWorkerInfo(List<BlockWorkerInfo> workerInfoList, WorkerNetAddress address) { // 遍历workerInfoList中的每个BlockWorkerInfo: for (BlockWorkerInfo info : workerInfoList) { // 判断地址是否相同,相同的话直接返回,否则继续遍历下一个 if (info.getNetAddress().equals(address)) { return info; } } // 列表中不存在的话,最终返回null return null; }
它负责根据worker网络地址address从workerInfoList列表中获取对应的BlockWorkerInfo,遍历workerInfoList中的每个BlockWorkerInfo,判断地址是否相同,相同的话直接返回,否则继续遍历下一个,列表中不存在的话,最终返回null。
循环遍历策略RoundRobinPolicy有一个显著的问题,既然初始化时mWorkerInfoList已经确定了,那么当worker增减时它是如何处理的呢?还是说根本无法应对这种情况?留待以后慢慢分析吧!