ZAB协议恢复模式-leader选举

之前在网上看了很多zookeeper的zab原理,但讲述的都不够详细,很多地方模糊不清,所以就研究了一下zookeeper源码,并整理成几篇博客,希望对其他小伙伴有所帮助。本文是ZAB协议崩溃恢复Leader选举部分的内容,数据同步见另一篇博客 《ZAB协议恢复模式-数据同步》    。

为了避免理解上的歧义,将投票动作和投票信息区分开,在本文中,我将服务器的投票信息称之为选票。

一  基本概念

1.  Noitifcation

Notification其实是选举过程中的通信信息;选举过程主要围绕Notification进行。

选Leader过程中Zookeeper server(QuorumPeer)都会根据Notification信息生成Vote(选票信息)。为了方便以下理解,我们不妨将Notification看成每个zookeeper server的选票信息。

以下是notification的主要字段。

1)   zxid

事务id,事务请求的唯一标记,由leader服务器负责进行分配。

高32位是peerEpoch,低32位是请求的计数,从0开始。

2)   peerEpoch

每次leader选举完成之后,都会选举出一个新的peerEpoch,用来标记事务请求所属的轮次。

3)   electionEpoch

每次leader选举,electionEpoch就会自增1,统计选票信息时,首先保证electionEpoch相同。

4)   sid

服务器id

5)   leader

提议的leader。

 

2.  其他概念

1)   lastProcessedZxid

最后一次commit的事务请求的zxid

 

二  Zookeeper Server

1.  QuorumPeer

Zookeeper server启动始于QuorumPeerMain#main()。

Zookeeper server的主要逻辑都在QuorumPeer中,此类具体负责的逻辑如下流程图:

本文仅介绍Leader选举流程内容,其他流程(follower流程、leader流程、observer流程)见《ZAB协议恢复模式-数据同步》。

从流程图中可以看到,QuorumPeer将一直运行,直到running=false。While无限循环中,根据当前zookeeper服务器的投票状态进入不同的业务逻辑。

服务器启动时处于LOOKING状态;退出任何子流程以后状态立即被改成LOOKING状态。LOOKING状态,表示Zookeeper服务端在进行选举流程。

在集群环境下,任何一台服务器都可能被选中成为leader,但每台服务器成为leader的可能性会有所不同,具体为:zxid、peerEpoch、electionEpoch、sid大者更容易被选举为leader,选举流程部分会详细讲述此中缘由。

 

2.  状态

1)   LOOKING

表示在选举leader。

2)   FOLLOWING

服务器的角色为follower。

3)   LEADING

服务器的角色为leader。

4)   OBSERVING

服务器的角色为observer,此种服务器不参与投票,只是同步状态。

 

三  Leader选举

1.  选举须知

选举流程比较复杂,在正式进入选举流程之前,需要先弄清楚以下内容:

每个Zookeeper服务端进入LOOKING状态以后,都会发起选举流程,默认情况下是快速选举,所以由FastLeaderElection#lookForLeader方法承担此职责。

每个Zookeeper服务器接收到选票提议以后,只有两个选择:

  • 接受选票提议,认可提议中推荐的服务器作为Leader候选人;
  • 不接受选票提议,推荐自己上一次推荐的服务器作为Leader候选人。(选举开始是总是推荐自己作为候选人,选举中会根据收到的选票信息决定是否更换推荐候选人)

默认情况下,至少超过半数(即n/2+1)服务器投票给同一个Leader候选人时,Leader候选人才有可能被选中为Leader。(这里说的是有可能,还需要进行一些其他逻辑进行验证)。

 

2.  流程

以上为代码的完整流程,看起来比较复杂,我们可以按照以下内容简单理解一下。

需要说明的是:此流程结束仅仅是确认那个服务器成为Leader,具体Leader是否能够最终成为Leader,还有另外的流程决定,这部分内容会见《ZAB协议恢复模式-数据同步》。

 

3.  流程详述

流程比较复杂,接下来对流程图中标有数字的地方详细介绍。

1)   1-自增logicalclock

Logicalclock就是Notification中的electionEpoch.

选举的第一个操作是logicalclock自增,接着更新提议信息,其实第一次总是提议自己作为Leader。

如果和现实中总统选举做一个类比的话,每次总统选举时都要明确这是第几届选举,logicalclock就对应的是“第几届”。整个选举必须保证处于同一届选举中方有效。

 

