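In the earlier article "HDFS Source Code Analysis: Overall Structure of Heartbeat Reporting", we looked in detail at the overall structure behind HDFS heartbeats and at how BlockPoolManager, BPOfferService, and BPServiceActor relate to one another. So how are these heartbeat-related data structures initialized? This article walks through that initialization.
First, the startDataNode() method, which every DataNode must run at startup, contains the following code: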
// Inside startDataNode(), which runs when the DataNode starts:
// construct a BlockPoolManager instance
// and call its refreshNamenodes() method
blockPoolManager = new BlockPoolManager(this);
blockPoolManager.refreshNamenodes(conf);
It constructs a BlockPoolManager instance and calls its refreshNamenodes() method to refresh the set of NameNodes. Here is that method:
void refreshNamenodes(Configuration conf) throws IOException {
  LOG.info("Refresh request received for nameservices: "
      + conf.get(DFSConfigKeys.DFS_NAMESERVICES));

  // Read from conf the map newAddressMap of
  // nameserviceId -> {NameNode ID -> InetSocketAddress}
  Map<String, Map<String, InetSocketAddress>> newAddressMap =
      DFSUtil.getNNServiceRpcAddressesForCluster(conf);

  // Serialize refreshes by taking the monitor of refreshNamenodesLock
  synchronized (refreshNamenodesLock) {
    // Apply the contents of newAddressMap
    doRefreshNamenodes(newAddressMap);
  }
}
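The method is simple and has two steps. First, it reads from the configuration conf the map newAddressMap of nameserviceId -> {NameNode ID -> InetSocketAddress}. Second, while holding refreshNamenodesLock, it calls doRefreshNamenodes() to refresh the NameNodes listed in newAddressMap.
Let us start with how newAddressMap is obtained from conf. The relevant code is: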
/**
 * Returns list of InetSocketAddresses corresponding to the namenode
 * that manages this cluster. Note this is to be used by datanodes to get
 * the list of namenode addresses to talk to.
 *
 * Returns namenode address specifically configured for datanodes (using
 * service ports), if found. If not, regular RPC address configured for other
 * clients is returned.
 *
 * @param conf configuration
 * @return list of InetSocketAddress
 * @throws IOException on error
 */
public static Map<String, Map<String, InetSocketAddress>>
    getNNServiceRpcAddressesForCluster(Configuration conf) throws IOException {
  // Use default address as fall back
  String defaultAddress;
  try {
    defaultAddress = NetUtils.getHostPortString(NameNode.getAddress(conf));
  } catch (IllegalArgumentException e) {
    defaultAddress = null;
  }

  // Read dfs.internal.nameservices into parentNameServices
  Collection<String> parentNameServices = conf.getTrimmedStringCollection(
      DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);

  if (parentNameServices.isEmpty()) {
    // dfs.internal.nameservices is not set: fall back to dfs.nameservices
    parentNameServices = conf.getTrimmedStringCollection(
        DFSConfigKeys.DFS_NAMESERVICES);
  } else {
    // Ensure that the internal service is indeed in the list of all available
    // nameservices.
    Set<String> availableNameServices = Sets.newHashSet(
        conf.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMESERVICES));
    // Every nsId in dfs.internal.nameservices must also appear in dfs.nameservices
    for (String nsId : parentNameServices) {
      if (!availableNameServices.contains(nsId)) {
        throw new IOException("Unknown nameservice: " + nsId);
      }
    }
  }

  // Build nameserviceId -> {nameNodeId -> InetSocketAddress}, preferring
  // dfs.namenode.servicerpc-address over dfs.namenode.rpc-address
  Map<String, Map<String, InetSocketAddress>> addressList =
      getAddressesForNsIds(conf, parentNameServices, defaultAddress,
          DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY);
  if (addressList.isEmpty()) {
    throw new IOException("Incorrect configuration: namenode address "
        + DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY + " or "
        + DFS_NAMENODE_RPC_ADDRESS_KEY + " is not configured.");
  }
  return addressList;
}
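The method works as follows:
1. It first calls the static method NameNode.getAddress() to derive the default address defaultAddress from the configuration. If that call throws IllegalArgumentException, defaultAddress is set to null.
2. It then reads the internal nameservices, dfs.internal.nameservices, into the collection parentNameServices:
2.1. If dfs.internal.nameservices is not configured, it reads dfs.nameservices and assigns that to parentNameServices instead.
2.2. If dfs.internal.nameservices is configured, it also reads dfs.nameservices into availableNameServices and verifies that every nsId in parentNameServices is present there. In other words, dfs.internal.nameservices must be a subset of dfs.nameservices; otherwise an IOException ("Unknown nameservice") is thrown.
3. It calls getAddressesForNsIds() with conf, parentNameServices, defaultAddress, and the two address keys to build the map addressList of nameserviceId -> {nameNodeId -> InetSocketAddress}. If the map is empty it throws an IOException reporting that no NameNode address is configured; otherwise it returns the map.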
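The nameservice selection in step 2 can be sketched in isolation. The following is a minimal illustration, not the Hadoop code: the class NameserviceSelection and the method selectNameservices are hypothetical names, and the two collections stand in for the values of dfs.internal.nameservices and dfs.nameservices that the real method reads from Configuration.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the nameservice selection step (not Hadoop code).
public class NameserviceSelection {

    // internal: values of dfs.internal.nameservices (may be empty)
    // all:      values of dfs.nameservices
    static Collection<String> selectNameservices(
            Collection<String> internal, Collection<String> all) throws IOException {
        if (internal.isEmpty()) {
            // No internal list configured: use every configured nameservice
            return all;
        }
        // Internal list configured: it must be a subset of the full list
        Set<String> available = new HashSet<>(all);
        for (String nsId : internal) {
            if (!available.contains(nsId)) {
                throw new IOException("Unknown nameservice: " + nsId);
            }
        }
        return internal;
    }

    public static void main(String[] args) throws IOException {
        Collection<String> all = Arrays.asList("ns1", "ns2");
        // Falls back to the full list
        System.out.println(selectNameservices(Collections.<String>emptyList(), all));
        // Uses the validated internal list
        System.out.println(selectNameservices(Arrays.asList("ns1"), all));
        try {
            selectNameservices(Arrays.asList("ns3"), all);
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Unknown nameservice: ns3"
        }
    }
}
```

The point of the subset check is that a DataNode is only pointed at nameservices that are also declared for the cluster as a whole.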
Next, let us look at getAddressesForNsIds(). The code is:
/**
 * Returns the configured address for all NameNodes in the cluster.
 * @param conf configuration
 * @param nsIds
 * @param defaultAddress default address to return in case key is not found.
 * @param keys Set of keys to look for in the order of preference
 * @return a map(nameserviceId to map(namenodeId to InetSocketAddress))
 */
private static Map<String, Map<String, InetSocketAddress>>
    getAddressesForNsIds(Configuration conf, Collection<String> nsIds,
        String defaultAddress, String... keys) {
  // Look for configurations of the form <key>[.<nameserviceId>][.<namenodeId>]
  // across all of the configured nameservices and namenodes.
  // Here the keys are dfs.namenode.servicerpc-address and dfs.namenode.rpc-address.
  Map<String, Map<String, InetSocketAddress>> ret = Maps.newLinkedHashMap();

  for (String nsId : emptyAsSingletonNull(nsIds)) {
    // nameNodeId -> InetSocketAddress for this nameservice;
    // the NameNode IDs come from dfs.ha.namenodes.<nsId>
    Map<String, InetSocketAddress> isas =
        getAddressesForNameserviceId(conf, nsId, defaultAddress, keys);
    if (!isas.isEmpty()) {
      // Record nameserviceId -> {nameNodeId -> InetSocketAddress}
      ret.put(nsId, isas);
    }
  }
  return ret;
}
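This one is very simple. It iterates over the nameserviceIds and, for each one:
1. Calls getAddressesForNameserviceId() to obtain the nameNodeId -> InetSocketAddress map, where the NameNode IDs come from the dfs.ha.namenodes.<nsId> setting;
2. Puts the nameserviceId -> {nameNodeId -> InetSocketAddress} entry into ret, provided the inner map is not empty;
3. Finally returns ret, the full nameserviceId -> {nameNodeId -> InetSocketAddress} map.
Moving on to getAddressesForNameserviceId():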
private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
    Configuration conf, String nsId, String defaultValue, String... keys) {
  // keys, in order of preference:
  //   dfs.namenode.servicerpc-address
  //   dfs.namenode.rpc-address

  // NameNode IDs from dfs.ha.namenodes.<nsId>
  Collection<String> nnIds = getNameNodeIds(conf, nsId);
  Map<String, InetSocketAddress> ret = Maps.newHashMap();

  for (String nnId : emptyAsSingletonNull(nnIds)) {
    String suffix = concatSuffixes(nsId, nnId);
    // Look the address up under each key in turn
    String address = getConfValue(defaultValue, suffix, conf, keys);
    if (address != null) {
      // Wrap the address string in an InetSocketAddress
      InetSocketAddress isa = NetUtils.createSocketAddr(address);
      if (isa.isUnresolved()) {
        LOG.warn("Namenode for " + nsId + " remains unresolved for ID " + nnId
            + ". Check your hdfs-site.xml file to "
            + "ensure namenodes are configured properly.");
      }
      // Record nnId -> InetSocketAddress
      ret.put(nnId, isa);
    }
  }
  return ret;
}
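It reads the dfs.ha.namenodes.<nsId> setting to obtain the collection of NameNode IDs, nnIds. For each NameNode it then looks up an address using keys, which here are dfs.namenode.servicerpc-address and dfs.namenode.rpc-address: the first key is preferred, and the second is used only if the first yields nothing. The address is wrapped in an InetSocketAddress isa (a warning is logged if it cannot be resolved), the nnId -> InetSocketAddress entry is added to the map, and the map is returned to the caller.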
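To make the key preference concrete, here is a simplified sketch of the lookup, assuming configuration keys of the form <key>[.<nameserviceId>][.<namenodeId>] as described in the source comment. The class AddressLookup and its methods are illustrative stand-ins written for this article, and a plain Map replaces Hadoop's Configuration; the real helpers (concatSuffixes, getConfValue) may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical sketch of the key-preference lookup (not Hadoop code).
public class AddressLookup {

    // Joins the non-null parts with '.'; yields "" when every part is null,
    // which corresponds to a setup without nameservice or NameNode IDs.
    static String concatSuffixes(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            if (part == null) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append('.');
            }
            sb.append(part);
        }
        return sb.toString();
    }

    // Tries each key in order (with the suffix appended) and returns the first
    // value found; falls back to defaultValue when no key is set.
    static String lookup(Map<String, String> conf, String defaultValue,
                         String suffix, String... keys) {
        for (String key : keys) {
            String fullKey = suffix.isEmpty() ? key : key + "." + suffix;
            String value = conf.get(fullKey);
            if (value != null) {
                return value;
            }
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        String[] keys = {"dfs.namenode.servicerpc-address", "dfs.namenode.rpc-address"};
        Map<String, String> conf = new HashMap<>();
        conf.put("dfs.namenode.rpc-address.ns1.nn1", "host1:8020");
        conf.put("dfs.namenode.servicerpc-address.ns1.nn2", "host2:8021");
        conf.put("dfs.namenode.rpc-address.ns1.nn2", "host2:8020");

        // nn1 has only the RPC address, so the second key is used
        System.out.println(lookup(conf, null, concatSuffixes("ns1", "nn1"), keys)); // host1:8020
        // nn2 has both, so the service RPC address wins
        System.out.println(lookup(conf, null, concatSuffixes("ns1", "nn2"), keys)); // host2:8021
        // Nothing configured for the bare keys: the default is returned
        System.out.println(lookup(conf, "default:8020", concatSuffixes(null, null), keys)); // default:8020
    }
}
```

The host names and ports in main are made-up sample values.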
That completes the analysis of how newAddressMap, the nameserviceId -> {NameNode ID -> InetSocketAddress} map, is built from conf. Now for the heart of the initialization: the call to doRefreshNamenodes(), which applies newAddressMap. The code is:
private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap) throws IOException {
  // The calling thread must already hold refreshNamenodesLock
  assert Thread.holdsLock(refreshNamenodesLock);

  // Three sets: nameservices to refresh, to add, and to remove
  Set<String> toRefresh = Sets.newLinkedHashSet();
  Set<String> toAdd = Sets.newLinkedHashSet();
  Set<String> toRemove;

  // Take the monitor of this BlockPoolManager
  synchronized (this) {
    // Step 1. For each of the new nameservices, figure out whether
    // it's an update of the set of NNs for an existing NS,
    // or an entirely new nameservice.
    // The test is whether the nameserviceId is already a key of bpByNameserviceId.
    for (String nameserviceId : addrMap.keySet()) {
      if (bpByNameserviceId.containsKey(nameserviceId)) {
        toRefresh.add(nameserviceId);
      } else {
        toAdd.add(nameserviceId);
      }
    }

    // Step 2. Any nameservices we currently have but are no longer present
    // need to be removed.
    // These are the keys of bpByNameserviceId that are missing from addrMap.
    toRemove = Sets.newHashSet(Sets.difference(
        bpByNameserviceId.keySet(), addrMap.keySet()));

    // Sanity check: toRefresh and toAdd together must cover addrMap exactly
    assert toRefresh.size() + toAdd.size() == addrMap.size() :
        "toAdd: " + Joiner.on(",").useForNull("<default>").join(toAdd)
        + " toRemove: " + Joiner.on(",").useForNull("<default>").join(toRemove)
        + " toRefresh: " + Joiner.on(",").useForNull("<default>").join(toRefresh);

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {
      LOG.info("Starting BPOfferServices for nameservices: "
          + Joiner.on(",").useForNull("<default>").join(toAdd));

      for (String nsToAdd : toAdd) {
        // Collect the NameNode addresses configured for this nameservice
        ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
        // Create a BPOfferService for those addresses
        BPOfferService bpos = createBPOS(addrs);
        // Record nameserviceId -> BPOfferService
        bpByNameserviceId.put(nsToAdd, bpos);
        // Track the BPOfferService in offerServices
        offerServices.add(bpos);
      }
    }
    // Start every BPOfferService by calling its start() method
    startAll();
  }

  // Step 4. Shut down old nameservices. This happens outside
  // of the synchronized(this) lock since they need to call
  // back to .remove() from another thread
  if (!toRemove.isEmpty()) {
    LOG.info("Stopping BPOfferServices for nameservices: "
        + Joiner.on(",").useForNull("<default>").join(toRemove));

    for (String nsToRemove : toRemove) {
      // Look up the BPOfferService for this nameserviceId
      BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
      // Stop it and wait for it to finish
      bpos.stop();
      bpos.join();
      // they will call remove on their own
    }
  }

  // Step 5. Update nameservices whose NN list has changed
  if (!toRefresh.isEmpty()) {
    LOG.info("Refreshing list of NNs for nameservices: "
        + Joiner.on(",").useForNull("<default>").join(toRefresh));

    for (String nsToRefresh : toRefresh) {
      // Look up the existing BPOfferService for this nameserviceId
      BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
      // Collect the NameNode addresses now configured for this nameservice
      ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());
      // Let the BPOfferService refresh its NN list from addrs
      bpos.refreshNNList(addrs);
    }
  }
}
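doRefreshNamenodes() is fairly long, but its logic is clear and falls into five steps:
1. Step 1: for each nameserviceId in newAddressMap (the nameserviceId -> {NameNode ID -> InetSocketAddress} map), decide whether it is an entirely new nameservice or an existing one whose NameNode list may have changed, and add it to toAdd or toRefresh accordingly;
2. Step 2: any nameservice that the DataNode currently holds in bpByNameserviceId but that is absent from newAddressMap must be removed, so it is added to toRemove;
3. Step 3: process toAdd and start the new nameservices. A BPOfferService is created from each address list addrs, the BPOfferService bookkeeping collections are updated, and then all BPOfferServices are started;
4. Step 4: process toRemove and stop the old nameservices;
5. Step 5: process toRefresh and update the nameservices whose NN list has changed.
That is all there is to it: sort the nameservices into separate sets by the action they need, then handle each set in turn, in the order add, remove, refresh.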
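The classification in steps 1 and 2 is plain set arithmetic and can be sketched with java.util alone. This is an illustrative stand-in, not the Hadoop code: the class RefreshPlan and the method plan are hypothetical, running plays the role of bpByNameserviceId.keySet(), and configured plays the role of addrMap.keySet(). The original uses Guava's Sets.difference(); here removeAll() produces the same difference.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of how nameservices are sorted into three sets.
public class RefreshPlan {
    final Set<String> toAdd = new LinkedHashSet<>();
    final Set<String> toRefresh = new LinkedHashSet<>();
    final Set<String> toRemove = new HashSet<>();

    static RefreshPlan plan(Set<String> running, Set<String> configured) {
        RefreshPlan p = new RefreshPlan();
        // Step 1: configured nameservices are either already running or new
        for (String ns : configured) {
            if (running.contains(ns)) {
                p.toRefresh.add(ns);
            } else {
                p.toAdd.add(ns);
            }
        }
        // Step 2: running but no longer configured => remove
        p.toRemove.addAll(running);
        p.toRemove.removeAll(configured);
        return p;
    }

    public static void main(String[] args) {
        Set<String> running = new LinkedHashSet<>(Arrays.asList("ns1", "ns2"));
        Set<String> configured = new LinkedHashSet<>(Arrays.asList("ns2", "ns3"));
        RefreshPlan p = plan(running, configured);
        // prints "toAdd=[ns3] toRefresh=[ns2] toRemove=[ns1]"
        System.out.println("toAdd=" + p.toAdd + " toRefresh=" + p.toRefresh
            + " toRemove=" + p.toRemove);
    }
}
```

By construction toAdd and toRefresh partition the configured set, which is exactly what the assert in doRefreshNamenodes() checks.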
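Next, let us look at each step in detail:
1. Step 1: for each nameserviceId in newAddressMap, decide whether it is an entirely new nameservice or an existing one whose NameNode list may have changed, and add it to toAdd or toRefresh;
The approach is to loop over every nameserviceId in addrMap: if the map bpByNameserviceId already contains the nameserviceId, it goes into toRefresh; otherwise it goes into toAdd.
2. Step 2: any nameservice that the DataNode currently holds in bpByNameserviceId but that is absent from newAddressMap must be removed, so it is added to toRemove;
The approach is to use Sets.difference() on the key sets of bpByNameserviceId and addrMap to find the nameserviceIds present in bpByNameserviceId but missing from addrMap. These form toRemove.
3. Step 3: process toAdd and start the new nameservices. A BPOfferService is created from each address list addrs, the BPOfferService bookkeeping collections are updated, and then all BPOfferServices are started;
For each nameserviceId in toAdd, this step does the following:
3.1. Looks up the InetSocketAddress values for the nameserviceId in addrMap and builds the list addrs;
3.2. Creates a BPOfferService instance bpos from addrs;
3.3. Adds the nameserviceId -> BPOfferService entry to bpByNameserviceId;
3.4. Adds the BPOfferService to offerServices.
Finally, it calls startAll() to start every BPOfferService, which in practice means calling start() on each one. Note that startAll() sits outside the toAdd check, so it is called even when toAdd is empty.
When bpos is created, the BPOfferService constructor looks like this: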
// Constructor
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
  Preconditions.checkArgument(!nnAddrs.isEmpty(),
      "Must pass at least one NN.");
  this.dn = dn;

  // For each NameNode address, construct a BPServiceActor
  // and add it to the bpServices list
  for (InetSocketAddress addr : nnAddrs) {
    this.bpServices.add(new BPServiceActor(addr, this));
  }
}
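In effect it iterates over nnAddrs, constructs one BPServiceActor thread instance per NameNode, and adds each to the bpServices list.
When startAll() is called to start every BPOfferService, this is the code that runs: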
synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            // Start every BPOfferService in offerServices
            for (BPOfferService bpos : offerServices) {
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    IOException ioe = new IOException();
    ioe.initCause(ex.getCause());
    throw ioe;
  }
}
It iterates over offerServices and starts each BPOfferService as the login user. Starting a BPOfferService simply means starting the BPServiceActor thread it holds for each NameNode:
//This must be called only by blockPoolManager
void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}
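The "one actor thread per NameNode" shape can be shown with a small self-contained sketch. It is only an illustration under simplifying assumptions: OfferServiceSketch is a hypothetical class, NameNode addresses are plain strings, and each actor is a Thread that does nothing except count down a latch where a real actor would run its heartbeat loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: one worker thread per NameNode address (not Hadoop code).
public class OfferServiceSketch {
    private final List<Thread> actors = new ArrayList<>();
    private final CountDownLatch started;

    OfferServiceSketch(List<String> nnAddrs) {
        if (nnAddrs.isEmpty()) {
            throw new IllegalArgumentException("Must pass at least one NN.");
        }
        CountDownLatch latch = new CountDownLatch(nnAddrs.size());
        this.started = latch;
        // One actor per NameNode address; threads are created but not yet running
        for (String addr : nnAddrs) {
            actors.add(new Thread(latch::countDown, "actor-" + addr));
        }
    }

    // Starting the service means starting every actor thread
    void start() {
        for (Thread actor : actors) {
            actor.start();
        }
    }

    // Waits until every actor thread has finished
    void join() throws InterruptedException {
        for (Thread actor : actors) {
            actor.join();
        }
    }

    int actorCount() {
        return actors.size();
    }

    // Number of actors that have not yet run
    long pending() {
        return started.getCount();
    }

    public static void main(String[] args) throws InterruptedException {
        OfferServiceSketch svc = new OfferServiceSketch(Arrays.asList("nn1:8020", "nn2:8020"));
        svc.start();
        svc.join();
        System.out.println(svc.actorCount() + " actors, " + svc.pending() + " pending");
    }
}
```

Separating construction from start(), as the original does, lets the manager register the service in its bookkeeping collections before any thread begins to run.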
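4. Step 4: process toRemove and stop the old nameservices;
This step iterates over each nameserviceId in toRemove:
4.1. Looks up the BPOfferService for the nameserviceId in bpByNameserviceId;
4.2. Calls stop() and join() on the BPOfferService to shut it down. The entries are not removed from the collections here; as the source comment says, the stopped services call remove() by themselves. Because that callback to remove() arrives from another thread, this step runs outside the synchronized (this) block.
The stop() and join() methods of BPOfferService in turn call stop() and join() on every BPServiceActor thread the BPOfferService contains: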
//This must be called only by blockPoolManager.
void stop() {
  for (BPServiceActor actor : bpServices) {
    actor.stop();
  }
}

//This must be called only by blockPoolManager
void join() {
  for (BPServiceActor actor : bpServices) {
    actor.join();
  }
}
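Why the stop-and-join must happen outside the manager's lock is easier to see in a small sketch. The code below is a hypothetical model, not the Hadoop implementation: ManagerSketch and Worker are made-up names, and the sketch assumes that remove() needs the manager's monitor. Under that assumption, joining a worker while holding the monitor would deadlock, because the worker could never complete its remove() callback.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of "stop and join outside the lock" (not Hadoop code).
public class ManagerSketch {
    private final Set<Worker> workers = new HashSet<>();

    static class Worker extends Thread {
        private final ManagerSketch manager;
        private volatile boolean running = true;

        Worker(ManagerSketch manager) {
            this.manager = manager;
        }

        void stopWorker() {
            running = false;
            interrupt();
        }

        @Override
        public void run() {
            try {
                while (running) {
                    Thread.sleep(10); // stands in for the real work loop
                }
            } catch (InterruptedException ignored) {
                // interrupted by stopWorker(): fall through to cleanup
            } finally {
                // Callback from the worker thread; needs the manager's monitor
                manager.remove(this);
            }
        }
    }

    synchronized Worker add() {
        Worker w = new Worker(this);
        workers.add(w);
        w.start();
        return w;
    }

    synchronized void remove(Worker w) {
        workers.remove(w);
    }

    synchronized int size() {
        return workers.size();
    }

    // Deliberately NOT synchronized: join() must not hold the monitor that
    // the worker's remove() callback is waiting for.
    void shutdown(Worker w) throws InterruptedException {
        w.stopWorker();
        w.join();
    }

    public static void main(String[] args) throws InterruptedException {
        ManagerSketch manager = new ManagerSketch();
        Worker w = manager.add();
        manager.shutdown(w);
        System.out.println("workers left: " + manager.size()); // prints "workers left: 0"
    }
}
```

If shutdown() were declared synchronized in this sketch, join() would wait for the worker while the worker waited for the monitor in remove(), and neither would ever proceed.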
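5. Step 5: process toRefresh and update the nameservices whose NN list has changed;
This last step iterates over each nameserviceId in toRefresh:
5.1. Looks up the corresponding BPOfferService in bpByNameserviceId by nameserviceId;
5.2. Looks up the NameNode InetSocketAddress values for the same nameserviceId in addrMap and builds the list addrs;
5.3. Calls refreshNNList() on the BPOfferService to refresh its NN list from addrs.
That completes the analysis of how the heartbeat-related data structures in HDFS are initialized. At this point the BPServiceActor thread for every NameNode in every nameservice has been started. It is the one that does the real work, the true workhorse at the bottom of the stack. How it actually runs to carry out HDFS heartbeats is the subject of the next article.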