ZAB协议恢复模式-数据同步

上一篇博客中,我们详细讨论了Zookeeper的Leader选举过程,接下来我们讨论一下Leader选举以后的事情,并了解zookeeper的集群管理原理。

提前说明:

  • 本文主题虽然是讲述崩溃恢复模式,不过也会对广播模式的内容进行简单的描述。
  • 为了在文中描述不至于太过啰嗦,所以对超过半数省略掉了一个限定返回。例如当出现类似于“超过半数follower与leader同步”,“收到超过半数follower的回复”这种描述时,这种描述不正确,因为这个半数计算的时候是包含leader的。即如果文中讲述“超过半数”其实指的是leader、follower在一起,超过半数,或者所有服务器(不包括observer)超过半数。另外,Leader一定与自己是同步的。

 

本文主要讨论内容如下:

  • lead流程。
  • followLeader流程。
  • 主从数据如何同步。
  • 如何保证集群数据一致性。
  • 异常场景分析。

 

一  主从同步简化流程

1.  模型

目前在网上看到的主从不同原理基本上都是这样的:follower连接leader,将lastzxid告诉leader;leader根据last zxid确定同步点,然后将已经提交的协议发送给follower;follower同步完成后同通知leader。

2.  分析

此模型可以说它正确,也可以说它不正确,说它对是因为大体上确实是如此的;但如果剖根问底,同步的每一步具体是怎样的,这个图就显得不那么正确了。Zookeeper主从同步过程中有非常多消息,每种消息分别处理不同业务,并非上面简单的三步。

如果你只想知道个大概,那么就按照这个图理解就可以了,如果想了解更加详细的内容,搞清楚具体原理请继续阅读后续内容。

3.  问题

因为这个模型太过于简单,它无法回答以下几个问题:

  • 如果follower的zxid大于leader的zxid将会如何?

这种情况是可能存在的,例如:之前leader是S1服务器,leader宕机后过了一段时间才重新加入到集群中来,加进来是已经选好了新的leader(例如S2服务器),而此时S2的zxid小于S1,此时会如何?

  • Follower连不上leader,或者连接上了以后leader一直没有反应,集群将会如何?

例如:集群选举时leader正常,选举完成以后,在数据同步完成之前或数据同步过程中leader不可用了,那么集群将会如何?

  • 数据同步时,如果有部分Follower同步的很慢,集群一直无法完成崩溃恢复这一过程会如何?
  • Zookeeper集群是如何保证follower和leader同步以后才能接受请求的?
  • Leader、follower的通信时有哪些类型,如何保证消息不错乱,通信过程安全可靠的?

 

接下来会从两个角度分析ZAB协议恢复模式中数据同步逻辑。

  • 从leader、follower流程上分析。
  • 从leader、follwer数据交互上分析。

二  流程分析

此部分从leader、follower的流程初步探讨zookeeper集群工作原理,因为存在大量的主从交互,所以从流程上不一定很好理解,所以这一部分从整体上介绍一下工作原理,更加详细的部分放在“主从交互详解”中介绍。

1.  Leader

1)  流程图

a)  leader主流程

因为上图中的多个子流程基本上都是在LearnerHandler中实现的,所以我将这几个这里其他的几个子流程放在LearnerHandler中描述。

2)  分析

a)  流程分析
  • Leader每接受一个follower的链接,均会创建一个LearnerHandler对象,以后leader与此follower的交互均在LearnerHandler类中完成。
  • 接着就是确定新的epoch,必须从半数以上follower处获取epoch信息以后,才能生成新的epoch,即这里会会导致leader等待。见Leader#getEpochToPropose方法。
  • 然后是通知所有follower新的epoch,这个过程需要确保集群半数以上服务器达成一致,所以会等待。见leader#waitForEpochAck方法。
  • 然后等待集群完成崩溃恢复中数据同步逻辑,当集群中超过半数以上服务器发送ACK信息给leader以后,leader认为同步完成,恢复模式结束,接着进入广播模式。见Leader#waitForNewLeaderAck方法。
  • ZAB协议中恢复模式完成以后,集群进入广播模式,与之对应的是Leader会进入一个无限循环之中,在此循环中每隔一段时间统计一次同步的follower信息,并校验是否还有超过半数的follower与leader同步,如果少于一半,则退出lead流程。