2)   2-发送选票信息

这是一个异步操作(由sendNotifications封装),将提议信息放到FastLeaderElection#sendqueue队列中,然后异步的发送个所有其他zookeeper server(这里指的是所有参与投票的服务器,不会发送个Observer类型的服务器)。

 

3)   3-从选票队列中取选票信息

当前server收到其他服务器的选举回复信息以后,将选票信息放在FastLeaderElection#recvqueue,当服务器循环从此队列中取选票信息。

如果队列中有选票信息立即返回,如果没有则等待。这里有一个超时时间,如果超过此时间依然没有选票信息,则返回null,这么做可以防止死等。

 

4)   4-判断消息是否发送出去

当从recvqueue没有取得选票信息时,会检查是否已经将提议的leader发送给其他server了,如果queueSendMap(待发送队列)为空,说明已经全部发送出去了;否则认为没有发送出去,此时会重连其他zookeeper server,保证链路畅通。

 

5)   5-重连其他ZookeeperServer

如果链路出现异常,可能会导致提议信息无法发送成功,所以如果queueSendMap中的信息没有全部发送出去,此时会重连其他zookeeperserver,以保证zookeeper集群的链路畅通。

 

6)   6-LOOKING状态时,electionEpoch比较

如果收到的选票信息状态为LOOKING,说明对方也在选举中。

a)   electionEpoch比较

进行electionEpoch比较的目的是统一当前是第几届选举。

  • 如果收到选票的electionEpoch更大,那么使用收到选票的electionEpoch作为“届”,然后清空收到的选票信息,更新提议信息(这里有一个判断过程),重新发送更新后的提议。
  • 如果收到选票的electionEpoch小,直接忽略此选票。
  • 如果收到选票的electionEpoch和当前相同,那么认为是合法的选票,接着判断是否应该更新选票。
b)   接受选票的提议

当且仅当以下三个条件满足其一时,将接受选票的提议,并重新发送选票信息。

  • n.peerEpoch > self.proposedEpoch
  • n.peerEpoch == self.proposedEpoch&& n.zxid > self.proposedZxid
  • n.peerEpoch == self.proposedEpoch&& n.zxid = self.proposedZxid && n.leader > self.proposedLeader

n指的是收到的选票,self指的是当前服务器自身的提议

由此可知:peerEpoch、zxid、leader越大,与有可能成为Leader。

proposedLeader开始的时候一定是当前server的id,但随着选举的进行,会变成上一次提议的leader。

 

7)   7-Leader是否有效

如果某一个server已经得到半数以上的选票,那么进入leader是否有效的验证逻辑,具体如下:

无限循环的从recvqueue中取选票,满足一下条件之一时退出循环:

  • recvqueue没有选举票(超时时间内一直没有获取到选票);
  • 取到一个更新的选票信息(满足“接受选票提议”的条件,则说明提议更新)。

这里其实是一个Leader有效性的校验。依次从recvqueue中取出所有的选票,校验发现所有的选票均满足“接受选票提议”时,说明没有服务器的选票能够推翻之前的结论,所以此时可以认为Leader是有效的。

 

8)   8-FOLLOWING、LEADING状态时,electionEpoch比较

a)   选票集合

为了将此部分解释清楚,需要先能清楚选举过程中用到的两个集合:

recvset:用来记录选票信息,以方便后续统计;

outofelection:用来记录选举逻辑之外的选票,例如当一个服务器加入zookeeper集群时,因为集群已经存在,不用重新选举,只需要在满足一定条件下加入集群即可。

 

b)   electionEpoch比较

如果收到的选票显示处于FOLLOWING、LEADING状态,说明集群目前有Leader,只需要确保当前服务器和Leader能够正常通信,并收到了集群半数以上服务器推荐推荐此Leader时,就直接加入到集群中去。

因为Leader已经存在,所以所有的选票都会加入到outofelection中。如果outofelection有一条选票是来自leader的,那么就可以认为自己和leader正常通信;如果outofelection中统计出有超过半数的服务器都推荐了这个leader,那么毫无疑问,此选票推荐的就是我们的leader。

 

c)   源码逻辑
  • 如果logicalclock = n.electionEpoch相同,那么将此选票加入到选票列表中,如果此张选票通过了“选票有效性验证”,那么将此选票推举的候选人作为leader。
  • 因为leader已存在,将所有选票放在outofelection中,进行一次选票“选票有效性验证”,如果通过就可以将次选票推举的候选人作为leader。

