There are two ways to bring up a server. In both cases the server is first built:
Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);
CopycatServer server = builder.build();
One option is to bootstrap a new cluster on its own:
CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();
The other is to join an existing cluster:
Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();
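Putting the two paths together, a minimal launcher might look like the sketch below. It assumes a MapStateMachine class on the classpath and the copycat-server / catalyst-netty dependencies; using a command-line argument to decide between bootstrap and join is purely illustrative.

import io.atomix.catalyst.transport.Address;
import io.atomix.copycat.server.CopycatServer;

import java.util.Arrays;
import java.util.Collection;

public class ServerLauncher {
  public static void main(String[] args) {
    Address local = new Address("127.0.0.1", 5000);

    CopycatServer server = CopycatServer.builder(local)
        .withStateMachine(MapStateMachine::new)   // MapStateMachine is assumed to exist
        .build();

    // First node of a fresh cluster: bootstrap itself.
    boolean firstNode = args.length == 0;          // hypothetical way to decide
    if (firstNode) {
      server.bootstrap().join();
    } else {
      // Subsequent nodes: join via any existing member.
      Collection<Address> cluster = Arrays.asList(new Address("127.0.0.1", 8700));
      server.join(cluster).join();
    }
  }
}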
CopycatServer.Builder.build
/**
 * @throws ConfigurationException if a state machine, members or transport are not configured
 */
@Override
public CopycatServer build() {
  if (stateMachineFactory == null)
    throw new ConfigurationException("state machine not configured");

  // If the transport is not configured, attempt to use the default Netty transport.
  if (serverTransport == null) {
    try {
      serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.netty.NettyTransport").newInstance(); // defaults to Netty
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
      throw new ConfigurationException("transport not configured");
    }
  }

  // If the client transport is not configured, default it to the server transport.
  if (clientTransport == null) {
    clientTransport = serverTransport;
  }

  // If no serializer instance was provided, create one.
  if (serializer == null) {
    serializer = new Serializer(new PooledHeapAllocator());
  }

  // Resolve serializable request/response and other types.
  serializer.resolve(new ClientRequestTypeResolver());
  serializer.resolve(new ClientResponseTypeResolver());
  serializer.resolve(new ProtocolSerialization());
  serializer.resolve(new ServerSerialization());
  serializer.resolve(new StorageSerialization());

  // If the storage is not configured, create a new Storage instance with the configured serializer.
  if (storage == null) {
    storage = new Storage(); // default Storage
  }

  ConnectionManager connections = new ConnectionManager(serverTransport.client());
  ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", serverAddress, name), serializer); // a single-threaded ThreadContext, a thin wrapper around a thread, used to run operations against the state machine

  ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext); // everything is wrapped into a ServerContext
  context.setElectionTimeout(electionTimeout)
      .setHeartbeatInterval(heartbeatInterval)
      .setSessionTimeout(sessionTimeout)
      .setGlobalSuspendTimeout(globalSuspendTimeout);

  return new CopycatServer(name, clientTransport, serverTransport, context);
}
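To avoid the reflective NettyTransport lookup and the default Storage() seen in build(), these pieces can be configured explicitly on the builder. The sketch below assumes the Copycat 1.x builder methods withTransport and withStorage, the Storage builder methods withStorageLevel/withDirectory, and a MapStateMachine class.

import io.atomix.catalyst.transport.Address;
import io.atomix.catalyst.transport.netty.NettyTransport;
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.StorageLevel;

import java.io.File;

public class ExplicitBuild {
  public static void main(String[] args) {
    CopycatServer server = CopycatServer.builder(new Address("127.0.0.1", 5000))
        .withStateMachine(MapStateMachine::new)       // assumed state machine
        .withTransport(new NettyTransport())          // avoids the Class.forName fallback in build()
        .withStorage(Storage.builder()                // avoids the new Storage() default
            .withStorageLevel(StorageLevel.DISK)      // persist the log on disk
            .withDirectory(new File("logs"))
            .build())
        .build();
    server.bootstrap().join();
  }
}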
CopycatServer.bootstrap
public CompletableFuture<CopycatServer> bootstrap() {
  return bootstrap(Collections.EMPTY_LIST); // only brings up this server, so the argument is an empty list
}
public CompletableFuture<CopycatServer> bootstrap(Collection<Address> cluster) {
  return start(() -> cluster().bootstrap(cluster));
}
Both bootstrap and join go through start:
/**
 * Starts the server.
 */
private CompletableFuture<CopycatServer> start(Supplier<CompletableFuture<Void>> joiner) {
  if (started)
    return CompletableFuture.completedFuture(this);

  if (openFuture == null) {
    synchronized (this) {
      if (openFuture == null) {
        Function<Void, CompletionStage<CopycatServer>> completionFunction = state -> {
          CompletableFuture<CopycatServer> future = new CompletableFuture<>();
          openFuture = null;
          joiner.get().whenComplete((result, error) -> { // run the joiner
            if (error == null) {
              if (cluster().leader() != null) {
                started = true;
                future.complete(this);
              } else {
                electionListener = cluster().onLeaderElection(leader -> {
                  if (electionListener != null) {
                    started = true;
                    future.complete(this);
                    electionListener.close();
                    electionListener = null;
                  }
                });
              }
            } else {
              future.completeExceptionally(error);
            }
          });
          return future;
        };

        if (closeFuture == null) {
          openFuture = listen().thenCompose(completionFunction); // listen
        } else {
          openFuture = closeFuture.thenCompose(c -> listen().thenCompose(completionFunction));
        }
      }
    }
  }

  return openFuture; // (the remainder of the original method, which attaches logging on completion, is omitted here)
}
start mainly does two things: run the joiner and listen.
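Before either of those, note how start() decides when the returned future completes: immediately if a leader is already known, otherwise through a one-shot election listener. The sketch below is a simplified, self-contained illustration of that pattern; it is not Copycat code, and all names in it are made up for the illustration.

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class LeaderWaitSketch {
  private volatile String leader;                      // stand-in for cluster().leader()
  private Consumer<String> electionListener;           // stand-in for the registered listener

  public CompletableFuture<Void> awaitLeader() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    if (leader != null) {
      future.complete(null);                           // a leader is already elected
    } else {
      electionListener = elected -> {                  // one-shot listener
        future.complete(null);
        electionListener = null;
      };
    }
    return future;
  }

  public void onElection(String elected) {             // called when an election completes
    leader = elected;
    if (electionListener != null) {
      electionListener.accept(elected);
    }
  }

  public static void main(String[] args) {
    LeaderWaitSketch sketch = new LeaderWaitSketch();
    sketch.awaitLeader().thenRun(() -> System.out.println("leader known"));
    sketch.onElection("node-1");                       // simulates a leader election
  }
}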
For bootstrap, the joiner is cluster().bootstrap(cluster):
@Override
public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
  if (joinFuture != null)
    return joinFuture;

  if (configuration == null) {
    if (member.type() != Member.Type.ACTIVE) {
      return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
    } else {
      // Create a set of active members.
      Set<Member> activeMembers = cluster.stream()
          .filter(m -> !m.equals(member.serverAddress()))
          .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
          .collect(Collectors.toSet());

      // Add the local member to the set of active members.
      activeMembers.add(member);

      // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
    }
  }
  return join();
}
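For a fresh multi-node cluster, each node can bootstrap with the full initial membership, which yields the Configuration of ACTIVE members built above. A sketch, assuming a MapStateMachine and that each node passes its own index as a command-line argument:

import io.atomix.catalyst.transport.Address;
import io.atomix.copycat.server.CopycatServer;

import java.util.Arrays;
import java.util.Collection;

public class BootstrapThree {
  public static void main(String[] args) {
    // Each of the three nodes runs this with its own index (0, 1, 2); addresses are placeholders.
    Collection<Address> members = Arrays.asList(
        new Address("127.0.0.1", 5000),
        new Address("127.0.0.1", 5001),
        new Address("127.0.0.1", 5002));
    Address local = new Address("127.0.0.1", 5000 + Integer.parseInt(args[0]));

    CopycatServer server = CopycatServer.builder(local)
        .withStateMachine(MapStateMachine::new)   // assumed state machine
        .build();

    // Bootstrapping with the full initial membership produces a Configuration
    // containing all three ACTIVE members, as in the method above.
    server.bootstrap(members).join();
  }
}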
listen
/**
 * Starts listening the server.
 */
private CompletableFuture<Void> listen() {
  CompletableFuture<Void> future = new CompletableFuture<>();
  context.getThreadContext().executor().execute(() -> {
    internalServer.listen(cluster().member().serverAddress(), context::connectServer).whenComplete((internalResult, internalError) -> { // internalServer may be backed by the local or the Netty transport
      if (internalError == null) {
        // If the client address is different than the server address, start a separate client server.
        if (clientServer != null) {
          clientServer.listen(cluster().member().clientAddress(), context::connectClient).whenComplete((clientResult, clientError) -> { // clients may talk to a different address
            started = true;
            future.complete(null);
          });
        } else {
          started = true;
          future.complete(null);
        }
      } else {
        future.completeExceptionally(internalError);
      }
    });
  });
  return future;
}
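The clientServer branch only exists when the client-facing address differs from the inter-server address. A sketch of setting that up, assuming the builder overload that takes a client address and a server address:

import io.atomix.catalyst.transport.Address;
import io.atomix.copycat.server.CopycatServer;

public class TwoAddressServer {
  public static void main(String[] args) {
    Address clientAddress = new Address("0.0.0.0", 6000);   // clients connect here
    Address serverAddress = new Address("10.0.0.1", 5000);  // other servers connect here

    CopycatServer server = CopycatServer.builder(clientAddress, serverAddress) // two-address overload assumed
        .withStateMachine(MapStateMachine::new)              // assumed state machine
        .build();
    server.bootstrap().join();
  }
}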
ServerContext
/**
 * Handles a connection from a client.
 */
public void connectClient(Connection connection) {
  threadContext.checkThread();

  // Note we do not use method references here because the "state" variable changes over time.
  // We have to use lambdas to ensure the request handler points to the current state.
  connection.handler(RegisterRequest.class, request -> state.register(request));
  connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
  connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
  connection.handler(UnregisterRequest.class, request -> state.unregister(request));
  connection.handler(CommandRequest.class, request -> state.command(request));
  connection.handler(QueryRequest.class, request -> state.query(request));

  connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
}

/**
 * Handles a connection from another server.
 */
public void connectServer(Connection connection) {
  threadContext.checkThread();

  // Handlers for all request types are registered since requests can be proxied between servers.
  // Note we do not use method references here because the "state" variable changes over time.
  // We have to use lambdas to ensure the request handler points to the current state.
  connection.handler(RegisterRequest.class, request -> state.register(request));
  connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
  connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
  connection.handler(UnregisterRequest.class, request -> state.unregister(request));
  connection.handler(PublishRequest.class, request -> state.publish(request));
  connection.handler(ConfigureRequest.class, request -> state.configure(request));
  connection.handler(InstallRequest.class, request -> state.install(request));
  connection.handler(JoinRequest.class, request -> state.join(request));
  connection.handler(ReconfigureRequest.class, request -> state.reconfigure(request));
  connection.handler(LeaveRequest.class, request -> state.leave(request));
  connection.handler(AppendRequest.class, request -> state.append(request));
  connection.handler(PollRequest.class, request -> state.poll(request));
  connection.handler(VoteRequest.class, request -> state.vote(request));
  connection.handler(CommandRequest.class, request -> state.command(request));
  connection.handler(QueryRequest.class, request -> state.query(request));

  connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
}
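The key detail in connectClient/connectServer is the comment about lambdas versus method references: the state field is replaced whenever the server changes role, so each handler must look it up at request time. The self-contained sketch below (not Copycat code; all names are made up) illustrates that dispatch pattern.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HandlerDispatchSketch {
  interface State { String command(String request); String query(String request); }

  private volatile State state;                                     // replaced on role transitions
  private final Map<Class<?>, Function<String, String>> handlers = new HashMap<>();

  void registerHandlers() {
    // Lambdas (not method references) so the lookup of `state` happens per request.
    handlers.put(CommandRequest.class, request -> state.command(request));
    handlers.put(QueryRequest.class, request -> state.query(request));
  }

  // Marker request types used only for this sketch.
  static final class CommandRequest {}
  static final class QueryRequest {}

  public static void main(String[] args) {
    HandlerDispatchSketch sketch = new HandlerDispatchSketch();
    sketch.state = new State() {
      public String command(String r) { return "command handled by current state: " + r; }
      public String query(String r) { return "query handled by current state: " + r; }
    };
    sketch.registerHandlers();
    System.out.println(sketch.handlers.get(CommandRequest.class).apply("put foo"));
  }
}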
Joining an existing cluster:
public CompletableFuture<CopycatServer> join(Collection<Address> cluster) {
  return start(() -> cluster().join(cluster));
}
ClusterState.join; the logic is similar to bootstrap:
@Override
public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {
  // If no configuration was loaded from disk, create a new configuration.
  if (configuration == null) { // only when no configuration exists yet
    // Create a set of cluster members, excluding the local member which is joining a cluster.
    Set<Member> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.serverAddress())) // filter out the local member
        .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated())) // create ServerMember objects
        .collect(Collectors.toSet());

    // If the set of members in the cluster is empty when the local member is excluded,
    // fail the join.
    if (activeMembers.isEmpty()) { // the cluster to join must not be empty
      return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
    }

    // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
    // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
    configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // apply the configuration
  }
  return join();
}
The only extra step is initializing the configuration from the provided cluster; it then calls join():
/**
 * Starts the join to the cluster.
 */
private synchronized CompletableFuture<Void> join() {
  joinFuture = new CompletableFuture<>();
  context.getThreadContext().executor().execute(() -> {
    // Transition the server to the appropriate state for the local member type.
    context.transition(member.type()); // transition the local member to its configured type

    // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
    // will result in the member attempting to get elected. This allows initial clusters to form.
    List<MemberState> activeMembers = getActiveMemberStates();
    if (!activeMembers.isEmpty()) {
      join(getActiveMemberStates().iterator()); // try to join via the other active members
    } else {
      joinFuture.complete(null);
    }
  });
  return joinFuture.whenComplete((result, error) -> joinFuture = null);
}
/**
 * Recursively attempts to join the cluster.
 */
private void join(Iterator<MemberState> iterator) {
  if (iterator.hasNext()) {
    cancelJoinTimer();
    joinTimeout = context.getThreadContext().schedule(context.getElectionTimeout().multipliedBy(2), () -> {
      join(iterator); // as long as the attempt has not succeeded, another join keeps getting scheduled
    });

    MemberState member = iterator.next();
    LOGGER.debug("{} - Attempting to join via {}", member().address(), member.getMember().serverAddress());

    context.getConnections().getConnection(member.getMember().serverAddress()).thenCompose(connection -> {
      JoinRequest request = JoinRequest.builder()
          .withMember(new ServerMember(member().type(), member().serverAddress(), member().clientAddress(), member().updated()))
          .build();
      return connection.<JoinRequest, JoinResponse>send(request); // send the join request
    }).whenComplete((response, error) -> {
      // Cancel the join timer.
      cancelJoinTimer(); // cancel the join timer first

      if (error == null) { // the join request completed
        if (response.status() == Response.Status.OK) {
          LOGGER.info("{} - Successfully joined via {}", member().address(), member.getMember().serverAddress());

          Configuration configuration = new Configuration(response.index(), response.term(), response.timestamp(), response.members());

          // Configure the cluster with the join response.
          // Commit the configuration as we know it was committed via the successful join response.
          configure(configuration).commit(); // update the configuration
        } else if (response.error() == null || response.error() == CopycatError.Type.CONFIGURATION_ERROR) {
          // If the response error is null, that indicates that no error occurred but the leader was
          // in a state that was incapable of handling the join request. Attempt to join the leader
          // again after an election timeout.
          LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
          resetJoinTimer();
        } else {
          // If the response error was non-null, attempt to join via the next server in the members list.
          LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
          join(iterator);
        }
      } else {
        LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
        join(iterator);
      }
    });
  }
  // If join attempts remain, schedule another attempt after two election timeouts. This allows enough time
  // for servers to potentially timeout and elect a leader.
  else {
    LOGGER.debug("{} - Failed to join cluster, retrying...", member.address());
    resetJoinTimer(); // if all members have been tried without success, reset the timer and retry later
  }
}
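The retry behaviour (try each active member in turn, then wait roughly two election timeouts and start over) can be summarised with the self-contained sketch below; it is not Copycat code, and the timeout value and member addresses are placeholders.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class JoinRetrySketch {
  private static final long ELECTION_TIMEOUT_MS = 500;   // placeholder value
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final List<String> members = List.of("10.0.0.1:5000", "10.0.0.2:5000");

  void join(Iterator<String> iterator) {
    if (iterator.hasNext()) {
      String member = iterator.next();
      boolean joined = attemptJoin(member);               // stands in for sending a JoinRequest
      if (!joined) {
        join(iterator);                                    // try the next member
      }
    } else {
      // All members tried: wait ~2 election timeouts, then start over.
      scheduler.schedule(() -> join(members.iterator()),
          2 * ELECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    }
  }

  private boolean attemptJoin(String member) {
    System.out.println("attempting join via " + member);
    return false;                                          // always fails in this sketch
  }

  public static void main(String[] args) throws InterruptedException {
    JoinRetrySketch sketch = new JoinRetrySketch();
    sketch.join(sketch.members.iterator());
    Thread.sleep(1500);                                    // let one retry pass fire, then exit
    sketch.scheduler.shutdownNow();
  }
}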
Joining successfully via any single member is enough, because whichever server receives the JoinRequest will forward it to the leader.