b)  广播模式示例代码
boolean tickSkip = true;
while (true) {
   //休眠
   Thread.sleep(self.tickTime / 2);
   if (!tickSkip){
       self.tick++;
   }
   HashSet<Long> syncedSet = new HashSet<Long>();

    //lock on the followers when we use it.
   syncedSet.add(self.getId());

   for (LearnerHandler f :getLearners()) {
        // Synced setis used to check we have a supporting quorum,
        // so only
        // PARTICIPANT, notOBSERVER, learners should be used
       if (f.synced() && f.getLearnerType()==LearnerType.PARTICIPANT) {
          //更新同步集合列表
          syncedSet.add(f.getSid());
       }
       //心跳维持
       f.ping();
   }

    //check leader running status
   if (!this.isRunning()){
        shutdown("Unexpectedinternal error");
       return;
   }

   if (!tickSkip&&!self.getQuorumVerifier().containsQuorum(syncedSet)) {
        //超过半数以上服务器与leader不同步时,退出lead流程
        // Lost quorum,shutdown
        shutdown("Not sufficientfollowers synced, only synced with sids: [ " + getSidSetString(syncedSet)
              + "]");
        // make sure the orderis the same!
        // the leader goes tolooking
       return;
   }
    tickSkip = !tickSkip;
}

2.  LearnerHandler

1)  流程图

a)  LearnerHandler主流程

b)  计算集群epoch

 

c)  集群更新epoch

 

d)  集群数据同步

 

e)  广播模式

 

2)  分析

a)  流程分析

LearnerHandler是leader中最为核心的一个类,他和Leader一起合作保证集群正常稳定的运行。LearnerHandler继承自ZooKeeperThread,由此可以看出它是一个线程,此线程负责和此follower的所有通信,具体来说需要负责以下几件事情:

  • 取follower得epoch,以便于leader计算出集群新的epoch。此过程需要确保超过半数以上follower告知leader他们的acceptedEpoch。
  • 通知集群服务器新的epoch。
  • Follower收到epoch后会回复一个消息(条件是protocolVersion>=0x10000,不满足此条件的流程略有差别),等待leader收到超过半数的follower回复此消息。此过程也是需要等待,确保整个集群半数以上服务器epoch已经一致了。
  • 根据follower的last zxid决定同步的起点,然后将需要同步的数据加入到一个队列中,新启动一个线程将这些数据发送给follower。此过程需要等待半数以上服务器完成与leader的同步。此过程结束则集群的崩溃恢复过程技术。
  • 将进入无限循环,读取数据包、处理数据包,广播给follower,即进入广播模式。

 

3.  Follower

1)  流程图

a)  Follwer主流程

b)  Epoch同步流程

 

c)  主从数据同步流程

d)  广播模式

 

2)  分析

Follower的逻辑较为简单,总体来说负责以下内容:

  • 告知leader自己的acceptedEpoch,以便于leader计算出集群新的epoch。
  • 接收leader新epoch信息,以便于和集群保持一致的epoch。
  • 同步之前已经committed的数据,以便于和leader同步。
  • 通知leader自己已经同步完成,以便于集群退出崩溃恢复模式。
  • 数据同步完成以后,将进入无限循环,读取数据包、处理数据包,即进入广播模式,处理提议信息。

 

三  主从交互详解

接下来从主从交互的角度上详细讨论一下zookeeper工作原理。

1.  主从交互图

说明:

以上模型是根据zookeeper-3.4.10画出来的。