以上两步的差别是在进行有效性校验时,一个用的是recvset,一个用的是outofelection。从代码上看,zookeeper认为只要electionEpoch就认为这是在选举,所以判断选票数目的时候使用的是recvset。

以上两步逻辑比较绕,如果理解起来比较困难,可以参考一下源码。

 

d)   源码
case FOLLOWING:
case LEADING:
if (n.electionEpoch == logicalclock) {
   //如果Notification的electionEpoch和当前的electionEpoch相同,那么说明在同一轮的选举中,
   recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

   //判定选举是否结束
    if (ooePredicate(recvset,outofelection, n)){
       // 选举结束,设置状态
       self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());

       Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
       leaveInstance(endVote);
       return endVote;
    }
}

outofelection.put(n.sid,
       new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
// 判定选举是否结束
if(ooePredicate(outofelection, outofelection, n)) {
   synchronized (this) {
       logicalclock = n.electionEpoch;
       self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
    }
   Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
   leaveInstance(endVote);
   return endVote;
}
break;

 

e)   选票有效性验证
  • leader候选人获得超过半数的选票。
  • 通过Leader有效性校验。

 

f)   Leader有效性验证
  • 如果自己不是leader,那么一定要收到过Leader的信息,即收到Leader信息,并且leader的回复信息中宣称自己的状态是ServerState.LEADING
  • 如果自己是leader,那么当前logicalclock一定要等于选票信息中的electionEpoch

 

4.  核心类

以下为选举过程中使用到的核心类。

1)   QuorumPeer

见整理流程部分

2)   FastLeaderElection

默认的选举算法,即上面流程图描述的内容。

此类有几个重要的内部类,如下:

a)   FastLeaderElection#Messenger#WorkerReceiver

从QuorumCnxManager#recvQueue中获取网络包,并将其发到FastLeaderElection#recvqueue中

b)   FastLeaderElection#Messenger#WorkerSender

从FastLeaderElection#sendqueue中获取网络包,并将其放到QuorumCnxManager#queueSendMap中,并发送到网络上

 

3)   QuorumCnxManager

QuorumCnxManager是实际发生网络交互的地方。QuorumCnxManager保证与每一个zookeeper服务器之间只有一个链接。

主要数据结构如下:

a)   queueSendMap

sid(key) -> buffer queue(value),为每个参与投票的server都保留一个队列。

b)   recvQueue

message queue,所有收到的消息都放到recvQueue。

c)   listener

server主线程,收发消息时和上面两个队列交互。

 

四  源码分析

以下是选举过程中用到的主要代码以及注释。代码版本为zookeeper-3.4.10。

我给选举逻辑的部分代码加了注释,以便于理解,下载地址:git@gitee.com:wuzhengfei/zookeeper-3.4.10-sources.git

 

1.  FastLeaderElection

