Copycat - CopycatServer

There are two ways to bring up a server. Both start from the same builder:

Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);
CopycatServer server = builder.build();

The server can bootstrap a new cluster on its own:

CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();

 

Or it can join an existing cluster:

Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();

 

CopycatServer.Builder.build

/**
     * @throws ConfigurationException if a state machine, members or transport are not configured
     */
    @Override
    public CopycatServer build() {
      if (stateMachineFactory == null)
        throw new ConfigurationException("state machine not configured");

      // If the transport is not configured, attempt to use the default Netty transport.
      if (serverTransport == null) {
        try {
          serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.netty.NettyTransport").newInstance(); // defaults to Netty
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
          throw new ConfigurationException("transport not configured");
        }
      }

      // If the client transport is not configured, default it to the server transport.
      if (clientTransport == null) {
        clientTransport = serverTransport;
      }

      // If no serializer instance was provided, create one.
      if (serializer == null) {
        serializer = new Serializer(new PooledHeapAllocator());
      }

      // Resolve serializable request/response and other types.
      serializer.resolve(new ClientRequestTypeResolver());
      serializer.resolve(new ClientResponseTypeResolver());
      serializer.resolve(new ProtocolSerialization());
      serializer.resolve(new ServerSerialization());
      serializer.resolve(new StorageSerialization());

      // If the storage is not configured, create a new Storage instance with the configured serializer.
      if (storage == null) {
        storage = new Storage(); //storage
      }

      ConnectionManager connections = new ConnectionManager(serverTransport.client());
      ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", serverAddress, name), serializer); // single-threaded ThreadContext: a thin wrapper around a thread, used to run operations on the state machine

      ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext); // bundle everything into a ServerContext
      context.setElectionTimeout(electionTimeout)
        .setHeartbeatInterval(heartbeatInterval)
        .setSessionTimeout(sessionTimeout)
        .setGlobalSuspendTimeout(globalSuspendTimeout);

      return new CopycatServer(name, clientTransport, serverTransport, context);
    }
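
The default-transport fallback above relies on reflection, so it only succeeds when the Netty transport class is on the classpath. Below is a minimal sketch of that pattern. The loadDefault helper is hypothetical, IllegalStateException stands in for Copycat's ConfigurationException, and java.util.ArrayList stands in for the transport class. It uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9.

```java
import java.util.ArrayList;

public class DefaultTransportSketch {
  // Hypothetical helper: instantiate a class by name through its no-arg constructor.
  public static Object loadDefault(String className) {
    try {
      return Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      // A missing class, a missing constructor, or a failed instantiation
      // all mean that no usable default is available.
      throw new IllegalStateException("transport not configured", e);
    }
  }

  public static void main(String[] args) {
    // java.util.ArrayList is only a stand-in for the real transport class.
    Object loaded = loadDefault("java.util.ArrayList");
    System.out.println(loaded instanceof ArrayList);
    try {
      loadDefault("no.such.Transport");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```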

 

CopycatServer.bootstrap

public CompletableFuture<CopycatServer> bootstrap() {
  return bootstrap(Collections.EMPTY_LIST); // only this server is started, so the argument is an empty list
}

public CompletableFuture<CopycatServer> bootstrap(Collection<Address> cluster) {
  return start(() -> cluster().bootstrap(cluster));
}
 
This calls start:

/**
   * Starts the server.
   */
  private CompletableFuture<CopycatServer> start(Supplier<CompletableFuture<Void>> joiner) {
    if (started)
      return CompletableFuture.completedFuture(this);

    if (openFuture == null) {
      synchronized (this) {
        if (openFuture == null) {
          Function<Void, CompletionStage<CopycatServer>> completionFunction = state -> {
            CompletableFuture<CopycatServer> future = new CompletableFuture<>();
            openFuture = null;
            joiner.get().whenComplete((result, error) -> { // run the joiner and handle its result
              if (error == null) {
                if (cluster().leader() != null) {
                  started = true;
                  future.complete(this);
                } else {
                  electionListener = cluster().onLeaderElection(leader -> {
                    if (electionListener != null) {
                      started = true;
                      future.complete(this);
                      electionListener.close();
                      electionListener = null;
                    }
                  });
                }
              } else {
                future.completeExceptionally(error);
              }
            });
            return future;
          };

          if (closeFuture == null) {
            openFuture = listen().thenCompose(completionFunction); // listen first, then run the completion function
          } else {
            openFuture = closeFuture.thenCompose(c -> listen().thenCompose(completionFunction));
          }
        }
      }
    }

    // The rest of the method is cut off in this excerpt; it presumably returns openFuture.
  }

start mainly does two things: it calls listen, and then it runs the joiner.
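
That ordering can be illustrated with a stripped-down sketch. It uses a hypothetical StartOrderSketch class and leaves out the leader-election wait, the closeFuture branch, and the resetting of openFuture, so it is only an approximation of what start does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class StartOrderSketch {
  private final List<String> events = new ArrayList<>();
  private CompletableFuture<StartOrderSketch> openFuture;

  // Stand-in for the real listen(): records the call and completes immediately.
  private CompletableFuture<Void> listen() {
    events.add("listen");
    return CompletableFuture.completedFuture(null);
  }

  // Listen first, then run the joiner; repeated calls share the same future.
  public synchronized CompletableFuture<StartOrderSketch> start(Supplier<CompletableFuture<Void>> joiner) {
    if (openFuture == null) {
      openFuture = listen()
        .thenCompose(ignored -> joiner.get())
        .thenApply(ignored -> this);
    }
    return openFuture;
  }

  public static List<String> demo() {
    StartOrderSketch server = new StartOrderSketch();
    Supplier<CompletableFuture<Void>> joiner = () -> {
      server.events.add("join");
      return CompletableFuture.completedFuture(null);
    };
    server.start(joiner).join();
    server.start(joiner).join(); // the second call reuses the cached future
    return server.events;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [listen, join]
  }
}
```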

Here the joiner is cluster().bootstrap(cluster):

@Override
  public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
    if (joinFuture != null)
      return joinFuture;

    if (configuration == null) {
      if (member.type() != Member.Type.ACTIVE) {
        return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
      } else {
        // Create a set of active members.
        Set<Member> activeMembers = cluster.stream()
          .filter(m -> !m.equals(member.serverAddress()))
          .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
          .collect(Collectors.toSet());

        // Add the local member to the set of active members.
        activeMembers.add(member);

        // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
        configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
      }
    }
    return join();
  }

 

listen

/**
   * Starts listening the server.
   */
  private CompletableFuture<Void> listen() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    context.getThreadContext().executor().execute(() -> {
      internalServer.listen(cluster().member().serverAddress(), context::connectServer).whenComplete((internalResult, internalError) -> { // internalServer may be a local or a Netty transport
        if (internalError == null) {
          // If the client address is different than the server address, start a separate client server.
          if (clientServer != null) {
            clientServer.listen(cluster().member().clientAddress(), context::connectClient).whenComplete((clientResult, clientError) -> { // clients may be served on a different address
              started = true;
              future.complete(null);
            });
          } else {
            started = true;
            future.complete(null);
          }
        } else {
          future.completeExceptionally(internalError);
        }
      });
    });

    return future;
  }

ServerContext

/**
   * Handles a connection from a client.
   */
  public void connectClient(Connection connection) {
    threadContext.checkThread();

    // Note we do not use method references here because the "state" variable changes over time.
    // We have to use lambdas to ensure the request handler points to the current state.
    connection.handler(RegisterRequest.class, request -> state.register(request));
    connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
    connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
    connection.handler(UnregisterRequest.class, request -> state.unregister(request));
    connection.handler(CommandRequest.class, request -> state.command(request));
    connection.handler(QueryRequest.class, request -> state.query(request));

    connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
  }

  /**
   * Handles a connection from another server.
   */
  public void connectServer(Connection connection) {
    threadContext.checkThread();

    // Handlers for all request types are registered since requests can be proxied between servers.
    // Note we do not use method references here because the "state" variable changes over time.
    // We have to use lambdas to ensure the request handler points to the current state.
    connection.handler(RegisterRequest.class, request -> state.register(request));
    connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
    connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
    connection.handler(UnregisterRequest.class, request -> state.unregister(request));
    connection.handler(PublishRequest.class, request -> state.publish(request));
    connection.handler(ConfigureRequest.class, request -> state.configure(request));
    connection.handler(InstallRequest.class, request -> state.install(request));
    connection.handler(JoinRequest.class, request -> state.join(request));
    connection.handler(ReconfigureRequest.class, request -> state.reconfigure(request));
    connection.handler(LeaveRequest.class, request -> state.leave(request));
    connection.handler(AppendRequest.class, request -> state.append(request));
    connection.handler(PollRequest.class, request -> state.poll(request));
    connection.handler(VoteRequest.class, request -> state.vote(request));
    connection.handler(CommandRequest.class, request -> state.command(request));
    connection.handler(QueryRequest.class, request -> state.query(request));

    connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
  }
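
The comment about lambdas versus method references is worth a small demonstration. In Java, a bound method reference such as state::register evaluates state once, when the reference is created, whereas a lambda reads the field every time it runs. The sketch below uses hypothetical stand-ins (HandlerBindingSketch, State, transition) rather than Copycat's real classes.

```java
import java.util.function.Function;

public class HandlerBindingSketch {
  // Hypothetical stand-in for the server state; not Copycat's real type.
  interface State {
    String register(String request);
  }

  // Mutable field, analogous to the "state" variable in ServerContext.
  private State state = request -> "follower:" + request;

  // Method reference: the receiver (the current State object) is captured here, once.
  Function<String, String> boundNow() {
    return state::register;
  }

  // Lambda: the "state" field is read each time the handler is invoked.
  Function<String, String> boundLate() {
    return request -> state.register(request);
  }

  void transition(State next) {
    state = next;
  }

  public static String[] demo() {
    HandlerBindingSketch context = new HandlerBindingSketch();
    Function<String, String> methodRef = context.boundNow();
    Function<String, String> lambda = context.boundLate();
    context.transition(request -> "leader:" + request);
    return new String[] {methodRef.apply("r1"), lambda.apply("r1")};
  }

  public static void main(String[] args) {
    String[] results = demo();
    System.out.println(results[0]); // follower:r1 (stale state)
    System.out.println(results[1]); // leader:r1 (current state)
  }
}
```

With a method reference, a handler registered while the server was a follower would keep calling the follower state after a transition, which is why the handlers above are written as lambdas.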

 

Joining a cluster

public CompletableFuture<CopycatServer> join(Collection<Address> cluster) {
    return start(() -> cluster().join(cluster));
  }

ClusterState.join: the logic here is similar to bootstrap.

@Override
  public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {

    // If no configuration was loaded from disk, create a new configuration.
    if (configuration == null) { // no configuration exists yet
      // Create a set of cluster members, excluding the local member which is joining a cluster.
      Set<Member> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.serverAddress())) // filter out the local member
        .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated())) // create a ServerMember for each address
        .collect(Collectors.toSet());

      // If the set of members in the cluster is empty when the local member is excluded,
      // fail the join.
      if (activeMembers.isEmpty()) { // the cluster is empty
        return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
      }

      // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
      // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // put the configuration into effect
    }
    return join();
  }
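
The difference between the two entry points lies in how the initial member set is built: bootstrap always adds the local member, while join excludes it and refuses an empty remainder. A minimal sketch of that rule, assuming plain strings in place of Address and ServerMember and hypothetical helper names:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class InitialMembersSketch {
  // Remote members: every configured address except the local one.
  public static Set<String> remoteMembers(String local, Collection<String> cluster) {
    return cluster.stream()
      .filter(address -> !address.equals(local))
      .collect(Collectors.toCollection(TreeSet::new));
  }

  // bootstrap: the local member is always part of the initial configuration.
  public static Set<String> bootstrapMembers(String local, Collection<String> cluster) {
    Set<String> members = remoteMembers(local, cluster);
    members.add(local);
    return members;
  }

  // join: the local member is left out, and an empty remainder is an error.
  public static Set<String> joinMembers(String local, Collection<String> cluster) {
    Set<String> members = remoteMembers(local, cluster);
    if (members.isEmpty()) {
      throw new IllegalStateException("cannot join empty cluster");
    }
    return members;
  }

  public static void main(String[] args) {
    String local = "127.0.0.1:8700";
    System.out.println(bootstrapMembers(local, Collections.emptyList()));
    System.out.println(joinMembers(local, Arrays.asList(local, "127.0.0.1:8701")));
  }
}
```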

The only extra step here is initializing the configuration.

After that, join() is called:

/**
   * Starts the join to the cluster.
   */
  private synchronized CompletableFuture<Void> join() {
    joinFuture = new CompletableFuture<>();

    context.getThreadContext().executor().execute(() -> {
      // Transition the server to the appropriate state for the local member type.
      context.transition(member.type()); // transition the local member to the state for its type

      // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
      // will result in the member attempting to get elected. This allows initial clusters to form.
      List<MemberState> activeMembers = getActiveMemberStates();
      if (!activeMembers.isEmpty()) {
        join(getActiveMemberStates().iterator()); // join via the other active members
      } else {
        joinFuture.complete(null);
      }
    });

    return joinFuture.whenComplete((result, error) -> joinFuture = null);
  }

 

/**
   * Recursively attempts to join the cluster.
   */
  private void join(Iterator<MemberState> iterator) {
    if (iterator.hasNext()) {
      cancelJoinTimer();
      joinTimeout = context.getThreadContext().schedule(context.getElectionTimeout().multipliedBy(2), () -> {
        join(iterator); // until the join succeeds, this timer keeps rescheduling join recursively
      });

      MemberState member = iterator.next();
      LOGGER.debug("{} - Attempting to join via {}", member().address(), member.getMember().serverAddress());

      context.getConnections().getConnection(member.getMember().serverAddress()).thenCompose(connection -> {
        JoinRequest request = JoinRequest.builder()
          .withMember(new ServerMember(member().type(), member().serverAddress(), member().clientAddress(), member().updated()))
          .build();
        return connection.<JoinRequest, JoinResponse>send(request); // send the join request
      }).whenComplete((response, error) -> {
        // Cancel the join timer.
        cancelJoinTimer(); // cancel the join timer first

        if (error == null) { // a response was received
          if (response.status() == Response.Status.OK) {
            LOGGER.info("{} - Successfully joined via {}", member().address(), member.getMember().serverAddress());

            Configuration configuration = new Configuration(response.index(), response.term(), response.timestamp(), response.members());

            // Configure the cluster with the join response.
            // Commit the configuration as we know it was committed via the successful join response.
            configure(configuration).commit(); // update the configuration

          } else if (response.error() == null || response.error() == CopycatError.Type.CONFIGURATION_ERROR) {
            // If the response error is null, that indicates that no error occurred but the leader was
            // in a state that was incapable of handling the join request. Attempt to join the leader
            // again after an election timeout.
            LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
            resetJoinTimer();
          } else {
            // If the response error was non-null, attempt to join via the next server in the members list.
            LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
            join(iterator);
          }
        } else {
          LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
          join(iterator);
        }
      });
    }
    // If join attempts remain, schedule another attempt after two election timeouts. This allows enough time
    // for servers to potentially timeout and elect a leader.
    else {
      LOGGER.debug("{} - Failed to join cluster, retrying...", member.address());
      resetJoinTimer(); // every member has been tried without success, so reset the timer and retry
    }
  }

A successful join via any single member is enough, because a join request is forwarded to the leader no matter which member receives it.
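
The core of the retry logic can be summarized as: try each known member in turn and stop at the first one that accepts the request. The sketch below models only that part, with a hypothetical sendJoin function standing in for the network call. Unlike the real code, it has no timers, and it fails once the list is exhausted instead of rescheduling.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class JoinViaAnyMemberSketch {
  // Tries the remaining members one by one; completes with the member that accepted the join.
  public static CompletableFuture<String> joinVia(Iterator<String> members,
                                                  Function<String, CompletableFuture<Boolean>> sendJoin) {
    if (!members.hasNext()) {
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException("no member accepted the join"));
      return failed;
    }
    String member = members.next();
    return sendJoin.apply(member)
      .exceptionally(error -> false) // treat a connection failure like a rejection
      .thenCompose(accepted -> {
        if (accepted) {
          return CompletableFuture.completedFuture(member);
        }
        return joinVia(members, sendJoin); // move on to the next member
      });
  }

  public static String demo() {
    List<String> members = Arrays.asList("a:1", "b:1", "c:1");
    // Hypothetical behavior: "a" is unreachable, "b" rejects, "c" accepts.
    Function<String, CompletableFuture<Boolean>> sendJoin = member -> {
      if (member.startsWith("a")) {
        CompletableFuture<Boolean> unreachable = new CompletableFuture<>();
        unreachable.completeExceptionally(new RuntimeException("connection refused"));
        return unreachable;
      }
      return CompletableFuture.completedFuture(member.startsWith("c"));
    };
    return joinVia(members.iterator(), sendJoin).join();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // c:1
  }
}
```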

Time: 2024-10-02 19:51:46