在zookeeper不同版本中,逻辑会稍有不同,不过从整体上而言基本相同,数据同步逻辑中这种区别具体体现在:Leader端会根据follower传输过来的protocolVersion,做一些特殊处理。例如: protocolVersion<0x10000时,不会在等待ack前发送LEADERINFO消息;在上面流程如第7步中,如果protocolVersion<0x10000,会先写一个NEWLEADER消息到follower等。

Observer的流程和follower类似,主要区别在于:不参与确认集群状态,不参与提议处理。

 

2.  模型详细分析

以下重点对模型中标注数字的模块进行详细说明,阅读时请参考上面模型图。

1) 1-从磁盘中加载数据

加载磁盘数据,获取lastProcessZxid用于后续生成newEpoch。每次leader诞生以后,都需要升级epoch。

2) 2-保证有超过半数的follower与leader建立连接

a)  创建LearnerHandler

leader生成的条件时需要半数以上follower选此服务器;选出leader后,需要保证有超过半数的follower能和leader建立通信。

Leader会创建LearnerCnxAcceptor线程,专门负责接收follower的连接,将leader与follower建立连接以后,则创建一个LearnerHandler线程。

LearnerCnxAcceptor#run示例代码如下:

   while (!stop){
       try {
          Socket s= ss.accept();
           // start with theinitLimit, once the ack isprocessed
           // in LearnerHandlerswitch to the syncLimit
          s.setSoTimeout(self.tickTime *self.initLimit);
          s.setTcpNoDelay(nodelay);

          BufferedInputStream is= new BufferedInputStream(s.getInputStream());
           // 建立处理learner请求的处理器
          LearnerHandler fh= new LearnerHandler(s, is, Leader.this);
          fh.start();
       } catch(SocketException e) {
          if (stop) {
               LOG.info("exceptionwhile shutting down acceptor: " + e);

               // WhenLeader.shutdown() callsss.close(),
               // the callto accept throws anexception.
               // We catchand set stop to true.
              stop= true;
          } else {
              throwe;
           }
       } catch (SaslException e) {
           LOG.error("Exceptionwhile connecting to quorum learner", e);
       }
   }

b)  为集群生成新的epoch

通过Leader#getEpochToPropose方法计算epoch。这个方法会等待一段时间,如果这段时间内有超过半数follower成功与leader建立链接,那么将这些服务器中最大的epoch+1作为新的epoch。此过程就是。

Leader#getEpochToPropose示例代码如下:

private HashSet<Long> connectingFollowers= new HashSet<Long>();

/**
 * 获取提议的epoch,此方法会等待足够多的follower进来,以确定epoch
 * <li>如果lastAcceptedEpoch>leader.epoch,那么设置
 * {@code leader.epoch =lastAcceptedEpoch+1};
 * <li>如果没有超过半数的follower与leader通信,那么进入等待,直到超时或者足够的follower与leader建立通信。
 *
 * @param sid
 * @param lastAcceptedEpoch
 *           : 最近接受的epoch,新leader选举出来以后的epoch不小于此值。
 * @return
 * @throws InterruptedException
 * @throws IOException
 */
public long getEpochToPropose(long sid, long lastAcceptedEpoch)throws InterruptedException, IOException {
    synchronized (connectingFollowers) {
        if (!waitingForNewEpoch) {
          return epoch;
       }
       if (lastAcceptedEpoch >= epoch){
          epoch = lastAcceptedEpoch + 1;
       }
        connectingFollowers.add(sid);
      QuorumVerifier verifier= self.getQuorumVerifier();
       if (connectingFollowers.contains(self.getId())&& verifier.containsQuorum(connectingFollowers)) {
           waitingForNewEpoch = false;
          self.setAcceptedEpoch(epoch);
          connectingFollowers.notifyAll();
       } else {
          long start = System.currentTimeMillis();
          long cur = start;
          long end = start + self.getInitLimit()* self.getTickTime();
          while (waitingForNewEpoch && cur < end) {
              connectingFollowers.wait(end - cur);
              cur=System.currentTimeMillis();
           }
           if (waitingForNewEpoch) {
               throw newInterruptedException("Timeoutwhile waiting for epoch from quorum");
           }
       }
       return epoch;
   }
}
c)  统一集群epoch