1)   lookForLeader

   public Vote lookForLeader() throws InterruptedException {
       try {
           self.jmxLeaderElectionBean = newLeaderElectionBean();
           MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean,self.jmxLocalPeerBean);
       } catch (Exception e) {
            LOG.warn("Failedto register with JMX", e);
            self.jmxLeaderElectionBean = null;
       }
       if (self.start_fle== 0) {
           self.start_fle = System.currentTimeMillis();
       }
       try {
           // 投票信息列表
           HashMap<Long, Vote> recvset = newHashMap<Long, Vote>();

           HashMap<Long, Vote> outofelection = newHashMap<Long, Vote>();

           int notTimeout = finalizeWait;

           synchronized(this) {
               //更新选举周期
               logicalclock++;
                // 更新提议信息,提议自己作为leader,
               updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
           }

           LOG.info("New election. My id =  " + self.getId() + ",proposed zxid=0x" + Long.toHexString(proposedZxid));
           // 发送选票信息
           sendNotifications();

           /*
             * 循环直到选举出leader
             */
           while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
                // 从recvqueue移除第一个Notification(选票信息),最多等待notTimeout ms
               Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                // 如果当前没有收到足够多的Notification,不足以产生leader,那么继续发送选票信息,否则继续处理收到的Notification
               if(n == null){
                   // 当前没有收到回复信息
                   if (manager.haveDelivered()){
                       // 如果消息全部投递出去了,那么在发送一次选票信息
                      sendNotifications();
                   } else {
                       // 如果消息没有投递出去,那么尝试重连
                       manager.connectAll();
                   }

                   int tmpTimeOut= notTimeout * 2;
                   notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                   LOG.info("Notificationtime out: " + notTimeout);
               } elseif (self.getVotingView().containsKey(n.sid)) {
                   /*
                    * Only proceed if the vote comes from a replica in the
                    * voting view.
                    */
                   switch (n.state) {
                   case LOOKING:
                       // If notification > current, replace and sendmessages
                       // out
                       if (n.electionEpoch > logicalclock){
                          logicalclock = n.electionEpoch;
                          // 清空收到的选票信息列表
                          recvset.clear();
                           // 比较notification,current,大的最为下次推荐的leader,然后发送选票信息
                          if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(), getInitLastLoggedZxid(),
                                  getPeerEpoch())) {
                              updateProposal(n.leader, n.zxid, n.peerEpoch);
                          } else {
                              updateProposal(getInitId(),getInitLastLoggedZxid(), getPeerEpoch());
                          }
                          sendNotifications();
                       } else if (n.electionEpoch< logicalclock) {
                           // 如果notification#electionEpoch小,认为是无效选票,直接忽略
                          if (LOG.isDebugEnabled()){
                              LOG.debug(
                                       "Notificationelection epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                             + Long.toHexString(n.electionEpoch) + ",logicalclock=0x"
                                             + Long.toHexString(logicalclock));
                          }
                          break;
                       } else if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader,proposedZxid,
                              proposedEpoch)){
                           // 比较notification,current,大的最为下次推荐的leader,然后发送选票信息
                          updateProposal(n.leader, n.zxid, n.peerEpoch);
                          sendNotifications();
                       }

                       if (LOG.isDebugEnabled()){
                          LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader
                                  + ",proposed zxid=0x" + Long.toHexString(n.zxid) + ",proposed election epoch=0x"
                                  + Long.toHexString(n.electionEpoch));
                       }

                       // 将收到的选票加入到选票列表中
                       recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch,n.peerEpoch));System.err.println("-----"+DateUtil.formatTime(new Date(), DateUtil.PATTERN_TIMESTEMP_SSS)+"    recvset="+JSON.toJSONString(recvset));
                      System.err.println("-----"+DateUtil.formatTime(new Date(), DateUtil.PATTERN_TIMESTEMP_SSS)+"    recvset="+JSON.toJSONString(recvset));

                       // 判断self提议的服务器是否已经获得超过半数的票
                       if (termPredicate(recvset,
                               new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch))) {

                           // 循环直到当recvqueue中没有Notification,或有出现一个Notification的选票信息比推举的leader更大时,方跳出循环
                          while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS))!= null) {
                              if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader,proposedZxid,
                                      proposedEpoch)) {
                                  recvqueue.put(n);
                                  break;
                              }
                          }

                          if (n== null) {
                               // 进入这里说明recvqueue中没有比推举leader更大的票,可以将推举的服务器作为leader了。
                              self.setPeerState(
                                      (proposedLeader == self.getId())? ServerState.LEADING : learningState());

                              Vote endVote = new Vote(proposedLeader,proposedZxid, logicalclock,proposedEpoch);
                              leaveInstance(endVote);
                              return endVote;
                          }
                       }
                       break;
                   case OBSERVING:
                       LOG.debug("Notificationfrom observer: " + n.sid);
                       break;
                   case FOLLOWING:
                   case LEADING:
                       /**
                        * if内的代码、if外的代码比较类似,唯一的差别是ooePredicate方法的第一个参数,
                        * 考虑集群中只有leader存活,其他所有服务器都处于LOOKING状态,就能理解为什么需要这两段逻辑了。
                        */
                       if (n.electionEpoch == logicalclock){
                           // 如果Notification的electionEpoch和当前的electionEpoch相同,那么说明在同一轮的选举中
                          recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                          // 判定选举是否结束
                          if (ooePredicate(recvset, outofelection,n)) {
                              // 选举结束,设置状态
                              self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());

                              Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                              leaveInstance(endVote);
                              return endVote;
                          }
                       }

                       outofelection.put(n.sid,
                              new Vote(n.version, n.leader, n.zxid, n.electionEpoch,n.peerEpoch,n.state));
                       // 判定选举是否结束
                       if (ooePredicate(outofelection,outofelection, n)){
                          synchronized (this) {
                              logicalclock = n.electionEpoch;
                              self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : learningState());
                          }
                          Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                          leaveInstance(endVote);
                          return endVote;
                       }
                       break;
                   default:
                       LOG.warn("Notification state unrecognized:{} (n.state), {} (n.sid)", n.state, n.sid);
                       break;
                   }
               } else{
                   LOG.warn("Ignoring notification from non-clustermember " + n.sid);
               }
           }
           return null;
       } finally {
           try {
               if(self.jmxLeaderElectionBean!= null) {
                  MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
               }
           } catch(Exception e) {
                LOG.warn("Failedto unregister with JMX", e);
           }
            self.jmxLeaderElectionBean = null;
           LOG.debug("Number of connection processing threads: {}",manager.getConnectionThreadCount());
       }
   }

 

