Copycat’s primary role is as a framework for building highly consistent, fault-tolerant replicated state machines.
Copycat servers receive state machine operations from clients, log and replicate the operations as necessary, and apply them to a state machine on each server.
State machine operations are guaranteed to be applied in the same order on all servers, and Copycat handles the persistence and replication of the state machine state internally.
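Copycat manages distributed state machines. It must guarantee that all operations are executed in the same order on every server, so that every server ends up with a consistent state machine state.
For fault tolerance, so that a state machine can recover after a crash, each operation is first written to a log, and the logs on all servers are kept consistent. Replaying the log is then enough to reach a consistent state.
This replication technique is called operation transfer.
The alternative is state transfer.
See also the Kudu paper, which describes the same approach: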
Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log.
The physical storage of each replica of a tablet is fully decoupled.
With this approach, the server's state machine (or, in Kudu's terms, the tablet storage) is unaware of distribution; it is fully decoupled.
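The operation-transfer idea above can be sketched in plain Java. This is a minimal illustration, not Copycat code: PutOp, Replica, and OperationTransferDemo are hypothetical names, and the "log" is just an in-memory list. It shows that two replicas that replay the same operations in the same order end up with equal state, even though each keeps its own independent storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these types are part of Copycat.
final class PutOp {
    final String key;
    final String value;

    PutOp(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

final class Replica {
    // Each replica owns its storage; only the operation log is shared.
    final Map<String, String> state = new HashMap<>();

    // Apply every logged operation in log order.
    void replay(List<PutOp> log) {
        for (PutOp op : log) {
            state.put(op.key, op.value);
        }
    }
}

final class OperationTransferDemo {
    static List<PutOp> sampleLog() {
        List<PutOp> log = new ArrayList<>();
        log.add(new PutOp("a", "1"));
        log.add(new PutOp("b", "2"));
        log.add(new PutOp("a", "3")); // a later write to the same key wins
        return log;
    }

    public static void main(String[] args) {
        List<PutOp> log = sampleLog();
        Replica r1 = new Replica();
        Replica r2 = new Replica();
        r1.replay(log);
        r2.replay(log);
        System.out.println(r1.state.equals(r2.state)); // prints true
        System.out.println(r1.state.get("a"));         // prints 3
    }
}
```

With state transfer, by contrast, the replicas would exchange the contents of the map itself rather than the operations that produced it.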
To use Copycat, first create a state machine class. This is the object the user wants to replicate:
public class MapStateMachine extends StateMachine { }
Copycat replicated state machines are modified and queried by defining operations through which a client and state machine can communicate.
Operations are replicated by the Copycat cluster and are translated into arguments to methods on the replicated state machine.
Users must define the interface between the client and the cluster by implementing Operation classes that clients will submit to the replicated state machine.
Next, define the operations on this StateMachine.
Operations fall into two categories:
Command, which can modify the state machine
Query, which is read-only
Command
For example, in a map state machine some commands might include put and remove. To implement a state machine command, simply implement the Command interface.
public class PutCommand implements Command<Object> {
  private final Object key;
  private final Object value;

  public PutCommand(Object key, Object value) {
    this.key = key;
    this.value = value;
  }

  public Object key() {
    return key;
  }

  public Object value() {
    return value;
  }
}
The code above defines a put command, which puts a key:value pair into the state machine.
Query
Queries are state machine operations that read the system’s state but do not modify it. For example, in a map state machine some queries might include get, size, and isEmpty. To implement a state machine query, implement the Query interface.
public class GetQuery implements Query<Object> {
  private final Object key;

  public GetQuery(Object key) {
    this.key = key;
  }

  public Object key() {
    return key;
  }
}
Next, implement these operations on the state machine.
Implementing State Machine Operations
State machine operations are implemented as public methods on the state machine class which accept a single Commit parameter, where the generic argument for the commit is the operation accepted by the method. Copycat automatically detects the command or query that applies to a given state machine method based on the generic argument to the Commit parameter.
public class MapStateMachine extends StateMachine {
  private Map<Object, Object> map = new HashMap<>();

  public Object put(Commit<PutCommand> commit) {
    try {
      // Map.put returns the previous value for the key, or null if there was none.
      return map.put(commit.operation().key(), commit.operation().value());
    } finally {
      // Release the commit once the operation has been applied.
      commit.close();
    }
  }

  public Object get(Commit<GetQuery> commit) {
    try {
      return map.get(commit.operation().key());
    } finally {
      commit.close();
    }
  }
}
A Commit can be thought of as a wrapper around the submitted operation (a command or a query).
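How a framework might pick the right method from the generic argument of the Commit parameter can be sketched with plain Java reflection. This is only an assumption about the general technique, not Copycat's actual implementation; SketchCommit, SketchPut, SketchGet, SketchMapStateMachine, and SketchDispatcher are hypothetical stand-ins invented for this example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; these are NOT Copycat's classes.
final class SketchCommit<T> {
    private final T operation;
    SketchCommit(T operation) { this.operation = operation; }
    T operation() { return operation; }
}

final class SketchPut {
    final String key;
    final String value;
    SketchPut(String key, String value) { this.key = key; this.value = value; }
}

final class SketchGet {
    final String key;
    SketchGet(String key) { this.key = key; }
}

class SketchMapStateMachine {
    private final Map<String, String> map = new HashMap<>();

    public Object put(SketchCommit<SketchPut> commit) {
        return map.put(commit.operation().key, commit.operation().value);
    }

    public Object get(SketchCommit<SketchGet> commit) {
        return map.get(commit.operation().key);
    }
}

final class SketchDispatcher {
    private final Object stateMachine;
    // Maps an operation class to the method whose parameter is SketchCommit<that class>.
    private final Map<Class<?>, Method> handlers = new HashMap<>();

    SketchDispatcher(Object stateMachine) {
        this.stateMachine = stateMachine;
        for (Method m : stateMachine.getClass().getMethods()) {
            Type[] params = m.getGenericParameterTypes();
            if (params.length != 1 || !(params[0] instanceof ParameterizedType)) {
                continue; // not a single generic parameter
            }
            ParameterizedType p = (ParameterizedType) params[0];
            if (p.getRawType() != SketchCommit.class) {
                continue; // parameter is not a SketchCommit<...>
            }
            Type arg = p.getActualTypeArguments()[0];
            if (arg instanceof Class) {
                handlers.put((Class<?>) arg, m);
            }
        }
    }

    // Wrap the operation in a commit and invoke the matching method.
    Object apply(Object operation) {
        Method m = handlers.get(operation.getClass());
        if (m == null) {
            throw new IllegalArgumentException("no handler for " + operation.getClass());
        }
        try {
            return m.invoke(stateMachine, new SketchCommit<>(operation));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling apply(new SketchPut("k", "v")) on a dispatcher built around a SketchMapStateMachine routes to put, and apply(new SketchGet("k")) routes to get, with no explicit registration of either method.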
Implementing the snapshot logic:
State machine operations are replicated and written to a log on disk on each server in the cluster.
As commands are submitted to the cluster over time, the disk capacity will eventually be consumed.
Copycat must periodically remove unneeded commands from the replicated log to conserve disk space. This is known as log compaction.
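As the log keeps growing, old entries must be deleted. To avoid losing data, the current state of the state machine is first stored as a snapshot; the log entries that precede that state can then be deleted.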
public class MapStateMachine extends StateMachine implements Snapshottable {
  private Map<Object, Object> map = new HashMap<>();

  @Override
  public void snapshot(SnapshotWriter writer) {
    writer.writeObject(map);
  }

  @Override
  public void install(SnapshotReader reader) {
    map = reader.readObject();
  }
}
For snapshottable state machines, Copycat will periodically request a binary snapshot of the state machine’s state and write the snapshot to disk. If the server is restarted, the state machine’s state will be recovered from the on-disk snapshot. When a new server joins the cluster, the snapshot of the state machine will be replicated to the joining server to catch up its state. This allows Copycat to remove commits that contributed to the snapshot from the replicated log, thus conserving disk space.
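The relationship between a snapshot and the log entries it makes redundant can be sketched as follows. This is a simplified in-memory model under stated assumptions, not Copycat's storage or compaction code: SnapshottingMap and its methods are hypothetical, and the snapshot is just a copy of the map rather than a binary file on disk.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; not Copycat's log or snapshot implementation.
final class SnapshottingMap {
    private final Map<String, String> state = new HashMap<>();
    // {key, value} put entries that are not yet covered by the snapshot.
    private final List<String[]> log = new ArrayList<>();
    private Map<String, String> snapshot = new HashMap<>();

    void put(String key, String value) {
        log.add(new String[] {key, value}); // log the operation first
        state.put(key, value);              // then apply it
    }

    // Capture the current state, then drop the log entries it already reflects.
    void compact() {
        snapshot = new HashMap<>(state);
        log.clear();
    }

    int logSize() {
        return log.size();
    }

    Map<String, String> state() {
        return state;
    }

    // Rebuild state the way a restarted server would:
    // install the snapshot, then replay the entries logged after it.
    Map<String, String> recover() {
        Map<String, String> recovered = new HashMap<>(snapshot);
        for (String[] entry : log) {
            recovered.put(entry[0], entry[1]);
        }
        return recovered;
    }
}
```

After two puts, a compact(), and one more put, the log holds a single entry, yet recover() still returns the full current state because the snapshot carries everything the discarded entries contributed.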
Finally, create the cluster.
1. First, build a server
Once a state machine and its operations have been defined, we can create a CopycatServer to manage the state machine.
Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);
The server is started with the MapStateMachine defined above.
builder.withTransport(NettyTransport.builder()
  .withThreads(4)
  .build());

builder.withStorage(Storage.builder()
  .withDirectory(new File("logs"))
  .withStorageLevel(StorageLevel.DISK)
  .build());

CopycatServer server = builder.build();
The transport and storage are both customizable.
Register the operations we defined:
One final task is necessary to complete the configuration of the server. We’ve created two state machine operations, PutCommand and GetQuery, which are Serializable. By default, Copycat’s serialization framework will serialize these operations using Java’s serialization. However, users can explicitly register serializable classes and implement custom binary serializers for more efficient serialization.
server.serializer().register(PutCommand.class);
server.serializer().register(GetQuery.class);
The serializer defaults to Java’s serialization; if performance matters, you can implement your own serialization.
2. Bootstrap the cluster
Bootstrapping the Cluster
Once the server has been built, we can bootstrap a new cluster by calling the bootstrap() method:
CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();
When a server is bootstrapped, it forms a new single node cluster to which additional servers can be joined.
3. Join an existing cluster
Joining an Existing Cluster
Once an initial cluster has been bootstrapped, additional servers can be added to the cluster via the join() method. When joining an existing cluster, the existing cluster configuration must be provided to the join method:
Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();