当protocolVersion>=0x10000时,leader会发送LEADERINFO消息并等待follower返回信息,并且需要保证在指定时间内,有超过半数的follwer回复了此消息。protocolVersion<0x10000时此过程逻辑稍微有一点不同,比较简单,所以暂不讨论。

private HashSet<Long> electingFollowers= new HashSet<Long>();
private boolean electionFinished = false;

/**
 * 等待超过半数的follower ack
 * <li>首先判断当前leader的epoch、lastZxid是否比参数{@link ss}更新,<br>
 * <ol>
 * <li>如果新,那么将follower的sid加入到electingFollowers列表</li>
 * <li>如果ss更新,说明follower的epoch或者lastZxid比leader更大,
 * 那么follower不能加入到集群中来,所以抛出错误信息。</li>
 * </ol>
 * </li>
 * <li>检查electingFollowers集合,判断是否有超过半数的follower回复了leader,如果是,唤醒线程;
 * 如果不是那么等待直到超时或者达到半数这个条件。</li>
 *
 * @param id
 * @param ss
 * @throws IOException
 * @throws InterruptedException
 */
public void waitForEpochAck(long id,StateSummary ss) throwsIOException,InterruptedException {
    synchronized (electingFollowers) {
        if (electionFinished) {
          return;
       }
       if (ss.getCurrentEpoch() != -1) {
          if (ss.isMoreRecentThan(leaderStateSummary)){
               // follower比 leader 更新(事物id更大),抛出错误信息
              thrownew IOException(
                      "Followeris ahead of the leader, leader summary:" + leaderStateSummary.getCurrentEpoch()
                             + " (currentepoch), " + leaderStateSummary.getLastZxid()+ " (last zxid)");
           }
          electingFollowers.add(id);
       }
      QuorumVerifier verifier= self.getQuorumVerifier();
       if (electingFollowers.contains(self.getId())&& verifier.containsQuorum(electingFollowers)) {
          electionFinished= true;
          electingFollowers.notifyAll();
       } else {
          long start = System.currentTimeMillis();
           long cur = start;
          long end = start + self.getInitLimit()* self.getTickTime();
          while (!electionFinished && cur < end) {
              electingFollowers.wait(end - cur);
              cur=System.currentTimeMillis();
           }
          if (!electionFinished) {
               throw newInterruptedException("Timeoutwhile waiting for epoch to be acked byquorum");
           }
       }
   }
}

 

当同时满足epoch验证和ack验证是,认为有超过半数的follower与leader建立连接了,不满足条件时会抛出异常,退出lead流程。

 3) 3-等待同步完成的消息

leader与follower、observer建立链接以后,需要进行数据同步,同样的,当在指定时间内和leader同步的follower少于一半时,也会抛出异常,退出lead流程。

单个learner与leader是否同步的判断标准是:同步完成后follower会发送ACK消息给leader,leader收到此消息,认为这个follower和leader同步了。

Leader#waitForNewLeaderAck示例:

/**
 * LeaderHandler在收到ACK消息时调用此方法,用于告知leader同步已经完成。
 * <li>此方法需要保证超过一半的服务器与leader保持一致。
 *
 * @param sid
 * @param learnerType
 * @throws InterruptedException
 */
public void waitForNewLeaderAck(long sid, long zxid,LearnerType learnerType) throws InterruptedException{

    synchronized (newLeaderProposal.ackSet) {

       if (quorumFormed) {
          return;
       }

       long currentZxid = newLeaderProposal.packet.getZxid();
       if (zxid != currentZxid){
           LOG.error("NEWLEADERACK from sid: " + sid + "is from adifferent epoch - current 0x"
                  +Long.toHexString(currentZxid) + " receieved 0x" + Long.toHexString(zxid));
          return;
       }

       if (learnerType == LearnerType.PARTICIPANT) {
           newLeaderProposal.ackSet.add(sid);
       }

       if (self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
          quorumFormed= true;
          newLeaderProposal.ackSet.notifyAll();
       } else {
          long start = System.currentTimeMillis();
          long cur = start;
          long end = start + self.getInitLimit()* self.getTickTime();
          while (!quorumFormed && cur< end) {
              newLeaderProposal.ackSet.wait(end- cur);
              cur=System.currentTimeMillis();
           }
          if (!quorumFormed) {
               throw newInterruptedException("Timeoutwhile waiting for NEWLEADER to be acked byquorum");
           }
       }
   }
}