2)   totalOrderPredicate

   /**
     * 如果new票信息大于当前的票信息,那么返回true。以下三种case,只要满足一种就返回true:
     * <li>newEpoch > curEpoch
     * <li>newEpoch == curEpoch&& newZxid > curZxid
     * <li>newEpoch == curEpoch&& newZxid = curZxid && newId > curId
     */
   protected booleantotalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid,
           long curEpoch) {
       LOG.debug("id: " + newId+ ", proposed id: " + curId + ", zxid:0x" + Long.toHexString(newZxid)
               + ",proposed zxid: 0x" + Long.toHexString(curZxid));
       if (self.getQuorumVerifier().getWeight(newId) == 0) {
           return false;
       }

       /*
         * We return true if one of thefollowing three cases hold: 1- New epoch
         * is higher 2- New epoch isthe same as current epoch, but new zxid is
         * higher 3- New epoch is thesame as current epoch, new zxid is the
         * same as current zxid,but server id is higher.
         */
        return ((newEpoch > curEpoch)
               || ((newEpoch== curEpoch) && ((newZxid > curZxid)|| ((newZxid == curZxid)&& (newId > curId)))));
   }

 

3)   termPredicate

   /**
     * 是否终止投票,如果{@linkvote}对应的服务器获得了超过半数的票,那么返回true
     *
     * @param votes
     * @param vote
     * @return
     */
   protected booleantermPredicate(HashMap<Long, Vote> votes,Vote vote) {

       HashSet<Long> set= new HashSet<Long>();

       /*
         * First make the viewsconsistent. Sometimes peers will have different
         * zxids for a serverdepending on timing.
         */
       for (Map.Entry<Long,Vote> entry : votes.entrySet()){
           if (vote.equals(entry.getValue())){
               set.add(entry.getKey());
           }
       }

       return self.getQuorumVerifier().containsQuorum(set);
   }

 

4)   checkLeader

   /**
     * leader确认检查
     * <li>如果自己不是leader,那么一定要收到过Leader的信息,并且leader宣称自己的状态是ServerState.LEADING;
     * <li>如果自己是leader,那么当前logicalclock一定要等于选票信息中的electionEpoch
     *
     * @param outofelection
     * @param leader
     * @param electionEpoch
     */
   protected booleancheckLeader(HashMap<Long, Vote> outofelection,long leader,long electionEpoch){

       boolean predicate = true;

       /*
         * If everyone else thinks I'mthe leader, I must be the leader. The
         * other two checks are justfor the case in which I'm not the leader.
         * If I'm not the leader and Ihaven't received a message from leader
         * stating that it is leading,then predicate is false.
         */

       if (leader != self.getId()){
           if (outofelection.get(leader)== null)
                // 表示还未从leader中收到任何信息,所以选举结束未结束。
               predicate= false;
           else if (outofelection.get(leader).getState() != ServerState.LEADING)
                // 指推举的那个leader还没有将自己的状态改成leader,所以选举依然未完成。
               predicate= false;
       } else if (logicalclock!= electionEpoch) {
            // 自己是leader时,这两个值应该是相同的,如果不同说明有异常,不能结束选举。
           predicate= false;
       }

       return predicate;
   }

 

