To simplify the implementation, Copycat divides members into three types:
active, passive, and reserve members, each of which plays some role in supporting rapid replacement of failed servers.
Active members are full voting members which participate in all aspects of the Raft consensus algorithm. Active servers are always in one of the Raft states — follower, candidate, or leader — at any given time.
In other words, an active member holds one of the Raft roles (follower, candidate, or leader); these are the ordinary working members.
Passive member
When a new server is added to a Raft cluster, the server typically must be caught up to within some bound of the leader before it can become a full voting member of the cluster. Adding a new server without first warming up its log will result in some period of decreased availability.
Systems can maintain servers that are virtually kept in sync with the rest of the cluster at all times. We call these servers passive servers.
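Members that only stay in sync with the rest of the cluster, without taking part in voting, are called passive.
The benefit of passive nodes is that when an active node has to be added, or a failed active node replaced, no catch-up phase is needed: a passive node can be promoted directly.
Passive nodes can also serve as read-only nodes.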
Passive Replication
Passive nodes do not catch up by receiving AppendEntries RPCs directly from the leader.
Each follower is responsible for sending AppendEntries RPCs to a subset of passive servers at regular intervals.
- Each follower sends AppendEntries RPCs only to a subset of passive servers
- Followers send only committed entries to passive servers
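The quoted text does not say how the subset of passive servers for each follower is chosen. As a minimal sketch, assuming a simple round-robin split (the class name PassiveAssignment and the method assign are hypothetical and not part of Copycat), the idea could look like this:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: splits passive servers among followers round-robin. */
final class PassiveAssignment {

  /** Returns, for each follower, the passive servers it replicates committed entries to. */
  static Map<String, List<String>> assign(List<String> followers, List<String> passives) {
    Map<String, List<String>> assignment = new LinkedHashMap<>();
    for (String follower : followers) {
      assignment.put(follower, new ArrayList<>());
    }
    if (followers.isEmpty()) {
      return assignment; // nobody to delegate replication to
    }
    for (int i = 0; i < passives.size(); i++) {
      // Assumption: round-robin; the real assignment policy may differ.
      String follower = followers.get(i % followers.size());
      assignment.get(follower).add(passives.get(i));
    }
    return assignment;
  }

  public static void main(String[] args) {
    System.out.println(assign(Arrays.asList("f1", "f2"), Arrays.asList("p1", "p2", "p3")));
  }
}
```

With this split, each passive server has exactly one follower responsible for it, so the leader never has to replicate to passive servers itself.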
Reserve Members
For large clusters, though, the overhead of maintaining passive servers can by itself become a drain on the cluster’s resources.
Each additional passive server imposes the overhead of replicating all committed log entries, and this is significant even if done by followers. Thus, to ease the load on large clusters, we introduce the reserve member type.
For larger clusters, maintaining passive members is itself expensive, so reserve members were added to reduce that cost.
Reserve members serve as standbys to passive members.
Reserve servers do not maintain state machines and need not know about committed entries.
However, because reserve servers can be promoted to passive, they do need to have some mechanism for learning about configuration changes.
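Reserve members are standbys for passive members, so the cluster does not need to maintain many passive members: when a passive member is promoted to active, a reserve member is promoted to passive in its place.
Reserve members therefore do not need to store the full state machine state, but they do need to learn about configuration changes.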
Member
public interface Member {

  enum Type {
    INACTIVE,
    RESERVE,
    PASSIVE,
    ACTIVE,
  }

  CompletableFuture<Void> promote();

  CompletableFuture<Void> demote();
}
This defines the member types, together with the promote and demote operations.
ServerMember
public final class ServerMember implements Member, CatalystSerializable, AutoCloseable {

  @Override
  public CompletableFuture<Void> promote() {
    return configure(Type.values()[type.ordinal() + 1]);
  }

  @Override
  public CompletableFuture<Void> demote() {
    return configure(Type.values()[type.ordinal() - 1]);
  }

  /**
   * Recursively reconfigures the cluster.
   */
  private void configure(Member.Type type, CompletableFuture<Void> future) {
    // Set a timer to retry the attempt to leave the cluster.
    configureTimeout = cluster.getContext().getThreadContext().schedule(cluster.getContext().getElectionTimeout(), () -> {
      configure(type, future);
    });

    // Attempt to leave the cluster by submitting a LeaveRequest directly to the server state.
    // Non-leader states should forward the request to the leader if there is one. Leader states
    // will log, replicate, and commit the reconfiguration.
    cluster.getContext().getServerState().reconfigure(ReconfigureRequest.builder() // send a reconfigure request to the ServerState
      .withIndex(cluster.getConfiguration().index())
      .withTerm(cluster.getConfiguration().term())
      .withMember(new ServerMember(type, serverAddress(), clientAddress(), updated))
      .build()).whenComplete((response, error) -> {
        //.......
      });
  }
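promote and demote move one step along the enum by ordinal, so the declaration order INACTIVE, RESERVE, PASSIVE, ACTIVE is what defines the ladder. The sketch below isolates that arithmetic. The class MemberTypes is hypothetical, and clamping at both ends is an assumption added here to avoid an out-of-bounds index; the excerpt above does not show how Copycat handles promoting ACTIVE or demoting INACTIVE.

```java
/** Illustrative sketch of the ordinal-based ladder; not Copycat's actual class. */
final class MemberTypes {

  enum Type { INACTIVE, RESERVE, PASSIVE, ACTIVE }

  /** Next type up the ladder. Assumption: ACTIVE stays ACTIVE instead of overflowing. */
  static Type promote(Type type) {
    Type[] values = Type.values();
    return values[Math.min(type.ordinal() + 1, values.length - 1)];
  }

  /** Next type down the ladder. Assumption: INACTIVE stays INACTIVE. */
  static Type demote(Type type) {
    return Type.values()[Math.max(type.ordinal() - 1, 0)];
  }

  public static void main(String[] args) {
    System.out.println(promote(Type.RESERVE));
    System.out.println(demote(Type.RESERVE));
  }
}
```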
Next, look at ServerState.
ServerState defines every request type a server can accept:
public interface ServerState extends Managed<ServerState> {
  CopycatServer.State type();
  CompletableFuture<RegisterResponse> register(RegisterRequest request);
  CompletableFuture<ConnectResponse> connect(ConnectRequest request, Connection connection);
  CompletableFuture<KeepAliveResponse> keepAlive(KeepAliveRequest request);
  CompletableFuture<UnregisterResponse> unregister(UnregisterRequest request);
  CompletableFuture<PublishResponse> publish(PublishRequest request);
  CompletableFuture<ConfigureResponse> configure(ConfigureRequest request);
  CompletableFuture<InstallResponse> install(InstallRequest request);
  CompletableFuture<JoinResponse> join(JoinRequest request);
  CompletableFuture<ReconfigureResponse> reconfigure(ReconfigureRequest request);
  CompletableFuture<LeaveResponse> leave(LeaveRequest request);
  CompletableFuture<AppendResponse> append(AppendRequest request);
  CompletableFuture<PollResponse> poll(PollRequest request);
  CompletableFuture<VoteResponse> vote(VoteRequest request);
  CompletableFuture<CommandResponse> command(CommandRequest request);
  CompletableFuture<QueryResponse> query(QueryRequest request);
}
AbstractState
public abstract class AbstractState implements ServerState {

  /**
   * Forwards the given request to the leader if possible.
   */
  protected <T extends Request, U extends Response> CompletableFuture<U> forward(T request) {
    CompletableFuture<U> future = new CompletableFuture<>();
    context.getConnections().getConnection(context.getLeader().serverAddress()).whenComplete((connection, connectError) -> {
      if (connectError == null) {
        connection.<T, U>send(request).whenComplete((response, responseError) -> {
          if (responseError == null) {
            future.complete(response);
          } else {
            future.completeExceptionally(responseError);
          }
        });
      } else {
        future.completeExceptionally(connectError);
      }
    });
    return future;
  }

  /**
   * Updates the term and leader.
   */
  protected boolean updateTermAndLeader(long term, int leader) {
    // If the request indicates a term that is greater than the current term or no leader has been
    // set for the current term, update leader and term.
    if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
      context.setTerm(term);
      context.setLeader(leader);

      // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
      context.getClusterState().reset();
      return true;
    }
    return false;
  }
}
AbstractState mainly adds a few utility methods shared by the concrete states.
InactiveState
InactiveState responds only to configure requests; every other request returns an error.
class InactiveState extends AbstractState {

  @Override
  public CompletableFuture<ConfigureResponse> configure(ConfigureRequest request) {
    updateTermAndLeader(request.term(), request.leader());

    Configuration configuration = new Configuration(request.index(), request.term(), request.timestamp(), request.members());

    // Configure the cluster membership. This will cause this server to transition to the
    // appropriate state if its type has changed.
    context.getClusterState().configure(configuration); // update the state of the corresponding server in clusterState

    // If the configuration is already committed, commit it to disk.
    // Check against the actual cluster Configuration rather than the received configuration in
    // case the received configuration was an older configuration that was not applied.
    if (context.getCommitIndex() >= context.getClusterState().getConfiguration().index()) {
      context.getClusterState().commit();
    }

    return CompletableFuture.completedFuture(logResponse(ConfigureResponse.builder()
      .withStatus(Response.Status.OK)
      .build()));
  }
context.getClusterState().configure
First, what is ClusterState?
Manages the persistent state of the Copycat cluster from the perspective of a single server
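In other words, every server keeps its own ClusterState to track the state of the whole cluster.
ClusterState.member represents the local server itself.
ClusterState.members records the state of every member in the cluster.
The configure logic here mainly updates member and members from the configuration that was passed in.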
context.getClusterState().commit()
The core logic writes the updated configuration to disk, but only when it is newer than the stored one:
if (context.getMetaStore().loadConfiguration().index() < configuration.index()) {
  context.getMetaStore().storeConfiguration(configuration);
}
ReserveState
Handling append requests:
class ReserveState extends InactiveState {

  @Override
  public CompletableFuture<AppendResponse> append(AppendRequest request) {
    context.checkThread();
    logRequest(request);
    updateTermAndLeader(request.term(), request.leader());

    // Update the local commitIndex and globalIndex.
    context.setCommitIndex(request.commitIndex());
    context.setGlobalIndex(request.globalIndex()); // Sets the maximum compaction index for major compaction

    return CompletableFuture.completedFuture(logResponse(AppendResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withSucceeded(true)
      .withLogIndex(0)
      .build()));
  }
AbstractState.updateTermAndLeader
protected boolean updateTermAndLeader(long term, int leader) {
  // If the request indicates a term that is greater than the current term or no leader has been
  // set for the current term, update leader and term.
  if (term > context.getTerm() || (term == context.getTerm() && context.getLeader() == null && leader != 0)) {
    context.setTerm(term); // the term or leader is new, so update the context
    context.setLeader(leader);

    // Reset the current cluster configuration to the last committed configuration when a leader change occurs.
    context.getClusterState().reset(); // reset to the last committed configuration to discard stale data from the old leader
    return true;
  }
  return false;
}
context.setCommitIndex
ServerContext setCommitIndex(long commitIndex) {
  long previousCommitIndex = this.commitIndex;
  if (commitIndex > previousCommitIndex) {
    this.commitIndex = commitIndex;
    log.commit(Math.min(commitIndex, log.lastIndex())); // commit the log up to this index
    long configurationIndex = cluster.getConfiguration().index();
    if (configurationIndex > previousCommitIndex && configurationIndex <= commitIndex) {
      cluster.commit(); // commit the cluster configuration
    }
  }
  return this;
}
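Two properties of setCommitIndex are easy to miss in the flattened source: the commit index only moves forward, and the configuration is committed exactly once, when its index falls inside the window (previousCommitIndex, commitIndex]. A minimal sketch of just that bookkeeping follows; CommitTracker and its members are hypothetical names, and the log and cluster objects are replaced by plain fields.

```java
/** Illustrative stand-in for the commit bookkeeping; class and field names are hypothetical. */
final class CommitTracker {
  private long commitIndex;
  private final long configurationIndex; // index of the latest configuration entry
  private boolean configurationCommitted;

  CommitTracker(long commitIndex, long configurationIndex) {
    this.commitIndex = commitIndex;
    this.configurationIndex = configurationIndex;
  }

  /** Advances the commit index; stale or repeated values are ignored. */
  void advance(long newCommitIndex) {
    long previous = commitIndex;
    if (newCommitIndex > previous) {
      commitIndex = newCommitIndex;
      // The configuration commits exactly when its index lies in (previous, newCommitIndex].
      if (configurationIndex > previous && configurationIndex <= newCommitIndex) {
        configurationCommitted = true;
      }
    }
  }

  long commitIndex() { return commitIndex; }

  boolean configurationCommitted() { return configurationCommitted; }

  public static void main(String[] args) {
    CommitTracker tracker = new CommitTracker(5, 8);
    tracker.advance(7); // configuration at index 8 is not yet covered
    tracker.advance(9); // now it is
    System.out.println(tracker.commitIndex() + " " + tracker.configurationCommitted());
  }
}
```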
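Handling command, query, register, keepalive, unregister, join, leave, and reconfigure requests:
all of these are forwarded to the leader.
In other words, every state from reserve upward at least forwards these requests to the leader.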
PassiveState
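The passive state mainly keeps the member in sync with the rest of the cluster.
On open it first truncates any uncommitted log entries. Every state derived from PassiveState runs this hook when it opens, but the type() check means the truncation only happens when the server is actually in the PASSIVE state.
@Override
public CompletableFuture<ServerState> open() {
  return super.open()
    .thenRun(this::truncateUncommittedEntries)
    .thenApply(v -> this);
}

private void truncateUncommittedEntries() {
  if (type() == CopycatServer.State.PASSIVE) {
    context.getLog().truncate(Math.min(context.getCommitIndex(), context.getLog().lastIndex()));
  }
}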
When is open called?
From ServerContext.transition, during a state transition:
// Close the old state.
try {
  this.state.close().get();
} catch (InterruptedException | ExecutionException e) {
  throw new IllegalStateException("failed to close Raft state", e);
}

// Force state transitions to occur synchronously in order to prevent race conditions.
try {
  this.state = createState(state);
  this.state.open().get();
} catch (InterruptedException | ExecutionException e) {
  throw new IllegalStateException("failed to initialize Raft state", e);
}
The transition closes the old state and then opens the new one.
Note the trailing get() calls: they block until each future completes, so the transition runs synchronously.
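The effect of those get() calls can be shown in isolation. In the sketch below, the close and open steps are stand-in asynchronous tasks (SyncTransition and its event strings are hypothetical), and blocking on each future forces them to finish in order before the method returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Illustrative sketch: forcing asynchronous steps to run one after another with get(). */
final class SyncTransition {

  static List<String> transition() {
    List<String> events = new ArrayList<>();
    try {
      // Each get() blocks until the asynchronous step has completed.
      CompletableFuture.runAsync(() -> events.add("close old state")).get();
      CompletableFuture.runAsync(() -> events.add("open new state")).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("failed to transition", e);
    }
    events.add("transition done");
    return events;
  }

  public static void main(String[] args) {
    System.out.println(transition());
  }
}
```

Without the get() calls the two tasks could interleave with whatever the caller does next, which is the race the source comment warns about.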
Handling connect requests: register the connection.
context.getStateMachine().executor().context().sessions().registerConnection(request.client(), connection);
Handling append requests. The passive append logic is more involved than the reserve one, because a reserve member only has to track the configuration, while a passive member has to replicate the state machine data.
@Override
public CompletableFuture<AppendResponse> append(final AppendRequest request) {
  context.checkThread();
  logRequest(request);
  updateTermAndLeader(request.term(), request.leader());

  return CompletableFuture.completedFuture(logResponse(handleAppend(request)));
}
handleAppend
protected AppendResponse handleAppend(AppendRequest request) {
  // If the request term is less than the current term then immediately
  // reply false and return our current term. The leader will receive
  // the updated term and step down.
  if (request.term() < context.getTerm()) {
    LOGGER.debug("{} - Rejected {}: request term is less than the current term ({})", context.getCluster().member().address(), request, context.getTerm());
    return AppendResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withSucceeded(false)
      .withLogIndex(context.getLog().lastIndex())
      .build();
  } else {
    return checkGlobalIndex(request);
  }
}
When the request's term is stale, the append is rejected immediately: the sender is an outdated leader and needs to step down.
Otherwise checkGlobalIndex runs, which in turn calls appendEntries:
/**
 * Appends entries to the local log.
 */
protected AppendResponse appendEntries(AppendRequest request) {
  // Get the last entry index or default to the request log index.
  long lastEntryIndex = request.logIndex();
  if (!request.entries().isEmpty()) {
    lastEntryIndex = request.entries().get(request.entries().size() - 1).getIndex();
  }

  // Ensure the commitIndex is not increased beyond the index of the last entry in the request.
  long commitIndex = Math.max(context.getCommitIndex(), Math.min(request.commitIndex(), lastEntryIndex));

  // Append entries to the log starting at the last log index.
  for (Entry entry : request.entries()) {
    // If the entry index is greater than the last index and less than the commit index, append the entry.
    // We perform no additional consistency checks here since passive members may only receive committed entries.
    if (context.getLog().lastIndex() < entry.getIndex() && entry.getIndex() <= commitIndex) {
      context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
      LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
    }
  }

  // Update the context commit and global indices.
  context.setCommitIndex(commitIndex);
  context.setGlobalIndex(request.globalIndex());

  // Apply commits to the state machine in batch.
  context.getStateMachine().applyAll(context.getCommitIndex());

  return AppendResponse.builder()
    .withStatus(Response.Status.OK)
    .withTerm(context.getTerm())
    .withSucceeded(true)
    .withLogIndex(context.getLog().lastIndex())
    .build();
}
The key is the comment in the middle of the loop:
an entry is appended only when its index is greater than the log's last index and no greater than the commit index.
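The acceptance rule can be tried out on bare indices. The sketch below is only an illustration: PassiveAppend and accept are hypothetical names, the log is reduced to its last index, and the skip() call that fills gaps in the real log is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of the passive append rule: lastIndex < entryIndex <= commitIndex. */
final class PassiveAppend {

  /** Returns the entry indices a passive member would append, in order. */
  static List<Long> accept(long lastIndex, long commitIndex, long[] entryIndices) {
    List<Long> appended = new ArrayList<>();
    for (long index : entryIndices) {
      // Only entries that are new to the log and already committed are taken.
      if (lastIndex < index && index <= commitIndex) {
        appended.add(index);
        lastIndex = index; // the log's last index advances with each append
      }
    }
    return appended;
  }

  public static void main(String[] args) {
    // Log ends at 3, commit index is 6: entries 4..6 qualify, 7 and 8 are not committed yet.
    System.out.println(accept(3, 6, new long[] {2, 3, 4, 5, 6, 7, 8}));
  }
}
```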
Handling query requests. Under certain conditions a passive member can answer queries itself.
@Override
public CompletableFuture<QueryResponse> query(QueryRequest request) {
  context.checkThread();
  logRequest(request);

  // If the query was submitted with RYW or monotonic read consistency, attempt to apply the query to the local state machine.
  if (request.query().consistency() == Query.ConsistencyLevel.SEQUENTIAL) {

    // If this server has not yet applied entries up to the client's session ID, forward the
    // query to the leader. This ensures that a follower does not tell the client its session
    // doesn't exist if the follower hasn't had a chance to see the session's registration entry.
    if (context.getStateMachine().getLastApplied() < request.session()) {
      LOGGER.debug("{} - State out of sync, forwarding query to leader");
      return queryForward(request);
    }

    // If the commit index is not in the log then we've fallen too far behind the leader to perform a local query.
    // Forward the request to the leader.
    if (context.getLog().lastIndex() < context.getCommitIndex()) {
      LOGGER.debug("{} - State out of sync, forwarding query to leader");
      return queryForward(request);
    }

    QueryEntry entry = context.getLog().create(QueryEntry.class)
      .setIndex(request.index())
      .setTerm(context.getTerm())
      .setTimestamp(System.currentTimeMillis())
      .setSession(request.session())
      .setSequence(request.sequence())
      .setQuery(request.query());

    return queryLocal(entry); // query the local state machine
  } else {
    return queryForward(request);
  }
}
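The conditions under which a passive member answers locally reduce to three checks. As a sketch (QueryRouting, Route, and the parameter names are hypothetical; the real method works on request and context objects rather than plain numbers):

```java
/** Illustrative sketch of the local-versus-forward decision for queries. */
final class QueryRouting {

  enum Route { LOCAL, FORWARD }

  static Route route(boolean sequential, long lastApplied, long sessionId,
                     long lastLogIndex, long commitIndex) {
    if (!sequential) {
      return Route.FORWARD; // stronger consistency levels always go to the leader
    }
    if (lastApplied < sessionId) {
      return Route.FORWARD; // the session's registration entry has not been applied here yet
    }
    if (lastLogIndex < commitIndex) {
      return Route.FORWARD; // the local log is too far behind the commit index
    }
    return Route.LOCAL;
  }

  public static void main(String[] args) {
    System.out.println(route(true, 10, 4, 12, 12));
    System.out.println(route(true, 3, 4, 12, 12));
  }
}
```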
ActiveState
abstract class ActiveState extends PassiveState
ActiveState is abstract because an active member is always exactly one of follower, candidate, or leader.
As a full member, an active member also has to handle requests such as poll, vote, and append.
First, how does its append handling differ from the passive one?
@Override
public CompletableFuture<AppendResponse> append(final AppendRequest request) {
  context.checkThread();
  logRequest(request);

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and transition to follower.
  boolean transition = updateTermAndLeader(request.term(), request.leader()); // a request with a greater term means this node cannot be the leader

  CompletableFuture<AppendResponse> future = CompletableFuture.completedFuture(logResponse(handleAppend(request)));

  // If a transition is required then transition back to the follower state.
  // If the node is already a follower then the transition will be ignored.
  if (transition) {
    context.transition(CopycatServer.State.FOLLOWER); // switch to follower
  }
  return future;
}
handleAppend still ends up calling appendEntries, whose append loop now looks like this:
// Iterate through request entries and append them to the log.
for (Entry entry : request.entries()) {
  // If the entry index is greater than the last log index, skip missing entries.
  if (context.getLog().lastIndex() < entry.getIndex()) {
    context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
    LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
  } else if (context.getCommitIndex() >= entry.getIndex()) {
    continue;
  } else {
    // Compare the term of the received entry with the matching entry in the log.
    long term = context.getLog().term(entry.getIndex());
    if (term != 0) {
      if (entry.getTerm() != term) {
        // We found an invalid entry in the log. Remove the invalid entry and append the new entry.
        // If appending to the log fails, apply commits and reply false to the append request.
        LOGGER.debug("{} - Appended entry term does not match local log, removing incorrect entries", context.getCluster().member().address());
        context.getLog().truncate(entry.getIndex() - 1).append(entry);
        LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
      }
    } else {
      context.getLog().truncate(entry.getIndex() - 1).append(entry);
      LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
    }
  }
}
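This is the part that differs from the passive logic:
If the entry index is greater than the last index, the entry is appended regardless of the commit index.
If the entry index is at most the last index and at most the commit index, the entry is ignored.
If the entry index is at most the last index but greater than the commit index, the entry's term is compared with the local entry at that index. A mismatch, or a missing local entry, means the local log holds stale data, so the log is truncated to entry.getIndex() - 1 and the entry is appended. If the terms match, the local entry is kept as it is.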
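These cases can be exercised on a toy log. The sketch below is an illustration under simplifying assumptions: ActiveAppend and Action are hypothetical names, the log is a gap-free list of terms with 1-based indices, and the skip() of missing entries and the term == 0 case of the real log are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative sketch of the active append cases on a gap-free, 1-based list of terms. */
final class ActiveAppend {

  enum Action { APPEND, IGNORE_COMMITTED, KEEP_MATCHING, TRUNCATE_AND_APPEND }

  /** localTerms.get(i) holds the term of the local entry at index i + 1. */
  static Action apply(List<Long> localTerms, long commitIndex, long entryIndex, long entryTerm) {
    long lastIndex = localTerms.size();
    if (lastIndex < entryIndex) {
      if (entryIndex != lastIndex + 1) {
        throw new IllegalArgumentException("this sketch does not model gaps in the log");
      }
      localTerms.add(entryTerm);
      return Action.APPEND;
    }
    if (commitIndex >= entryIndex) {
      return Action.IGNORE_COMMITTED; // committed entries are never rewritten
    }
    long localTerm = localTerms.get((int) entryIndex - 1);
    if (localTerm == entryTerm) {
      return Action.KEEP_MATCHING; // same index and term: the entry is already present
    }
    // Conflict: drop the local entry at entryIndex and everything after it, then append.
    localTerms.subList((int) entryIndex - 1, localTerms.size()).clear();
    localTerms.add(entryTerm);
    return Action.TRUNCATE_AND_APPEND;
  }

  public static void main(String[] args) {
    List<Long> log = new ArrayList<>(Arrays.asList(1L, 1L, 2L));
    System.out.println(apply(log, 1, 2, 3) + " " + log);
  }
}
```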
Handling poll requests:
/**
 * Handles a poll request.
 */
protected PollResponse handlePoll(PollRequest request) {
  // If the request term is not as great as the current context term then don't
  // vote for the candidate. We want to vote for candidates that are at least
  // as up to date as us.
  if (request.term() < context.getTerm()) {
    LOGGER.debug("{} - Rejected {}: candidate's term is less than the current term", context.getCluster().member().address(), request);
    return PollResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withAccepted(false) // reject
      .build();
  } else if (isLogUpToDate(request.logIndex(), request.logTerm(), request)) {
    return PollResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withAccepted(true) // accept
      .build();
  } else {
    return PollResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withAccepted(false)
      .build();
  }
}
isLogUpToDate
boolean isLogUpToDate(long lastIndex, long lastTerm, Request request) {
  // If the log is empty then vote for the candidate.
  if (context.getLog().isEmpty()) {
    LOGGER.debug("{} - Accepted {}: candidate's log is up-to-date", context.getCluster().member().address(), request);
    return true;
  }

  // Read the last entry index and term from the log.
  long localLastIndex = context.getLog().lastIndex();
  long localLastTerm = context.getLog().term(localLastIndex);

  // If the candidate's last log term is lower than the local log's last entry term, reject the request.
  if (lastTerm < localLastTerm) {
    LOGGER.debug("{} - Rejected {}: candidate's last log entry ({}) is at a lower term than the local log ({})", context.getCluster().member().address(), request, lastTerm, localLastTerm);
    return false;
  }

  // If the candidate's last term is equal to the local log's last entry term, reject the request if the
  // candidate's last index is less than the local log's last index. If the candidate's last log term is
  // greater than the local log's last term then it's considered up to date, and if both have the same term
  // then the candidate's last index must be greater than the local log's last index.
  if (lastTerm == localLastTerm && lastIndex < localLastIndex) {
    LOGGER.debug("{} - Rejected {}: candidate's last log entry ({}) is at a lower index than the local log ({})", context.getCluster().member().address(), request, lastIndex, localLastIndex);
    return false;
  }

  // If we made it this far, the candidate's last term is greater than or equal to the local log's last
  // term, and if equal to the local log's last term, the candidate's last index is equal to or greater
  // than the local log's last index.
  LOGGER.debug("{} - Accepted {}: candidate's log is up-to-date", context.getCluster().member().address(), request);
  return true;
}
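The rules for accepting a poll request:
If the local log is empty, the poll is always accepted.
If the candidate's last log term is lower than the local last log term, the poll is rejected.
If the two last log terms are equal but the candidate's last index is lower than the local last index, the poll is rejected.
In short, a member accepts only candidates whose log is at least as up to date as its own.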
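Stripped of logging and context lookups, the comparison is a small pure function. In this sketch, LogComparison and isUpToDate are hypothetical names, and a local last index of 0 is assumed to mean an empty log.

```java
/** Illustrative sketch of the "at least as up to date" comparison used for polls and votes. */
final class LogComparison {

  static boolean isUpToDate(long candidateLastIndex, long candidateLastTerm,
                            long localLastIndex, long localLastTerm) {
    if (localLastIndex == 0) {
      return true; // assumption: index 0 means the local log is empty
    }
    if (candidateLastTerm < localLastTerm) {
      return false; // candidate's last entry is from an older term
    }
    if (candidateLastTerm == localLastTerm && candidateLastIndex < localLastIndex) {
      return false; // same term, but the candidate's log is shorter
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isUpToDate(10, 1, 3, 2));
    System.out.println(isUpToDate(3, 2, 3, 2));
  }
}
```

The term is compared before the index, so a long log from an old term loses to a short log from a newer term.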
Handling vote requests:
/**
 * Handles a vote request.
 */
protected VoteResponse handleVote(VoteRequest request) {
  // If the request term is not as great as the current context term then don't
  // vote for the candidate. We want to vote for candidates that are at least
  // as up to date as us.
  if (request.term() < context.getTerm()) { // the request's term is stale
    LOGGER.debug("{} - Rejected {}: candidate's term is less than the current term", context.getCluster().member().address(), request);
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
  // If a leader was already determined for this term then reject the request.
  else if (context.getLeader() != null) { // a leader already exists
    LOGGER.debug("{} - Rejected {}: leader already exists", context.getCluster().member().address(), request);
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
  // If the requesting candidate is not a known member of the cluster (to this
  // node) then don't vote for it. Only vote for candidates that we know about.
  else if (!context.getClusterState().getRemoteMemberStates().stream().<Integer>map(m -> m.getMember().id()).collect(Collectors.toSet()).contains(request.candidate())) {
    LOGGER.debug("{} - Rejected {}: candidate is not known to the local member", context.getCluster().member().address(), request);
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
  // If no vote has been cast, check the log and cast a vote if necessary.
  else if (context.getLastVotedFor() == 0) { // no vote cast yet for any candidate
    if (isLogUpToDate(request.logIndex(), request.logTerm(), request)) { // the candidate's log is sufficiently up to date
      context.setLastVotedFor(request.candidate());
      return VoteResponse.builder()
        .withStatus(Response.Status.OK)
        .withTerm(context.getTerm())
        .withVoted(true) // accept
        .build();
    } else {
      return VoteResponse.builder()
        .withStatus(Response.Status.OK)
        .withTerm(context.getTerm())
        .withVoted(false)
        .build();
    }
  }
  // If we already voted for the requesting server, respond successfully.
  else if (context.getLastVotedFor() == request.candidate()) { // already voted for this same candidate
    LOGGER.debug("{} - Accepted {}: already voted for {}", context.getCluster().member().address(), request, context.getCluster().member(context.getLastVotedFor()).address());
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(true) // accept
      .build();
  }
  // In this case, we've already voted for someone else.
  else {
    LOGGER.debug("{} - Rejected {}: already voted for {}", context.getCluster().member().address(), request, context.getCluster().member(context.getLastVotedFor()).address());
    return VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build();
  }
}
FollowerState
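The key logic of a follower is to wait for a timeout and then try to become a candidate.
public synchronized CompletableFuture<ServerState> open() {
  return super.open().thenRun(this::startHeartbeatTimeout).thenApply(v -> this);
}
On open, the heartbeat timeout is started through thenRun once super.open() has completed. thenRun takes a Runnable, so it does not consume the result of the previous stage.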
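The difference between thenRun and thenApply in that chain is easy to see in a stand-alone sketch. OpenChain and its event strings are hypothetical; only the CompletableFuture calls are the same as in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch of the open() chain: run a side effect, then return this state. */
final class OpenChain {
  final List<String> events = new ArrayList<>();

  CompletableFuture<Void> superOpen() {
    events.add("super.open");
    return CompletableFuture.completedFuture(null);
  }

  CompletableFuture<OpenChain> open() {
    return superOpen()
        .thenRun(() -> events.add("startHeartbeatTimeout")) // Runnable: ignores the previous result
        .thenApply(v -> this);                               // Function: maps the Void result to this state
  }

  public static void main(String[] args) {
    OpenChain state = new OpenChain();
    System.out.println(state.open().join() == state);
    System.out.println(state.events);
  }
}
```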
startHeartbeatTimeout –> resetHeartbeatTimeout
private void resetHeartbeatTimeout() {
  // Set the election timeout in a semi-random fashion with the random range
  // being election timeout and 2 * election timeout.
  Duration delay = context.getElectionTimeout().plus(Duration.ofMillis(random.nextInt((int) context.getElectionTimeout().toMillis()))); // pick a random delay
  heartbeatTimer = context.getThreadContext().schedule(delay, () -> { // runs when the delay elapses
    heartbeatTimer = null;
    if (isOpen()) {
      context.setLeader(0); // clear the leader
      sendPollRequests(); // send poll requests
    }
  });
}
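The delay computation in resetHeartbeatTimeout can be isolated as follows. ElectionDelay and randomDelay are hypothetical names; the arithmetic mirrors the excerpt, giving a value in the half-open range from the election timeout up to, but not including, twice the election timeout.

```java
import java.time.Duration;
import java.util.Random;

/** Illustrative sketch of the randomized heartbeat/election delay. */
final class ElectionDelay {

  /** Returns a delay in [electionTimeout, 2 * electionTimeout). */
  static Duration randomDelay(Duration electionTimeout, Random random) {
    int bound = (int) electionTimeout.toMillis(); // must be positive for Random.nextInt
    return electionTimeout.plus(Duration.ofMillis(random.nextInt(bound)));
  }

  public static void main(String[] args) {
    System.out.println(randomDelay(Duration.ofMillis(150), new Random()));
  }
}
```

Randomizing the delay makes it unlikely that several followers time out at the same moment and start competing polls.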
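When the timeout fires, can this member simply become a candidate? Raft's election restriction only lets a member win an election if its log is at least as up to date as the logs of a majority, which guarantees that it holds every committed entry.
So how does a member know whether its log is up to date enough?
It sends poll requests. If a majority accepts, its log is at least as new as most members' logs, so it holds the latest committed entries.
Note that resetHeartbeatTimeout is not called only here. It is called in essentially every request handler of the follower, so any interaction with the leader keeps restarting the timer.
Only when the leader's heartbeats stop arriving is sendPollRequests called to try to become a candidate.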
/**
 * Polls all members of the cluster to determine whether this member should transition to the CANDIDATE state.
 */
private void sendPollRequests() {
  // Create a quorum that will track the number of nodes that have responded to the poll request.
  final AtomicBoolean complete = new AtomicBoolean();
  final Set<ServerMember> votingMembers = new HashSet<>(context.getClusterState().getActiveMemberStates().stream().map(MemberState::getMember).collect(Collectors.toList())); // collect all active members

  // Build the Quorum over those members.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    // If a majority of the cluster indicated they would vote for us then transition to candidate.
    complete.set(true);
    if (elected) { // the poll succeeded
      context.transition(CopycatServer.State.CANDIDATE); // transition to candidate
    } else {
      resetHeartbeatTimeout();
    }
  });

  // Once we got the last log term, iterate through each current member
  // of the cluster and vote each member for a vote.
  for (ServerMember member : votingMembers) {
    LOGGER.debug("{} - Polling {} for next term {}", context.getCluster().member().address(), member, context.getTerm() + 1);
    PollRequest request = PollRequest.builder()
      .withTerm(context.getTerm())
      .withCandidate(context.getCluster().member().id())
      .withLogIndex(lastIndex) // my current last log index
      .withLogTerm(lastTerm) // my current last log term; peers use the index and term to decide whether to accept the poll
      .build();
    context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
      connection.<PollRequest, PollResponse>send(request).whenCompleteAsync((response, error) -> { // send the request asynchronously and attach a callback
        context.checkThread();
        if (isOpen() && !complete.get()) {
          if (error != null) {
            LOGGER.warn("{} - {}", context.getCluster().member().address(), error.getMessage());
            quorum.fail();
          } else {
            if (response.term() > context.getTerm()) {
              context.setTerm(response.term());
            }

            if (!response.accepted()) {
              LOGGER.debug("{} - Received rejected poll from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else if (response.term() != context.getTerm()) {
              LOGGER.debug("{} - Received accepted poll for a different term from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else {
              LOGGER.debug("{} - Received accepted poll from {}", context.getCluster().member().address(), member);
              quorum.succeed(); // after all the failure cases, only this branch means the peer accepted the poll
            }
          }
        }
      }, context.getThreadContext().executor());
    });
  }
}
The Quorum implementation is fairly simple:
public class Quorum {
  private final int quorum;
  private int succeeded = 1;
  private int failed;
  private Consumer<Boolean> callback;
  private boolean complete;

  public Quorum(int quorum, Consumer<Boolean> callback) {
    this.quorum = quorum;
    this.callback = callback;
  }

  private void checkComplete() {
    if (!complete && callback != null) {
      if (succeeded >= quorum) {
        complete = true;
        callback.accept(true);
      } else if (failed >= quorum) {
        complete = true;
        callback.accept(false);
      }
    }
  }

  /**
   * Indicates that a call in the quorum succeeded.
   */
  public Quorum succeed() {
    succeeded++;
    checkComplete();
    return this;
  }
succeed increments the success count and calls checkComplete; the callback fires with true once the success count reaches the quorum. Note that succeeded starts at 1, which counts the local member's own vote.
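The excerpt stops before fail(), which the poll and vote code calls. A self-contained sketch of the whole counter, assuming fail() mirrors succeed() (QuorumSketch is a hypothetical name, and the fail() body is an assumption rather than the Copycat source):

```java
import java.util.function.Consumer;

/** Illustrative sketch of a quorum counter that fires its callback exactly once. */
final class QuorumSketch {
  private final int quorum;
  private int succeeded = 1; // the local member counts as one success
  private int failed;
  private final Consumer<Boolean> callback;
  private boolean complete;

  QuorumSketch(int quorum, Consumer<Boolean> callback) {
    this.quorum = quorum;
    this.callback = callback;
  }

  private void checkComplete() {
    if (complete) {
      return; // later responses are ignored once a decision was made
    }
    if (succeeded >= quorum) {
      complete = true;
      callback.accept(true);
    } else if (failed >= quorum) {
      complete = true;
      callback.accept(false);
    }
  }

  QuorumSketch succeed() {
    succeeded++;
    checkComplete();
    return this;
  }

  /** Assumption: mirrors succeed() for rejected or failed responses. */
  QuorumSketch fail() {
    failed++;
    checkComplete();
    return this;
  }

  public static void main(String[] args) {
    // Three voting members, quorum of 2: one remote success is enough.
    new QuorumSketch(2, elected -> System.out.println("elected = " + elected)).succeed();
  }
}
```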
FollowerState's handling of append requests:
@Override
public CompletableFuture<AppendResponse> append(AppendRequest request) {
  CompletableFuture<AppendResponse> future = super.append(request);

  // Reset the heartbeat timeout.
  resetHeartbeatTimeout();

  // Send AppendEntries requests to passive members if necessary.
  appender.appendEntries();
  return future;
}
Besides calling super.append
and resetting the heartbeat timeout, it also calls appender.appendEntries().
This is presumably where the follower takes on its responsibility for replicating data to passive members:
final class FollowerAppender extends AbstractAppender {

  public FollowerAppender(ServerContext context) {
    super(context);
  }

  /**
   * Sends append requests to assigned passive members.
   */
  public void appendEntries() {
    if (open) {
      for (MemberState member : context.getClusterState().getAssignedPassiveMemberStates()) {
        appendEntries(member);
      }
    }
  }
The logic simply sends appendEntries to every passive member assigned to this follower.
CandidateState
The candidate's job is to become leader by winning a vote.
public synchronized CompletableFuture<ServerState> open() {
  return super.open().thenRun(this::startElection).thenApply(v -> this);
}
startElection-> sendVoteRequests
/**
 * Resets the election timer.
 */
private void sendVoteRequests() {
  // When the election timer is reset, increment the current term and
  // restart the election.
  // A new election means term + 1; lastVotedFor is set to this member, so it votes for itself first.
  context.setTerm(context.getTerm() + 1).setLastVotedFor(context.getCluster().member().id());

  final AtomicBoolean complete = new AtomicBoolean();
  final Set<ServerMember> votingMembers = new HashSet<>(context.getClusterState().getActiveMemberStates().stream().map(MemberState::getMember).collect(Collectors.toList()));

  // Send vote requests to all nodes. The vote request that is sent
  // to this node will be automatically successful.
  // First check if the quorum is null. If the quorum isn't null then that
  // indicates that another vote is already going on.
  final Quorum quorum = new Quorum(context.getClusterState().getQuorum(), (elected) -> {
    complete.set(true);
    if (elected) {
      context.transition(CopycatServer.State.LEADER); // the vote succeeded: become leader
    } else {
      context.transition(CopycatServer.State.FOLLOWER);
    }
  });

  // Once we got the last log term, iterate through each current member
  // of the cluster and vote each member for a vote.
  for (ServerMember member : votingMembers) {
    LOGGER.debug("{} - Requesting vote from {} for term {}", context.getCluster().member().address(), member, context.getTerm());
    VoteRequest request = VoteRequest.builder()
      .withTerm(context.getTerm())
      .withCandidate(context.getCluster().member().id())
      .withLogIndex(lastIndex)
      .withLogTerm(lastTerm)
      .build();

    context.getConnections().getConnection(member.serverAddress()).thenAccept(connection -> {
      connection.<VoteRequest, VoteResponse>send(request).whenCompleteAsync((response, error) -> {
        context.checkThread();
        if (isOpen() && !complete.get()) {
          if (error != null) {
            LOGGER.warn(error.getMessage());
            quorum.fail();
          } else {
            if (response.term() > context.getTerm()) {
              LOGGER.debug("{} - Received greater term from {}", context.getCluster().member().address(), member);
              context.setTerm(response.term());
              complete.set(true);
              context.transition(CopycatServer.State.FOLLOWER);
            } else if (!response.voted()) {
              LOGGER.debug("{} - Received rejected vote from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else if (response.term() != context.getTerm()) {
              LOGGER.debug("{} - Received successful vote for a different term from {}", context.getCluster().member().address(), member);
              quorum.fail();
            } else {
              LOGGER.debug("{} - Received successful vote from {}", context.getCluster().member().address(), member);
              quorum.succeed();
            }
          }
        }
      }, context.getThreadContext().executor());
    });
  }
}
Handling append requests:
public CompletableFuture<AppendResponse> append(AppendRequest request) {
  context.checkThread();

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and step down as a candidate.
  if (request.term() >= context.getTerm()) { // the term is at least ours, so a leader already exists
    context.setTerm(request.term());
    context.transition(CopycatServer.State.FOLLOWER); // step down to follower
  }
  return super.append(request);
}
Handling vote requests:
@Override
public CompletableFuture<VoteResponse> vote(VoteRequest request) {
  context.checkThread();
  logRequest(request);

  // If the request indicates a term that is greater than the current term then
  // assign that term and leader to the current context and step down as a candidate.
  if (updateTermAndLeader(request.term(), 0)) { // the request's term is newer than ours
    CompletableFuture<VoteResponse> future = super.vote(request);
    context.transition(CopycatServer.State.FOLLOWER); // step down to follower
    return future;
  }

  // If the vote request is not for this candidate then reject the vote.
  if (request.candidate() == context.getCluster().member().id()) { // otherwise grant the vote only if the requested candidate is this node
    return CompletableFuture.completedFuture(logResponse(VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(true)
      .build()));
  } else { // a candidate never votes for another candidate
    return CompletableFuture.completedFuture(logResponse(VoteResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withVoted(false)
      .build()));
  }
}
LeaderState
LeaderState is more complex and is covered in a separate blog post.