4) 4-进入广播模式

leader进入while无限循环,每次循环线休眠一段时间,然后检验是否有超过一半的follower与leader保持同步,是否被设置为停止(running=false),如果满足这两个条件之一,则退出lead流程。

 

5) 5-确认leader、follower确认开始

在Leader、LearnerHandler中我们多次提到主从需要计算出新的epoch,那么他是如何计算的呢?这里将详细讲述这个过程。

选主完成后,follower去链接leader,链接好以后需要完成以下几件事情。

  • follower发送FOLLOWERINFO消息(带上AcceptedEpoch、protocolVersion= 0x10000)。
  • leader等待超过半数的follower发送FOLLOWERINFO信息给leader,然后计算出epoch(如有疑问,见2步)。接着回复LEADERINFO消息(带上epoch、protocolVersion= 0x10000)。这里和leader#getEpochToPropose相对应。
  • Follower收到消息以后回复ACKEPOCH消息(带上epoch、lastLoggedZxid)。
  • 过程有点绕,我们解释一下这么做的原因:
  • 集群已超过半数为基本条件,这意味着epoch计算时,leader需要通过超过半数follower的epoch来计算出一个新的epoch,所以就必须等待这个条件。
  • Leader需要将计算出的新epoch告诉follower。这也比较好理解:一届leader选举出以后,以后所有的数据应该都发生在此届之中,所以应该统一集群的epoch。
  • Follower再次回复信息,一来是做个ack,告知已经让epoch和leader的epoch保持一致了;二来是为后续确定同步点做准备。

 

6) 6-确保超过半数的followerack leader

5中的最后一步指的是一个follower回复ACKEPOCH消息,但是集群需要确保超过半数的follower ack,所以这里会有一个等待确认的过程。这里和leader中等待ack过程相对应。

等待leader收到半数以上的ack,是为了确保整个集群多数派已经更新了epoch。

 

7) 7-确定同步点

a)  leader中确定同步节点的逻辑是:
  • 首先设置type=SNAP
  • 如果follower.lastZxid = leader.lastProcessedZxid,那么设置type=DIFF。
  • 如果commitedLog为空,那么设置type=DIFF。
  • 如果follower.lastZxid >leader.maxCommittedLog, 那么设置type= TRUNC。
  • 如果leader.maxCommittedLog>= follower.lastZxid并且 leader.minCommittedLog <= follower.lastZxid,确定同步起始位置,并且将此后的所有提议消息(提议+commit提议两个消息)加入queuedPackets中。
  • 将type、zxid发送个follower

示例代码如下:

ReentrantReadWriteLocklock = leader.zk.getZKDatabase().getLogLock();
ReadLockrl = lock.readLock();
try {
   rl.lock();
   final longmaxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
   final longminCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
    LOG.info("Synchronizingwith Follower sid:" + sid + "maxCommittedLog=0x"
           +Long.toHexString(maxCommittedLog)+ " minCommittedLog=0x" +Long.toHexString(minCommittedLog)
          + "peerLastZxid=0x" + Long.toHexString(peerLastZxid));

   LinkedList<Proposal> proposals= leader.zk.getZKDatabase().getCommittedLog();

   if (peerLastZxid== leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()){
        // 同伴(follower)的事务id和leader的事务id相同,说明follower和leader是同步的。
        LOG.info("leaderandfollower are in sync, zxid=0x{}", Long.toHexString(peerLastZxid));
       packetToSend = Leader.DIFF;
        zxidToSend = peerLastZxid;
   } else if(proposals.size() != 0) {
       LOG.debug("proposalsize is {}", proposals.size());
        if ((maxCommittedLog >= peerLastZxid) &&(minCommittedLog <= peerLastZxid)) {
           // follower有数据需要从leader处同步
           LOG.debug("Sendingproposals to follower");

           // as we look throughproposals, thisvariable keeps
           // track of previous
           // proposal Id.
           long prevProposalZxid = minCommittedLog;

           // Keep track ofwhether we are about tosend the first
          // packet.
           // Before sending thefirst packet, we haveto tell the
          // learner
           // whether to expect atrunc ora diff
          boolean firstPacket = true;

           // If we are here, wecan use committedLogto sync with
           // follower. Then weonly need to decidewhether to
           // send truncor not
          packetToSend=Leader.DIFF;
           zxidToSend = maxCommittedLog;

          for(Proposal propose : proposals){
               // skip theproposals the peer alreadyhas
              if(propose.packet.getZxid()<= peerLastZxid) {
                  prevProposalZxid = propose.packet.getZxid();
                  continue;
              } else{
                  // If we are sending thefirst packet, figure
                  // out whether to trunc incase the follower has
                  // some proposals thatthe leader doesn't
                  if (firstPacket){
                      firstPacket = false;
                      //Does the peer have some proposals that
                      //the leader hasn't seen yet
                      if (prevProposalZxid< peerLastZxid) {
                          // send a trunc message beforesending
                         // the diff
                         packetToSend = Leader.TRUNC;
                         zxidToSend = prevProposalZxid;
                         updates = zxidToSend;
                      }
                  }
                  queuePacket(propose.packet);
                  QuorumPacket qcommit = newQuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null,
                         null);
                  queuePacket(qcommit);
              }
           }
        } else if (peerLastZxid > maxCommittedLog) {
           LOG.debug("SendingTRUNC to follower zxidToSend=0x{} updates=0x{}",
                 Long.toHexString(maxCommittedLog),Long.toHexString(updates));

          packetToSend=Leader.TRUNC;
           zxidToSend = maxCommittedLog;
           updates = zxidToSend;
       } else {
           LOG.warn("Unhandledproposal scenario");
       }
   } else {
        // just let the statetransfer happen
        LOG.debug("proposalsisempty");
   }

   LOG.info("Sending" +Leader.getPacketType(packetToSend));
   leaderLastZxid = leader.startForwarding(this,updates);

} finally {
   rl.unlock();
}
b)  Follower收到消息后进行一下处理:
  • 如果type= DIFF,说明主从有差异,直接接收数据就好。
  • 如果type= SNAP,那么说明主从差异太多了,直接从主下载snapshot数据。
  • 如果type= TRUNC,说明从的zxid比主大,需要将follower中大的这部分提议清理掉。
  • 如果type为其他值,那说明异常了,退出服务。

 

8) 8-数据同步

a)  leader
  • 向queuedPackets队列中加入NEWLEADER消息。
  • 在LearnerHandler线程中启动一个新线程将queuedPackets中的消息发送给follower。
  • 因为最后一个消息是NEWLEADER,Follower收到此消息时,回复一个ACK消息,leader根据此消息确定同步过程结束。
  • 等待指定的时间,确保leader半数以上的follower都已经完成数据同步,指定时间内集群满足这个条件,则集群成形;不满足则抛出异常,退出lead流程。
  • 向queuedPackets队列中加入UPTODATE消息。

 

b)  follower

接收leader发送过来的数据。

当收到NEWLEADER消息时,表示目前所有消息已经全部同步完了,Follower回复ACK消息告知Leader目前最新的zxid。

Leader收到ACK消息后,等待集群成形,即崩溃恢复模式结束,成型后会给follower发送NEWLEADER消息,follower收到NEWLEADER后退出同步逻辑。

 

c)  说明

