zookeeper源码分析之leader选举

zookeeper提供顺序一致性、原子性、统一视图、可靠性保证服务
zookeeper使用的是zab(atomic broadcast protocol)协议而非paxos协议
zookeeper能处理并发地处理多个客户端的写请求,并且以FIFO顺序commit这些写操作,zab采用了一个事务ID来实现事务的全局有序性,
在Zab协议的实现时,分为三个阶段:
1、 Leader Election
2、 Recovery Phase
3、 Broadcast Phase

今天就先分析选举算法的源码实现

zookeeper默认选举算法为FastLeaderElection.java。其主要方法为FastLeaderElection.lookForLeader,该接口是一个同步接口,直到选举结束才会返回。选举的结果保存在类Vote中

选举整体过程主要流程可概括为下图:

来看源码实现

1.//首先logicalclock自增, 在这里logicalclock表示本次选举的id,逻辑时钟的值,这个值从0开始递增,每次选举对应一个值,如果在同一次选举中,这个值是一样的,逻辑时钟值越大,说明该节点上的这一次选举leader的进程更加新

  1. synchronized(this){  
  2.    logicalclock++;  
  3.     //如果自己不是OBSERVER,则投给自己    
  4.    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());  
  5. }  

2.sendNotifications();向所有的node发送notification消息,其主方法:

  1. ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader, proposedZxid,logicalclock,QuorumPeer.ServerState.LOOKING,sid,proposedEpoch);  
  2. 消息格式:  
  3. mType  type     消息类型  
  4. long   leader     推荐的leader的id,就是配置文件中写好的每个服务器的id  
  5. long   zxid         推荐的leader的zxid,zookeeper中的每份数据,都有一个对应的zxid值,越新的数据,zxid值就越大  
  6. long   epoch,     logicalclock  
  7. ServerState state,  本节点的状态  
  8. long    sid         本节点的 id,即myid  

发送完添加到到发送队列中

3.当该节点的状态为LOOKING且没有stop时,就一直loop到选出leader为止

  1. //从消息队列中接收消息  
  2. Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);  
  3. //如没有接收到消息,则检查manager.haveDelivered(),如果已经全部发送出去了,就继续发送,一直到选出leader为止。否则就重新连接。  
  4. if(manager.haveDelivered()){  
  5.         sendNotifications();  
  6.     } else {  
  7.         manager.connectAll();  
  8.     }  
  9.     int tmpTimeOut = notTimeout*2;//延长超时时间    
  10.     notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);  

4.如果收到回应消息,则检查回应应状态,回应状态有以下四种:LOOKING、OBSERVING、FOLLOWING、LEADING
5.下面分析最核心的LOOKING状态:

  1. case LOOKING:  
  2. // If notification > current, replace and send messages out  
  3. if (n.electionEpoch > logicalclock) {//该节点的epoch大于 logicalclock,表示当前新一轮的选举  
  4.    logicalclock = n.electionEpoch;//更新本地的logicalclock  
  5.    recvset.clear();//清空接收队列recvset  
  6. //调用totalOrderPredicate决定是否更新自己的投票,依次比较选举轮数epoch,事务zxid,服务器编号server id(myid)    
  7.    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {  
  8.        updateProposal(n.leader, n.zxid, n.peerEpoch);//把投票修改为对方的  
  9.    } else {  
  10.        updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());  
  11.    }  
  12.    sendNotifications();//广播消息  
  13. } else if (n.electionEpoch < logicalclock) {//如果该节点的epoch小于logicalclock,则忽略  
  14.    break;  
  15. } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,proposedLeader, proposedZxid, proposedEpoch)) {  
  16.    updateProposal(n.leader, n.zxid, n.peerEpoch);  
  17.    sendNotifications();  
  18. }  
  19. recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//把从该节点的信息发到recvset中,表明已经收到该节点的回应  
  20. //通过termPredicate函数判断recvset是否已经达到法定quorum,默认超过半数就通过  
  21. if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {  
  22. // Verify if there is any change in the proposed leader  
  23.    while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){//循环,一直等新的notification到达,直到超时  
  24.        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){  
  25.            recvqueue.put(n);  
  26.            break;  
  27.        }  
  28.    }  
  29.   if (n == null) {//确定leader  
  30.        self.setPeerState((proposedLeader == self.getId()) ?ServerState.LEADING: learningState());  
  31.        Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);  
  32.        leaveInstance(endVote);//清空接收队列  
  33.        return endVote;  
  34.    }  
  35. }   
  36.   
  37.   
  38.    /* 
  39.      *  
  40.      *  返回true说明需要更新数据 
  41.      * We return true if one of the following three cases hold: 
  42.      * 1- New epoch is higher 
  43.      * 2- New epoch is the same as current epoch, but new zxid is higher 
  44.      * 3- New epoch is the same as current epoch, new zxid is the same 
  45.      *  as current zxid, but server id is higher. 
  46.      */  
  47. protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {  
  48. ...   
  49.    return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));  
  50. }  

zookeeper的leader算法类似于公民选举,每一个节点(选民),他们都有自己的推荐人(自己)。谁更适合成为leader有一个简单的规则,例如zxid(数据新)、sid/myid(服务编号大)。每个选民都告诉其他选民自己目前的推荐人是谁,当选民发现有比自己更适合的人时就转而推荐这个更适合的人。最后,过半数人意见一致时,就可以结束选举。当然,如果大多数人已经选举出了leader,那剩下的选民(无论是否参与投票)就只能接受已经选出的leader。