5)   ooePredicate

   /**
     * 是否能宣称选举结束
     * <li>{@code n}获得了半数以上的票
     * <li>通过了leader确认的检查
     *
     * @param recv
     *           map of received votes
     * @param ooe
     *           map containing out of election votes (LEADING or FOLLOWING)
     * @param n
     *           Notification
     * @return
     */
   protected booleanooePredicate(HashMap<Long, Vote> recv,HashMap<Long, Vote> ooe, Notification n) {

       return (termPredicate(recv, new Vote(n.version, n.leader, n.zxid, n.electionEpoch,n.peerEpoch,n.state))
               && checkLeader(ooe, n.leader, n.electionEpoch));

   }

 

五  选举示例分析

假设zookeeper集群的部署信息如下:

server.1=192.168.1.1:2888:3888

server.2=192.168.1.2:2888:3888

server.3=192.168.1.3:2888:3888

1.  集群启动时

根据上面的信息,由此我们可以得出集群启动时具有如下信息:


Server


Sid/myid


peerEpoch


zxid


Ip


S1


1


0


0


192.168.1.1


S2


2


0


0


192.168.1.2


S3


3


0


0


192.168.1.3

Zookeeper每一台服务器启动时,默认状态为LOOKING状态,所以都会启动选leader流程。当然如果只有一台启动成功,无法满足选举的超过一半的条件,所以永远无法完成选leader逻辑。

 

1)   S1服务器选举过程

为了描述的简单,假设集群只启动S1、S2服务器,我们看一下选举过程是怎样的。

a)   S1发送提议

S1启动后首先就是进行Leader选举。S1首先logicalclock改为1,接着发送选票信息,S1的选票信息如下:


proposedLeader


proposedZxid


proposedEpoch


1


0


0

b)   S2收到S1选举提议

S2启动后就会进入选举流程,因为logicalclock=1,所以忽略同步时钟这个操作;此时S2的提议信息为:


proposedLeader


proposedZxid


proposedEpoch


2


0


0

S2收到信息以后,将消息放在recvqueue中,接着会比较S1的提议和自己的提议信息,此时S2发现self.proposedLeader>S1.proposedLeader,所以S2回复提议信息如下:


proposedLeader


proposedZxid


proposedEpoch


2


0


0

c)   S1收到S2回复的选票信息

S1比较提议后发现自己推举的Leader小,所以将自己的提议信息更新为:


proposedLeader


proposedZxid


proposedEpoch


2


0


0

d)   S2再次收到S1的选票信息

此时S2不用更新提议信息,直接回复S1,回复的提议内容如下:


proposedLeader


proposedZxid


proposedEpoch


2


0


0

e)   S1收到S2的回复的选举提议

此时S1自己的选票信息为:


proposedLeader


proposedZxid


proposedEpoch


2


0


0

收到S2的选票信息为:


proposedLeader


proposedZxid


proposedEpoch


2


0


0

汇总后S1发现,目前收到的选票信息如下:


proposedLeader


proposedZxid


proposedEpoch


2


0


0


2


0


0

因为满足超过半数的条件,所以S1会认为S2是leader,S1退出选举流程。

2)   S1服务器收到的选票信息

a)   S1未收到S2的选票信息

recvset={1:{"electionEpoch":1,"id":1,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}

 

b)   S1收到S2的选票信息

recvset={1:{"electionEpoch":1,"id":1,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0},2:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}

进行到这一步是S1会更新自己的提议信息,改成选举S2作为leader

 

c)   S1收到S2的选票信息

recvset={1:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0},2:{"electionEpoch":1,"id":2,"peerEpoch":0,"state":"LOOKING","version":0,"zxid":0}}

进行到这一步就可以认为S2服务器当选成为Leader了。

 

3)   S2的leader选举

S2的选举过程和上面基本一样,唯一不同的地方是:S2一直选举的都是S2服务器,所以不用像S1一样选举过程中修改提议的Leader。

 

 

2.  新服务器加入集群

a)   初始条件假设

继续上面的示例,假设集群开始只启动了S1、S2服务器,运行一段时间以后S3服务器希望加入到集群中。

假设运行一段时间以后集群的情况如下:


Server


Sid/myid


peerEpoch


Zxid(低32位)


Ip


Leader


S1


1


10


900


192.168.1.1



S2


2


10


1000


192.168.1.2


S3新加入集群,加入时的情况如下:


Server


Sid/myid


peerEpoch


Ip


Leader


S3


3


0


192.168.1.3


-

b)   S3发送提议