截止此步骤结束,整个zookeeper集群方退出崩溃恢复模式。到此步骤成功前的任何一个步骤失败或者条件不满足,均退出lead流程、follower流程,即再次进入选举流程。

 

9) 9-进入广播模式

这一环节是zookeeper集群接收提议、提交提议、维持心跳等逻辑的环节,即zookeeper集群正常运行的环节。这一阶段的过程大概如下:

Leader收到请求后,将请求封装成提议发送给follower,消息类型为PROPOSAL

Follower收到PROPOSAL消息以后,回复Leader一个ACK消息。

Leader发现一个PROPOSAL有超过半数以上回复,则认为此消息可以commit了,那么发送COMMIT消息给follower。

 

这一环节还有几种消息:

  • PING:主从心跳维持消息。
  • REVALIDATE:延长session,保证主从会话有效。
  • SYNC:将已提交的数据刷新到committedRequests集合。
  • UPTODATE:这个环节应该受到整个消息,如果收到了留一条日志。

 

3.  总结

Zookeeper数据同步核心就是在于上面提到的各种消息。接下来我们总结一下上面的整个流程。

1) 建立链接

选举后Leader、Follower身份将被确立下来;接着Follower向Leader发起连接,Leader创建LearnerHandler。

2) 确定集群epoch

leader等待超过半数follower链接,计算出新的epoch。

protocolVersion>0x10000时,Leader会等待Follower确认epoch。

3) 同步数据

leader根据follower给的zxid信息,计算出需要同步的数据,并将这些数据放到queuedPackets队列中。

在发送已提交的提议数据前,会发送给follower一个消息,告诉follower进行一些预处理。例如:如果follower的zxid太大,那么截取部分消息;如果follower落后太远,那么直接从leader获取snapshot等。

启动线程发送queuedPackets中的提议消息。

提议消息发送完成后,Leader发送发送NEWLEADER消息告知follower同步已结束;follower收到此消息以后会回复ACK消息给Leader,表示知道已经同步结束了。注意,leader会等待至少半数以上的FollowerACK,因为follower同步过程耗时可能差别较大,这里需要等待一段时间,让follower跟上leader。

超过半数Follwer ACK,意味着整个集群大部分服务器已经和leader同步了,那么Zookeeper集群也就“成形”了。leader再给一个UPTODATE消息,告诉follower退出同步逻辑。

 

4) 进入广播模式

follower、LearnerHandler都将进入无限循环,处理REQUEST、发送提议、提交提议、维持集群心跳。

 

时间: 2024-11-01 03:30:22

ZAB协议恢复模式-数据同步的相关文章

ZAB协议恢复模式-leader选举

之前在网上看了很多zookeeper的zab原理,但讲述的都不够详细,很多地方模糊不清,所以就研究了一下zookeeper源码,并整理成几篇博客,希望对其他小伙伴有所帮助.本文是ZAB协议崩溃恢复Leader选举部分的内容,数据同步见另一篇博客 <ZAB协议恢复模式-数据同步>    . 为了避免理解上的歧义,将投票动作和投票信息区分开,在本文中,我将服务器的投票信息称之为选票. 一  基本概念 1.  Noitifcation Notification其实是选举过程中的通信信息:选举过程主要

SQL server 2005 备份恢复模式