watch注意事项
1.Zookeeper客户端可以在znode上设置Watch。znode发生的变化会触发watch然后清除watch。当一个watch被触发,Zookeeper给客户端发送一个通知,当ZooKeeper客户端断开和服务器的连接,直到重新连接上这段时间你都收不到任何通知。如果你正在监视znode是否存在,那么你在断开连接期间收不到它创建和销毁的通知。

2.Zookeeper的客户端和服务会检查确保每个znode上的数据小于1M,因为Zookeeper为了提供高吞吐量,保存到内存里的数据量不宜过多

转载请注明来源:http://blog.csdn.net/odailidong/article/details/41855613

时间: 2024-09-17 19:37:13

zookeeper源码分析之leader选举的相关文章

ZooKeeper源码研究系列(4)集群版服务器介绍

1 系列目录 ZooKeeper源码研究系列(1)源码环境搭建 ZooKeeper源码研究系列(2)客户端创建连接过程分析 ZooKeeper源码研究系列(3)单机版服务器介绍 ZooKeeper源码研究系列(4)集群版服务器介绍 2 集群版服务器启动过程 启动类是org.apache.zookeeper.server.quorum.QuorumPeerMain,启动参数就是配置文件的地址 2.1 配置文件说明 来看下一个简单的配置文件内容: tickTime=4000 initLimit=10

深入理解Spark:核心思想与源码分析

大数据技术丛书 深入理解Spark:核心思想与源码分析 耿嘉安 著 图书在版编目(CIP)数据 深入理解Spark:核心思想与源码分析/耿嘉安著. -北京:机械工业出版社,2015.12 (大数据技术丛书) ISBN 978-7-111-52234-8 I. 深- II.耿- III.数据处理软件 IV. TP274 中国版本图书馆CIP数据核字(2015)第280808号 深入理解Spark:核心思想与源码分析 出版发行:机械工业出版社(北京市西城区百万庄大街22号 邮政编码:100037)

ZooKeeper源码研究系列(3)单机版服务器介绍

1 系列目录 ZooKeeper源码研究系列(1)源码环境搭建 ZooKeeper源码研究系列(2)客户端创建连接过程分析 ZooKeeper源码研究系列(3)单机版服务器介绍 ZooKeeper源码研究系列(4)集群版服务器介绍 2 单机版服务器启动方式 单机版的服务器启动,使用ZooKeeperServerMain的main函数来启动,参数分为两种: 只有一个参数:表示为一个配置文件地址 有2~4个参数:分别表示端口.dataDir.tickTime.maxClientCnxns 详细介绍见

Apache Kafka源码分析 – Broker Server

1. Kafka.scala 在Kafka的main入口中startup KafkaServerStartable, 而KafkaServerStartable这是对KafkaServer的封装 1: val kafkaServerStartble = new KafkaServerStartable(serverConfig) 2: kafkaServerStartble.startup   1: package kafka.server 2: class KafkaServerStartab

ZooKeeper源码研究系列(5)集群版建立连接过程

1 系列目录 ZooKeeper源码研究系列(1)源码环境搭建 ZooKeeper源码研究系列(2)客户端创建连接过程分析 ZooKeeper源码研究系列(3)单机版服务器介绍 ZooKeeper源码研究系列(4)集群版服务器介绍 2 各服务器角色的请求处理器链 先介绍下Leader.Follower.Observer服务器的请求处理器链 2.1 Leader服务器 PrepRequestProcessor->ProposalRequestProcessor->CommitProcessor-

《深入理解SPARK:核心思想与源码分析》一书正式出版上市

自己牺牲了7个月的周末和下班空闲时间,通过研究Spark源码和原理,总结整理的<深入理解Spark:核心思想与源码分析>一书现在已经正式出版上市,目前亚马逊.京东.当当.天猫等网站均有销售,欢迎感兴趣的同学购买.我开始研究源码时的Spark版本是1.2.0,经过7个多月的研究和出版社近4个月的流程,Spark自身的版本迭代也很快,如今最新已经是1.6.0.目前市面上另外2本源码研究的Spark书籍的版本分别是0.9.0版本和1.2.0版本,看来这些书的作者都与我一样,遇到了这种问题.由于研究和

Hive源码分析:Driver类运行过程

说明: 本文的源码分析基于hive-0.12.0-cdh5.0.1. 概括 从<hive cli的入口类>中可以知道hive中处理hive命令的处理器一共有以下几种: (1)set SetProcessor,设置修改参数,设置到SessionState的HiveConf里. (2)dfs DfsProcessor,使用hadoop的FsShell运行hadoop的命令. (3)add AddResourceProcessor,添加到SessionState的resource_map里,运行提交

HBase源码分析之HRegionServer上MemStore的flush处理流程(一)

        在<HBase源码分析之HRegion上MemStore的flsuh流程(一)>.<HBase源码分析之HRegion上MemStore的flsuh流程(二)>等文中,我们介绍了HRegion上Memstore flush的主体流程和主要细节.但是,HRegion只是HBase表中按照行的方向对一片连续的数据区域的抽象,它并不能对外提供单独的服务,供客户端或者HBase其它实体调用.而HRegion上MemStore的flush还是要通过HRegionServer来

dubbo源码分析系列(3)服务的引用

1 系列目录 dubbo源码分析系列(1)扩展机制的实现 dubbo源码分析系列(2)服务的发布 dubbo源码分析系列(3)服务的引用 dubbo源码分析系列(4)dubbo通信设计 2 服务引用案例介绍 先看一个简单的客户端引用服务的例子,dubbo配置如下: <dubbo:application name="consumer-of-helloService" /> <dubbo:registry protocol="zookeeper" ad