S3启动后会发起选主流程,发送的提议信息如下:


proposedLeader


proposedZxid


proposedEpoch


3


0


0

c)   S1收到S3的提议信息

因为S3已经是Follower,并且epoch、zxid均大于S3,所以回复的提议如下:


proposedLeader


proposedZxid


proposedEpoch


2


900


10

d)   S2收到S3的提议信息

因为S2已经是Leader,并且epoch、zxid均大于S3,所以回复的提议如下:


proposedLeader


proposedZxid


proposedEpoch


2


1000


10

 

e)   S3收到回复信息

S3统计选票信息会发现S1、S2都推荐S2作为Leader,并且S2已经是leader,因为超过半数推荐S2,所以此时S3也会将S2作为leader,结束选主流程

 

3.  又有新服务器加入集群

继续假设按照上面信息部署集群,又来了一个服务器S4希望加入集群,那么流程会如何呢?

选举过程中存在以下代码:

if (self.getVotingView().containsKey(n.sid)) {

    // 选主逻辑

}

这意味着S1、S2、S3收到S4的选票请求,认为这不是当前集群的服务器,直接忽略选票信息。结论是:S4无法加入集群的。

 

4.  集群运行中重新选举

运行过程中如果Leader失效,或者超过半数服务器与Leader不同步等等情况都可能导致集群运行一段时间后,触发了选主流程。接下来我们以一种最为复杂的方式来介绍选举的完整过程。假设集群信息如下:


Server


Sid/myid


peerEpoch


Zxid(高位_地位)


Ip


S1


1


200


200_900


192.168.1.1


S2


2


100


100_1000


192.168.1.2


S3


3


50


50_1950


192.168.1.3

Zxid的高位是peerEpoch,低位是自增的序列,所以如果peerEpoch大,那么zxid一定大。为了方便描述,zxid我改成“高位_地位”的形式了。

说明:一般而言,正常运行过程是不会出现以上假设的数据的,除非数据被人修改过,或者其他未知原因导致所有服务器的数据均错乱了。

因为集群中没有了leader,所以所有服务器均将重新进入选主流程。接下来我们就以S3服务器为例来讨论选主流程。

a)   第一轮选票

S3发出的选票信息信息为:


proposedLeader


proposedZxid


proposedEpoch


3


50_1950


50

S2收到提议后回复的提议信息如下:


proposedLeader


proposedZxid


proposedEpoch


2


100_1000


100

S1收到提议后回复的提议信息如下:


proposedLeader


proposedZxid


proposedEpoch


1


200_900


200

b)   第一轮选票汇总

假设S3服务器先收到S2的回复,因为S3.proposedEpoch < S2.proposedEpoch,所以S3先更新提议的信息,然后重新发送提议(P1),新的提议如下:


proposedLeader


proposedZxid


proposedEpoch


2


100_1000


100

紧接着接着S3又收到S1的回复,又发现S3.proposedEpoch < S1.proposedEpoch,所以S3再一次更新提议信息,然后发送新的提议(P2),新的提议如下:


proposedLeader


proposedZxid


proposedEpoch


1


200_900


200

因为提议P2最终会覆盖提议P1,所以我们直接忽略S1、S2对P1的回复。直接讨论他们对P2的回复。

S1收到选票的提议以后,回复以下提议信息:


proposedLeader


proposedZxid


proposedEpoch


1


200_900


200

S2收到选票的提议以后,回复以下提议信息:


proposedLeader


proposedZxid


proposedEpoch


1


200_900


200

 

c)   第二轮选票汇总

S3收到的提议信息如下:


proposedLeader


proposedZxid


proposedEpoch


1


200_900


200


1


200_900


200


1


200_900


200

S3汇总以后得出的结论是S1成为Leader,然后退出选举。

 

 

 

时间: 2024-07-29 22:28:33

ZAB协议恢复模式-leader选举的相关文章

ZAB协议恢复模式-数据同步

上一篇博客中,我们详细讨论了Zookeeper的Leader选举过程,接下来我们讨论一下Leader选举以后的事情,并了解zookeeper的集群管理原理. 提前说明: 本文主题虽然是讲述崩溃恢复模式,不过也会对广播模式的内容进行简单的描述. 为了在文中描述不至于太过啰嗦,所以对超过半数省略掉了一个限定返回.例如当出现类似于"超过半数follower与leader同步","收到超过半数follower的回复"这种描述时,这种描述不正确,因为这个半数计算的时候是包含l