转自联机丛书 可用于数据库的还原操作取决于所用的恢复模式.下表简要说明了每种恢复模式是否支持给定的还原方案以及适用范围. 还原操作 完整恢复模式 大容量日志恢复模式 简单恢复模式 数据还原 完整还原(如果日志可用). 某些数据将丢失. 自上次完整备份或差异备份后的任何数据将丢失. 时点还原 日志备份所涵盖的任何时间. 日志备份包含任何大容量日志更改时不允许. 不支持. 文件还原* 完全支持. 不完全支持.** 仅对只读辅助文件可用. 页面还原* 完全支持. 不完全支持.** 无. 逐级(文件组级

用脚本模式配置数据同步

大数据开发套件里可以通过配置同步任务,实现数据在不同数据源之间的迁移.但是因为目前只部署在华东1(参考文档),有一些特殊网络环境可能无法覆盖到.比如VPC下的DRDS或者其他区域自建数据库内网就不通了.不过套件还提供了脚本模式+调度资源设置这2个大杀器,满足各种复杂场景下的数据同步功能. 本文就数据从MaxCompute的数据导出到VPC下的DRDS为例,详细介绍如何使用这两种方法来实现灵活的数据同步. 同步原理 首先介绍下大数据开发套件的同步任务是怎么做的. (这个图片来自这里) 常有人以同步

Zookeeper ZAB 协议分析

前言 ZAB 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议.在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性. Atomic broadcast protocol ZAB 是 Zookeeper 原子广播协议的简称,下面我们来讨论协议的内容,注意:理论与实现是有区别的,如果你对协议的理论不感兴趣,可以直接跳过看实现. 问题的提出 Zoo

ZAB协议

zookeeper依赖zab协议来实现分布式数据一致性.基于该协议,zookeeper实现了一种主备模式的系统架构来保持ZooKeeper为高可用的一致性协调框架,自然的ZooKeeper也有着一致性算法的实现,ZooKeeper使用的是ZAB协议作为数据一致性的算法, ZAB(ZooKeeper Atomic Broadcast ) 全称为:原子消息广播协议:ZAB可以说是在Paxos算法基础上进行了扩展改造而来的,ZAB协议设计了支持崩溃恢复,ZooKeeper使用单一主进程Leader用于

MySQL超时参数以及相关数据集成、DataX数据同步案例分享

一.背景 MySQL系统变量提供关于服务器的一些配置和能力信息,大部分变量可在mysqld服务进程启动时设置,部分变量可在mysqld服务进程运行时设置.合理的系统变量设值范围,是保障MySQL稳定提供服务的重要因素.本文主要描述MySQL数据库的超时timeout相关的一些系统变量,部分参数同程序应用中常见到的CommunicationsException: Communications link failure异常息息相关. 本文也结合数据同步的场景,对使用DataX3进行MySQL数据同步

Raft对比ZAB协议

系列文章 Raft算法赏析 ZooKeeper的一致性算法赏析 Raft对比ZAB协议 0 一致性问题 本篇文章想总结下Raft和ZAB在处理一些一致性问题上的做法,详见之前对这2个算法的描述 Raft算法赏析 ZooKeeper的一致性算法赏析 上述分别是针对如下算法实现的讨论: Raft的实现copycat,由于Raft算法本身已经介绍的相当清晰,copycat基本上和Raft算法保持一致 ZAB的实现ZooKeeper,由于ZooKeeper里面的很多实现细节并没有在ZAB里体现(ZAB里

SQLServer 2012之AlwaysOn —— 指定数据同步链路,消除网络抖动导致的提交延迟问题

原文:SQLServer 2012之AlwaysOn -- 指定数据同步链路,消除网络抖动导致的提交延迟问题 事件起因:近期有研发反应,某数据库从08切换到12环境后,不定期出现写操作提交延迟的问题: 事件分析:在排除了系统资源争用等问题后,初步分析可能由于网络抖动导致同步模式alwayson节点经常出现会话超时等待提交的问题导致. 经过排查,扩展事件里发现不定期出现35202错误,这是一条副本连接恢复的消息.   由于机房网络环境复杂,数据库服务器和应用服务器混用一个交换机,在业务高峰期时,因

拯救你的数据 通过日志恢复MSSQL数据_MsSql

这段时间看了关于在SQL server 中通过日志和时间点来恢复数据.也看了一些网上的例子,看如何通过日志来恢复数据. 前提条件: 数据库的故障恢复改为非简单模式,去掉自动关闭和自动收缩两个选项     如果是简单模式:类似下面的语句操作数据就不会记录到日志中:  select * into t from [表名] 这时为保证数据的完整要将数据库的恢复模式改成"完整" 测试环境:    1.建立数据库和测试表         create database zp create tabl