Copycat - StateMachine

fxjwind


Let's look at how a user registers a StateMachine:

CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);

MapStateMachine::new is a constructor reference, so it yields a Supplier<StateMachine>:

/**
     * Sets the Raft state machine factory.
     *
     * @param factory The Raft state machine factory.
     * @return The server builder.
     * @throws NullPointerException if the {@code factory} is {@code null}
     */
    public Builder withStateMachine(Supplier<StateMachine> factory) {
      this.stateMachineFactory = Assert.notNull(factory, "factory");
      return this;
    }
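To make the factory idea concrete, here is a minimal sketch showing that a constructor reference passed as a Supplier produces a new, independent instance on every get() call. DemoStateMachine and DemoBuilder are hypothetical stand-ins written for this example, not Copycat classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

public class FactorySketch {
  // Hypothetical stand-in for a user state machine (not Copycat's StateMachine).
  static class DemoStateMachine {
    final Map<String, String> map = new HashMap<>();
  }

  // Hypothetical builder: it stores the factory, not a state machine instance.
  static class DemoBuilder {
    private Supplier<DemoStateMachine> factory;

    DemoBuilder withStateMachine(Supplier<DemoStateMachine> factory) {
      this.factory = Objects.requireNonNull(factory, "factory");
      return this;
    }

    DemoStateMachine newStateMachine() {
      return factory.get(); // every call runs the constructor again
    }
  }

  public static void main(String[] args) {
    DemoBuilder builder = new DemoBuilder().withStateMachine(DemoStateMachine::new);
    DemoStateMachine first = builder.newStateMachine();
    DemoStateMachine second = builder.newStateMachine();
    first.map.put("foo", "bar");
    System.out.println(first != second);      // true: distinct instances
    System.out.println(second.map.isEmpty()); // true: no shared state
  }
}
```

Passing a factory rather than an instance lets the server build a clean state machine whenever it needs one.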

In build(), the factory is passed in when the ServerContext is created:

ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext);

In the ServerContext constructor:

this.stateMachineFactory = Assert.notNull(stateMachineFactory, "stateMachineFactory");
threadContext.execute(this::reset).join();

 

Inside reset():

    // Create a new user state machine.
    StateMachine stateMachine = stateMachineFactory.get();

    // Create a new internal server state machine.
    this.stateMachine = new ServerStateMachine(stateMachine, this, stateContext);

Here is how stateContext is defined:

this.stateContext = new SingleThreadContext(String.format("copycat-server-%s-%s-state", serverAddress, name), threadContext.serializer().clone());

It is also a single-threaded context, so there are two thread contexts in play here.

stateContext is dedicated to updating the state machine's state.

 

ServerStateMachine manages the user's StateMachine:

ServerStateMachine(StateMachine stateMachine, ServerContext state, ThreadContext executor) {
    this.stateMachine = Assert.notNull(stateMachine, "stateMachine");
    this.state = Assert.notNull(state, "state");
    this.log = state.getLog();
    this.executor = new ServerStateMachineExecutor(new ServerStateMachineContext(state.getConnections(), new ServerSessionManager(state)), executor);
    this.commits = new ServerCommitPool(log, this.executor.context().sessions());
    init();
  }

 

ServerStateMachineExecutor

This is the execution environment for the StateMachine:

class ServerStateMachineExecutor implements StateMachineExecutor {
  private static final Logger LOGGER = LoggerFactory.getLogger(ServerStateMachineExecutor.class);
  private final ThreadContext executor;
  private final ServerStateMachineContext context;
  private final Queue<ServerTask> tasks = new ArrayDeque<>();
  private final List<ServerScheduledTask> scheduledTasks = new ArrayList<>();
  private final List<ServerScheduledTask> complete = new ArrayList<>();
  private final Map<Class, Function> operations = new HashMap<>();

 

init

/**
   * Initializes the state machine.
   */
  private void init() {
    stateMachine.init(executor);
  }

Note that stateMachine here is the user-defined state machine, and this is the init it runs:

public void init(StateMachineExecutor executor) {
    this.executor = Assert.notNull(executor, "executor");
    this.context = executor.context();
    this.clock = context.clock();
    this.sessions = context.sessions();
    if (this instanceof SessionListener) {
      executor.context().sessions().addListener((SessionListener) this);
    }
    configure(executor);
  }

configure

protected void configure(StateMachineExecutor executor) {
    registerOperations();
  }

 

/**
   * Registers operations for the class.
   */
  private void registerOperations() {
    Class<?> type = getClass();
    for (Method method : type.getMethods()) {
      if (isOperationMethod(method)) {
        registerMethod(method);
      }
    }
  }

  /**
   * Returns a boolean value indicating whether the given method is an operation method.
   */
  private boolean isOperationMethod(Method method) {
    Class<?>[] paramTypes = method.getParameterTypes();
    return paramTypes.length == 1 && paramTypes[0] == Commit.class;
  }

 

So how does a user define operations?

public class MapStateMachine extends StateMachine {
  private Map<Object, Object> map = new HashMap<>();

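  public Object put(Commit<PutCommand> commit) {
    try {
      // Map.put returns the previous value for the key (or null), which becomes the command result.
      return map.put(commit.operation().key(), commit.operation().value());
    } finally {
      commit.close();
    }
  }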

  public Object get(Commit<GetQuery> commit) {
    try {
      return map.get(commit.operation().key());
    } finally {
      commit.close();
    }
  }
}

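So operations are discovered through reflection.

The rule: the method takes exactly one parameter, and that parameter's type is Commit.

If a method qualifies as an operation, registerMethod is called: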

private void registerMethod(Method method) {
    Type genericType = method.getGenericParameterTypes()[0];
    Class<?> argumentType = resolveArgument(genericType);
    if (argumentType != null && Operation.class.isAssignableFrom(argumentType)) {
      registerMethod(argumentType, method);
    }
  }

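This resolves the generic type argument of the Commit parameter, which is PutCommand for the put method in the example.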
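The discovery step can be sketched with plain Java reflection. The source does not show resolveArgument, so the version below is an assumption about what such a helper could look like: it only handles the simple case where the parameter is written as Commit<SomeClass>. Commit, PutCommand, GetQuery and DemoMachine are hypothetical stand-ins defined for this example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.TreeMap;

public class OperationScanSketch {
  // Hypothetical stand-ins for Copycat's Commit and operation types.
  public interface Commit<T> {}
  public static class PutCommand {}
  public static class GetQuery {}

  public static class DemoMachine {
    public Object put(Commit<PutCommand> commit) { return null; }
    public Object get(Commit<GetQuery> commit) { return null; }
    public void helper(String notACommit) {} // ignored: parameter is not a Commit
  }

  // Assumed helper: returns the class T in Commit<T>, or null if it cannot be resolved.
  static Class<?> resolveArgument(Type genericType) {
    if (genericType instanceof ParameterizedType) {
      Type[] args = ((ParameterizedType) genericType).getActualTypeArguments();
      if (args.length == 1 && args[0] instanceof Class) {
        return (Class<?>) args[0];
      }
    }
    return null;
  }

  // Maps method name to the resolved operation class for each single-Commit-parameter method.
  static Map<String, Class<?>> scan(Class<?> type) {
    Map<String, Class<?>> found = new TreeMap<>();
    for (Method method : type.getMethods()) {
      Class<?>[] params = method.getParameterTypes();
      if (params.length == 1 && params[0] == Commit.class) {
        Class<?> argument = resolveArgument(method.getGenericParameterTypes()[0]);
        if (argument != null) {
          found.put(method.getName(), argument);
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    // Prints "get -> GetQuery" then "put -> PutCommand" (TreeMap orders by method name).
    scan(DemoMachine.class).forEach((name, op) -> System.out.println(name + " -> " + op.getSimpleName()));
  }
}
```

The raw parameter type is always Commit after erasure, which is why the generic parameter type has to be inspected to recover the operation class.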

private void registerMethod(Class<?> type, Method method) {
    Class<?> returnType = method.getReturnType();
    if (returnType == void.class || returnType == Void.class) {
      registerVoidMethod(type, method);
    } else {
      registerValueMethod(type, method);
    }
  }

private void registerValueMethod(Class type, Method method) {
    executor.register(type, wrapValueMethod(method));
  }

  /**
   * Wraps a value method.
   */
  private Function wrapValueMethod(Method method) {
    return c -> {
      try {
        return method.invoke(this, c);
      } catch (InvocationTargetException e) {
        throw new CommandException(e);
      } catch (IllegalAccessException e) {
        throw new AssertionError(e);
      }
    };
  }

 

ServerStateMachineExecutor.register

@Override
  public <T extends Operation<U>, U> StateMachineExecutor register(Class<T> type, Function<Commit<T>, U> callback) {
    operations.put(type, callback);
    return this;
  }

Here the operations are registered in the ServerStateMachineExecutor so they can be invoked later.
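The wrap-then-register pattern can be shown in isolation. In this rough sketch a reflective Method is wrapped in a Function and stored in a map keyed by class, then looked up and invoked. Greeter and RegisterSketch are hypothetical, and the map is keyed by the argument's class purely to keep the example small; the real executor keys on the operation type carried by the Commit.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RegisterSketch {
  // Hypothetical target with one public single-argument method.
  public static class Greeter {
    public String greet(String name) {
      return "hello " + name;
    }
  }

  private final Map<Class<?>, Function<Object, Object>> operations = new HashMap<>();

  // Wraps a reflective call in a Function, in the spirit of wrapValueMethod.
  static Function<Object, Object> wrap(Object target, Method method) {
    return arg -> {
      try {
        return method.invoke(target, arg);
      } catch (InvocationTargetException e) {
        throw new RuntimeException(e.getCause()); // the wrapped method itself threw
      } catch (IllegalAccessException e) {
        throw new AssertionError(e);
      }
    };
  }

  RegisterSketch register(Class<?> type, Function<Object, Object> callback) {
    operations.put(type, callback);
    return this;
  }

  Object execute(Object argument) {
    Function<Object, Object> function = operations.get(argument.getClass());
    if (function == null) {
      throw new IllegalStateException("nothing registered for " + argument.getClass());
    }
    return function.apply(argument);
  }

  // Registers Greeter.greet for String arguments and invokes it once.
  static Object demo() {
    try {
      Greeter greeter = new Greeter();
      Method greet = Greeter.class.getMethod("greet", String.class);
      return new RegisterSketch().register(String.class, wrap(greeter, greet)).execute("raft");
    } catch (NoSuchMethodException e) {
      throw new AssertionError(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo()); // hello raft
  }
}
```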

 

Back to ServerStateMachine.

Its main job is apply, that is, applying commands to the state machine.

 

It can apply every commit up to a given index:

/**
   * Applies all commits up to the given index.
   * <p>
   * Calls to this method are assumed not to expect a result. This allows some optimizations to be
   * made internally since linearizable events don't have to be waited to complete the command.
   *
   * @param index The index up to which to apply commits.
   */
  public void applyAll(long index) {
    // If the effective commit index is greater than the last index applied to the state machine then apply remaining entries.
    long lastIndex = Math.min(index, log.lastIndex());
    if (lastIndex > lastApplied) {
      for (long i = lastApplied + 1; i <= lastIndex; i++) { // continue from the entry after the last applied index
        Entry entry = log.get(i);
        if (entry != null) {
          apply(entry).whenComplete((result, error) -> entry.release());
        }
        setLastApplied(i);
      }
    }
  }
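The catch-up loop is easy to try out on its own. Below is a simplified sketch that uses an in-memory list as the log and records applied entries in order; ApplyAllSketch and its fields are hypothetical, and a null slot stands in for an entry that log.get(i) no longer returns.

```java
import java.util.ArrayList;
import java.util.List;

public class ApplyAllSketch {
  private final List<String> log = new ArrayList<>();     // index i is stored at position i - 1
  private final List<String> applied = new ArrayList<>(); // entries applied so far, in order
  private long lastApplied = 0;

  void append(String entry) { log.add(entry); }
  long lastIndex() { return log.size(); }
  long lastApplied() { return lastApplied; }
  List<String> applied() { return applied; }

  // Applies every entry after lastApplied, up to the given index or the end of the log.
  void applyAll(long index) {
    long last = Math.min(index, lastIndex());
    for (long i = lastApplied + 1; i <= last; i++) {
      String entry = log.get((int) (i - 1));
      if (entry != null) {
        applied.add(entry);
      }
      lastApplied = i; // advance even when the slot is empty
    }
  }

  public static void main(String[] args) {
    ApplyAllSketch sketch = new ApplyAllSketch();
    sketch.append("a");
    sketch.append(null); // a missing entry
    sketch.append("c");
    sketch.applyAll(2);
    System.out.println(sketch.applied() + " " + sketch.lastApplied()); // [a] 2
    sketch.applyAll(10); // clamped to the last index in the log
    System.out.println(sketch.applied() + " " + sketch.lastApplied()); // [a, c] 3
  }
}
```

Calling applyAll with an index at or below lastApplied is a no-op, which keeps the method safe to call repeatedly.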

It can also apply the single entry at a given index:

public <T> CompletableFuture<T> apply(long index) {
    // If entries remain to be applied prior to this entry then synchronously apply them.
    if (index > lastApplied + 1) {
      applyAll(index - 1);  // entries are applied in order, so earlier ones must be applied first
    }

    // Read the entry from the log. If the entry is non-null then apply the entry, otherwise
    // simply update the last applied index and return a null result.
    try (Entry entry = log.get(index)) {
      if (entry != null) {
        return apply(entry);
      } else {
        return CompletableFuture.completedFuture(null);
      }
    } finally {
      setLastApplied(index);
    }
  }

 

apply(entry)

/**
   * Applies an entry to the state machine.
   * <p>
   * Calls to this method are assumed to expect a result. This means linearizable session events
   * triggered by the application of the given entry will be awaited before completing the returned future.
   *
   * @param entry The entry to apply.
   * @return A completable future to be completed with the result.
   */
  @SuppressWarnings("unchecked")
  public <T> CompletableFuture<T> apply(Entry entry) {
    if (entry instanceof QueryEntry) {
      return (CompletableFuture<T>) apply((QueryEntry) entry);
    } else if (entry instanceof CommandEntry) {
      return (CompletableFuture<T>) apply((CommandEntry) entry);
    } else if (entry instanceof RegisterEntry) {
      return (CompletableFuture<T>) apply((RegisterEntry) entry);
    } else if (entry instanceof KeepAliveEntry) {
      return (CompletableFuture<T>) apply((KeepAliveEntry) entry);
    } else if (entry instanceof UnregisterEntry) {
      return (CompletableFuture<T>) apply((UnregisterEntry) entry);
    } else if (entry instanceof InitializeEntry) {
      return (CompletableFuture<T>) apply((InitializeEntry) entry);
    } else if (entry instanceof ConfigurationEntry) {
      return (CompletableFuture<T>) apply((ConfigurationEntry) entry);
    }
    return Futures.exceptionalFuture(new InternalException("unknown state machine operation"));
  }

 

As shown, each entry type has its own apply logic. For example:

apply((CommandEntry) entry)

private CompletableFuture<Result> apply(CommandEntry entry) {
    final CompletableFuture<Result> future = new CompletableFuture<>();
    final ThreadContext context = ThreadContext.currentContextOrThrow(); // keep a reference to the calling thread's context

    // First check to ensure that the session exists.
    ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());

    // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
    // have a session. We ensure that session register/unregister entries are not compacted from the log
    // until all associated commands have been cleaned.
    if (session == null) { // the session does not exist
      log.release(entry.getIndex());
      return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
    }
    // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
    // session registry until all prior commands have been released by the state machine, but new commands can
    // only be applied for sessions in an active state.
    else if (!session.state().active()) { // the session is not active
      log.release(entry.getIndex());
      return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
    }
    // If the command's sequence number is less than the next session sequence number then that indicates that
    // we've received a command that was previously applied to the state machine. Ensure linearizability by
    // returning the cached response instead of applying it to the user defined state machine.
    else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { // the entry was already applied
      // Ensure the response check is executed in the state machine thread in order to ensure the
      // command was applied, otherwise there will be a race condition and concurrent modification issues.
      long sequence = entry.getSequence();

      // Switch to the state machine thread and get the existing response.
      executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); // return the result cached from the earlier application
      return future;
    }
    // If we've made it this far, the command must have been applied in the proper order as sequenced by the
    // session. This should be the case for most commands applied to the state machine.
    else {
      // Allow the executor to execute any scheduled events.
      long index = entry.getIndex();
      long sequence = entry.getSequence();

      // Calculate the updated timestamp for the command.
      long timestamp = executor.timestamp(entry.getTimestamp());

      // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
      // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
      ServerCommit commit = commits.acquire(entry, session, timestamp); // ServerCommitPool avoids allocating a new ServerCommit each time: take one from the pool and return it after use
      executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

      // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
      // at this index receive the index of the command.
      setLastApplied(index);

      // Update the session timestamp and command sequence number. This is done in the caller's thread since all
      // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
      session.setTimestamp(timestamp).setCommandSequence(sequence);
      return future;
    }
  }
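The sequence-number check is the piece that keeps a retried command from running twice. Here is a minimal single-threaded sketch of that idea, assuming a hypothetical per-session object that caches results by sequence number; it leaves out threading, session state and log cleanup entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SessionSequenceSketch {
  private long commandSequence = 0;                           // highest sequence applied so far
  private final Map<Long, Object> results = new HashMap<>();  // cached results by sequence

  long nextCommandSequence() {
    return commandSequence + 1;
  }

  // Runs the command once per sequence number; replays return the cached result.
  Object apply(long sequence, Supplier<Object> command) {
    if (sequence > 0 && sequence < nextCommandSequence()) {
      return results.get(sequence); // already applied: do not execute again
    }
    Object result = command.get();
    results.put(sequence, result);
    commandSequence = sequence;
    return result;
  }

  public static void main(String[] args) {
    SessionSequenceSketch session = new SessionSequenceSketch();
    AtomicInteger counter = new AtomicInteger();
    System.out.println(session.apply(1, () -> counter.incrementAndGet())); // 1
    System.out.println(session.apply(2, () -> counter.incrementAndGet())); // 2
    System.out.println(session.apply(1, () -> counter.incrementAndGet())); // 1 (cached)
    System.out.println(counter.get());                                     // 2
  }
}
```

The third call returns the cached 1 without touching the counter, so a client retry cannot change the state a second time.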

 

executeCommand
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

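Note that two threads are involved here.

One is context, that is, the

ThreadContext threadContext

which handles server requests.

The other is the stateContext inside executor, which is the one that mutates the state machine.

So executeCommand runs on executor, while the caller's ThreadContext is passed in so the future can be completed back on that thread.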
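The hand-off between the two threads can be reproduced with two plain single-thread executors. This is only a sketch of the pattern using java.util.concurrent, not Copycat's ThreadContext: the thread names "server" and "state" are made up, and the work done on the state thread is a placeholder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandOffSketch {
  // Returns {thread that ran the command, thread that completed the future}.
  static String[] run() {
    ExecutorService server = Executors.newSingleThreadExecutor(r -> new Thread(r, "server"));
    ExecutorService state = Executors.newSingleThreadExecutor(r -> new Thread(r, "state"));
    try {
      CompletableFuture<String[]> future = new CompletableFuture<>();
      // Run the "command" on the state thread...
      state.execute(() -> {
        String ranOn = Thread.currentThread().getName();
        // ...then hop back to the server thread to complete the future.
        server.execute(() ->
            future.complete(new String[] {ranOn, Thread.currentThread().getName()}));
      });
      return future.join();
    } finally {
      state.shutdown();
      server.shutdown();
    }
  }

  public static void main(String[] args) {
    String[] threads = run();
    System.out.println("executed on " + threads[0] + ", completed on " + threads[1]);
    // executed on state, completed on server
  }
}
```

Because each executor has exactly one thread, the state is only ever touched from one thread, and callbacks attached to the future run where the caller expects them.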

/**
   * Executes a state machine command.
   */
  private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {

    // Trigger scheduled callbacks in the state machine.
    executor.tick(index, timestamp);

    // Update the state machine context with the commit index and local server context. The synchronous flag
    // indicates whether the server expects linearizable completion of published events. Events will be published
    // based on the configured consistency level for the context.
    executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);

    // Store the event index to return in the command response.
    long eventIndex = session.getEventIndex();

    try {
      // Execute the state machine operation and get the result.
      Object output = executor.executeOperation(commit);

      // Once the operation has been applied to the state machine, commit events published by the command.
      // The state machine context will build a composite future for events published to all sessions.
      executor.commit();

      // Store the result for linearizability and complete the command.
      Result result = new Result(index, eventIndex, output);
      session.registerResult(sequence, result); // cache the result
      context.executor().execute(() -> future.complete(result)); // complete the future on the caller's context
    } catch (Exception e) {
      // If an exception occurs during execution of the command, store the exception.
      Result result = new Result(index, eventIndex, e);
      session.registerResult(sequence, result);
      context.executor().execute(() -> future.complete(result));
    }
  }

 

ServerStateMachineExecutor.tick
Fires the tasks in scheduledTasks whose scheduled time has been reached, based on the timestamp passed in.
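The source does not show the body of tick, so the following is only a guess at the general shape: tasks carry a due time, and each tick fires those whose time is at or before the timestamp it is given. TickSketch and ScheduledTask are hypothetical names, and repeating tasks are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

public class TickSketch {
  // Hypothetical one-shot task with a due time.
  static class ScheduledTask {
    final long time;
    final Runnable callback;

    ScheduledTask(long time, Runnable callback) {
      this.time = time;
      this.callback = callback;
    }
  }

  private final List<ScheduledTask> scheduledTasks = new ArrayList<>();

  void schedule(long time, Runnable callback) {
    scheduledTasks.add(new ScheduledTask(time, callback));
  }

  // Runs and removes every task due at or before the timestamp; returns how many fired.
  int tick(long timestamp) {
    List<ScheduledTask> due = new ArrayList<>();
    for (ScheduledTask task : scheduledTasks) {
      if (task.time <= timestamp) {
        due.add(task);
      }
    }
    // Remove before running so callbacks may schedule new tasks safely.
    scheduledTasks.removeAll(due);
    for (ScheduledTask task : due) {
      task.callback.run();
    }
    return due.size();
  }

  public static void main(String[] args) {
    TickSketch executor = new TickSketch();
    executor.schedule(100, () -> System.out.println("task at 100"));
    executor.schedule(200, () -> System.out.println("task at 200"));
    System.out.println(executor.tick(150)); // prints "task at 100", then 1
    System.out.println(executor.tick(150)); // 0: nothing new is due
    System.out.println(executor.tick(250)); // prints "task at 200", then 1
  }
}
```

Driving the schedule from a timestamp argument rather than the wall clock means the same sequence of ticks fires the same tasks wherever it is replayed.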
 
ServerStateMachineExecutor.init
Updates the state machine's context:

void init(long index, Instant instant, ServerStateMachineContext.Type type) {
    context.update(index, instant, type);
  }

  //ServerStateMachineContext
  void update(long index, Instant instant, Type type) {
    this.index = index;
    this.type = type;
    clock.set(instant);
  }

 
ServerStateMachineExecutor.executeOperation

<T extends Operation<U>, U> U executeOperation(Commit commit) {

    // Get the function registered for the operation. If no function is registered, attempt to
    // use a global function if available.
    Function function = operations.get(commit.type()); // look up the function registered for this type

    if (function == null) {
      // If no operation function was found for the class, try to find an operation function
      // registered with a parent class.
      for (Map.Entry<Class, Function> entry : operations.entrySet()) {
        if (entry.getKey().isAssignableFrom(commit.type())) { // the registered type is a supertype of commit.type()
          function = entry.getValue();
          break;
        }
      }

      // If a parent operation function was found, store the function for future reference.
      if (function != null) {
        operations.put(commit.type(), function);
      }
    }

    if (function == null) {
      throw new IllegalStateException("unknown state machine operation: " + commit.type());
    } else {
      // Execute the operation. If the operation return value is a Future, await the result,
      // otherwise immediately complete the execution future.
      try {
        return (U) function.apply(commit); // actually invoke the function
      } catch (Exception e) {
        throw new ApplicationException(e, "An application error occurred");
      }
    }
  }
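The lookup with a supertype fallback can be exercised separately from the rest of the executor. In this small sketch, BaseCommand and PutCommand are hypothetical classes, and the operation object itself is passed in place of a Commit to keep things short.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class DispatchSketch {
  // Hypothetical operation hierarchy.
  public static class BaseCommand {}
  public static class PutCommand extends BaseCommand {}

  private final Map<Class<?>, Function<Object, Object>> operations = new LinkedHashMap<>();

  void register(Class<?> type, Function<Object, Object> function) {
    operations.put(type, function);
  }

  boolean hasExactEntry(Class<?> type) {
    return operations.containsKey(type);
  }

  Object execute(Object operation) {
    Class<?> type = operation.getClass();
    Function<Object, Object> function = operations.get(type);
    if (function == null) {
      // No exact match: look for a function registered under a supertype.
      for (Map.Entry<Class<?>, Function<Object, Object>> entry : operations.entrySet()) {
        if (entry.getKey().isAssignableFrom(type)) {
          function = entry.getValue();
          break;
        }
      }
      // Cache the match under the concrete type so the next lookup is direct.
      if (function != null) {
        operations.put(type, function);
      }
    }
    if (function == null) {
      throw new IllegalStateException("unknown state machine operation: " + type);
    }
    return function.apply(operation);
  }

  public static void main(String[] args) {
    DispatchSketch executor = new DispatchSketch();
    executor.register(BaseCommand.class, op -> "handled by base");
    System.out.println(executor.hasExactEntry(PutCommand.class)); // false
    System.out.println(executor.execute(new PutCommand()));       // handled by base
    System.out.println(executor.hasExactEntry(PutCommand.class)); // true
  }
}
```

The cache write happens after the loop finishes, so the map is never modified while it is being iterated.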
Time: 2024-11-05 21:48:16