Zookeeper ZAB 协议分析

前言 ZAB 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议.在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性. Atomic broadcast protocol ZAB 是 Zookeeper 原子广播协议的简称,下面我们来讨论协议的内容,注意:理论与实现是有区别的,如果你对协议的理论不感兴趣,可以直接跳过看实现. 问题的提出 Zoo

ZAB协议

zookeeper依赖zab协议来实现分布式数据一致性.基于该协议,zookeeper实现了一种主备模式的系统架构来保持ZooKeeper为高可用的一致性协调框架,自然的ZooKeeper也有着一致性算法的实现,ZooKeeper使用的是ZAB协议作为数据一致性的算法, ZAB(ZooKeeper Atomic Broadcast ) 全称为:原子消息广播协议:ZAB可以说是在Paxos算法基础上进行了扩展改造而来的,ZAB协议设计了支持崩溃恢复,ZooKeeper使用单一主进程Leader用于

Raft对比ZAB协议

系列文章 Raft算法赏析 ZooKeeper的一致性算法赏析 Raft对比ZAB协议 0 一致性问题 本篇文章想总结下Raft和ZAB在处理一些一致性问题上的做法,详见之前对这2个算法的描述 Raft算法赏析 ZooKeeper的一致性算法赏析 上述分别是针对如下算法实现的讨论: Raft的实现copycat,由于Raft算法本身已经介绍的相当清晰,copycat基本上和Raft算法保持一致 ZAB的实现ZooKeeper,由于ZooKeeper里面的很多实现细节并没有在ZAB里体现(ZAB里

zookeeper源码分析之leader选举

zookeeper提供顺序一致性.原子性.统一视图.可靠性保证服务zookeeper使用的是zab(atomic broadcast protocol)协议而非paxos协议zookeeper能处理并发地处理多个客户端的写请求,并且以FIFO顺序commit这些写操作,zab采用了一个事务ID来实现事务的全局有序性,在Zab协议的实现时,分为三个阶段:1. Leader Election2. Recovery Phase3. Broadcast Phase 今天就先分析选举算法的源码实现 zoo

ubunut在系统恢复模式下无法修改root密码的分析和解决

    前些日子本猫的ubuntu 14.10貌似出了点问题,想修改下root密码,但是无奈原系统有错正常情况下无法修改啊,这是逼我重装的节奏吗?     在ubuntu开机后立即按住left_shift不放,调出grub菜单.因为我没装双系统,所以默认grub菜单是隐藏的.依次选择"高级选项"->xxx(recovery mode)->root.咦,还是要root密码才可以进入恢复模式的控制台呢!遂ctl+d,再次重启系统,同样进入xxx(recovery mode)主界

SQL Server误区:有关大容量事务日志恢复模式的误区

误区 #28:有关大容量事务日志恢复模式的几个误区 28 a)常见的DML操作可以被"最小记录日志" 不是.在大容量事务日志恢复模式下只有一小部分批量操作可以被"最小记录日志",这类操作的列表可以在Operations That Can Be Minimally Logged找到.这是适合SQL Server 2008的列表,对于不同的SQL Server版本,请确保查看正确的列表. 28 b)使用大容量事务日志恢复模式不会影响灾难恢复 首先,在上次事务日志备份之后

DATAGUARD从库宕机后如何恢复到管理恢复模式

一.sys登陆 实例: $ sqlplus / as sysdba SQL*Plus: Release 10.2.0.4.0 - Production on Wed Nov 3 07:35:34 2010 Copyright (c) 1982, 2007, Oracle.  All Rights Reserved. Connected to an idle instance. ----------------------------------------------------- 二.备库启动

HTTP协议Keep-Alive模式详解

HTTP协议Keep-Alive模式详解     1.什么是Keep-Alive模式    我们知道HTTP协议采用"请求-应答"模式,当使用普通模式,即非KeepAlive模式时,每个请求/应答客户和服务器都要新建一个连接,完成 之后立即断开连接(HTTP协议为无连接的协议):当使用Keep-Alive模式(又称持久连接.连接重用)时,Keep-Alive功能使客户端到服 务器端的连接持续有效,当出现对服务器的后继请求时,Keep-Alive功能避免了建立或者重新建立连接.