CQRS框架:AxonFramework 之 Hello World

Command Query Responsibility Segregation,CQRS 这个架构好象最近博客园里讨论得比较多,有几篇园友的文章很有深度,推荐阅读:

CQRS架构简介 

浅谈命令查询职责分离(CQRS)模式

DDD CQRS架构和传统架构的优缺点比较

比较有趣的是,以往一断谈及架构思路、OO这些,往往都是java大佬们的专长,而CQRS这个话题,好象.NET占了上风。园友汤雪华ENODE开源大作,在github上人气也很旺。

于是,我逆向思路搜索了下java的类似项目,果然有一个AxonFramework,甚至还有一个专门的网站。按文档上的介绍,弄了一个hello world,记录一下:

CRQS是基于事件驱动的,其主要架构并不复杂,见下图:

简单来讲,对数据库的修改操作,UI层只管发送各种命令(Command),触发事件(Event),然后由EventHandler去异步处理,最终写入master DB,对于数据库的查询,则查询slave DB(注:这里的master db, slave db只是一个逻辑上的区分,可以是真正的主-从库,也可以都是一个库)。 这样的架构,很容易实现读写分离,也易于大型项目的扩展。

项目结构:

package的名称上大概就能看出用途:

command包定义各种命令,

event包定义各种事件,

handler包定义事件处理逻辑,

model包相当于领域模型

最外层的ToDOItemRunner相当于应用程序入口。

gradle依赖项:

group 'yjmyzz'
version '1.0'

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = 1.8

repositories {
    mavenLocal()
    maven {
        url 'http://maven.oschina.net/content/groups/public/'
    }
    mavenCentral()
}

dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile "org.axonframework:axon:2.4.3"
    compile "org.axonframework:axon-core:2.4.3"
    compile "org.axonframework:axon-test:2.4.3"
    compile 'org.springframework:spring-core:4.2.3.RELEASE'
    compile 'org.springframework:spring-beans:4.2.3.RELEASE'
    compile 'org.springframework:spring-context:4.2.3.RELEASE'
    compile 'org.springframework:spring-context-support:4.2.3.RELEASE'
    compile 'org.springframework:spring-aop:4.2.3.RELEASE'
    compile 'org.springframework:spring-test:4.2.3.RELEASE'
    compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.5'
    compile 'org.apache.logging.log4j:log4j-core:2.5'
    compile 'javax.persistence:persistence-api:2.1'
}

mainClassName='demo.axon.ToDoItemRunner'

command命令:

这里我们假设了二个命令:创建命令、完成命令

CreateToDoItemCommand

package demo.axon.command;

import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;

public class CreateToDoItemCommand {

    @TargetAggregateIdentifier
    private final String todoId;
    private final String description;

    public CreateToDoItemCommand(String todoId, String description) {
        this.todoId = todoId;
        this.description = description;
    }

    public String getTodoId() {
        return todoId;
    }

    public String getDescription() {
        return description;
    }
}

MarkCompletedCommand

package demo.axon.command;

import org.axonframework.commandhandling.annotation.TargetAggregateIdentifier;

public class MarkCompletedCommand {

    @TargetAggregateIdentifier
    private final String todoId;

    public MarkCompletedCommand(String todoId) {
        this.todoId = todoId;
    }

    public String getTodoId() {
        return todoId;
    }
}

Event事件:

ToDoItemCreatedEvent

package demo.axon.event;

public class ToDoItemCreatedEvent {

    private final String todoId;
    private final String description;

    public ToDoItemCreatedEvent(String todoId, String description) {
        this.todoId = todoId;
        this.description = description;
    }

    public String getTodoId() {
        return todoId;
    }

    public String getDescription() {
        return description;
    }
}
ToDoItemCompletedEvent
package demo.axon.event;

public class ToDoItemCompletedEvent {

    private final String todoId;

    public ToDoItemCompletedEvent(String todoId) {
        this.todoId = todoId;
    }

    public String getTodoId() {
        return todoId;
    }
}

EventHandler事件处理

package demo.axon.handler;

import demo.axon.event.ToDoItemCompletedEvent;
import demo.axon.event.ToDoItemCreatedEvent;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ToDoEventHandler {

    Logger logger = LoggerFactory.getLogger(ToDoEventHandler.class);

    @EventHandler
    public void handle(ToDoItemCreatedEvent event) {
        logger.info("We've got something to do: " + event.getDescription() + " (" + event.getTodoId() + ")");
    }

    @EventHandler
    public void handle(ToDoItemCompletedEvent event) {
        logger.info("We've completed a task: " + event.getTodoId());
    }
}

上面的代码只是演示,将事件信息输出而已,真实应用中,这里可以完成对db的更新操作。 

领域模型model

package demo.axon.model;

import demo.axon.command.CreateToDoItemCommand;
import demo.axon.command.MarkCompletedCommand;
import demo.axon.event.ToDoItemCompletedEvent;
import demo.axon.event.ToDoItemCreatedEvent;
import org.axonframework.commandhandling.annotation.CommandHandler;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventsourcing.annotation.AbstractAnnotatedAggregateRoot;
import org.axonframework.eventsourcing.annotation.AggregateIdentifier;

public class ToDoItem extends AbstractAnnotatedAggregateRoot {

    @AggregateIdentifier
    private String id;

    public ToDoItem() {
    }

    @CommandHandler
    public ToDoItem(CreateToDoItemCommand command) {
        apply(new ToDoItemCreatedEvent(command.getTodoId(), command.getDescription()));
    }

    @EventHandler
    public void on(ToDoItemCreatedEvent event) {
        this.id = event.getTodoId();
    }

    @CommandHandler
    public void markCompleted(MarkCompletedCommand command) {
        apply(new ToDoItemCompletedEvent(id));
    }
}

然后让Spring将这些东西串在一起,配置文件如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xmlns:axon="http://www.axonframework.org/schema/core"
 5        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 6                            http://www.axonframework.org/schema/core http://www.axonframework.org/schema/axon-core-2.0.xsd">
 7
 8     <axon:command-bus id="commandBus"/>
 9     <axon:event-bus id="eventBus"/>
10
11     <axon:event-sourcing-repository id="toDoRepository"
12                                     aggregate-type="demo.axon.model.ToDoItem"/>
13
14     <axon:aggregate-command-handler id="toDoItemHandler"
15                                     aggregate-type="demo.axon.model.ToDoItem"
16                                     repository="toDoRepository"
17                                     command-bus="commandBus"/>
18
19     <axon:filesystem-event-store id="eventStore" base-dir="events"/>
20
21     <axon:annotation-config />
22
23     <bean class="demo.axon.handler.ToDoEventHandler"/>
24
25     <bean class="org.axonframework.commandhandling.gateway.CommandGatewayFactoryBean">
26         <property name="commandBus" ref="commandBus"/>
27     </bean>
28
29 </beans>

View Code

最后,提供一个舞台,让整个应用run起来:

package demo.axon;

import demo.axon.command.CreateToDoItemCommand;
import demo.axon.command.MarkCompletedCommand;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.UUID;

public class ToDoItemRunner {

    private CommandGateway commandGateway;

    public ToDoItemRunner(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("sampleContext.xml");
        ToDoItemRunner runner = new ToDoItemRunner(applicationContext.getBean(CommandGateway.class));
        runner.run();
    }

    private void run() {
        final String itemId = UUID.randomUUID().toString();
        commandGateway.send(new CreateToDoItemCommand(itemId, "Need to do this"));
        commandGateway.send(new MarkCompletedCommand(itemId));
    }
}

输出结果:

12:01:36,113 <demo.axon.handler.ToDoEventHandler>  INFO [main]: We've got something to do: Need to do this (3126f293-67fd-4bb7-b152-069acba775b6)
12:01:36,114 <org.axonframework.commandhandling.callbacks.LoggingCallback>  INFO [main]: Command executed successfully: demo.axon.command.CreateToDoItemCommand
12:01:36,205 <demo.axon.handler.ToDoEventHandler>  INFO [main]: We've completed a task: 3126f293-67fd-4bb7-b152-069acba775b6
12:01:36,206 <org.axonframework.commandhandling.callbacks.LoggingCallback>  INFO [main]: Command executed successfully: demo.axon.command.MarkCompletedCommand

axon框架测试也很容易:

package test.demo.axon;

import demo.axon.command.CreateToDoItemCommand;
import demo.axon.command.MarkCompletedCommand;
import demo.axon.event.ToDoItemCompletedEvent;
import demo.axon.event.ToDoItemCreatedEvent;
import demo.axon.model.ToDoItem;
import org.axonframework.test.FixtureConfiguration;
import org.axonframework.test.Fixtures;
import org.junit.Before;
import org.junit.Test;

public class ToDoItemTest {

    private FixtureConfiguration fixture;

    @Before
    public void setUp() throws Exception {
        fixture = Fixtures.newGivenWhenThenFixture(ToDoItem.class);
    }

    @Test
    public void testCreateToDoItem() throws Exception {
        fixture.given()
                .when(new CreateToDoItemCommand("todo1", "need to implement the aggregate"))
                .expectEvents(new ToDoItemCreatedEvent("todo1", "need to implement the aggregate"));
    }

    @Test
    public void testMarkToDoItemAsCompleted() throws Exception {
        fixture.given(new ToDoItemCreatedEvent("todo1", "need to implement the aggregate"))
                .when(new MarkCompletedCommand("todo1"))
                .expectEvents(new ToDoItemCompletedEvent("todo1"));
    }

}

given/when/expectEvents的意思是,给(given)一个事件,然后当(when)某个命令被调用时,期待(expectEvents)某个事件被触发。

最后 github上还有一个比较复杂的示例项目:https://github.com/AxonFramework/Axon-trader,想深入了解的可以研究下

时间: 2024-09-11 11:35:12

CQRS框架:AxonFramework 之 Hello World的相关文章

DDD CQRS架构和传统架构的优缺点比较

明天就是大年三十了,今天在家有空,想集中整理一下CQRS架构的特点以及相比传统架构的优缺点分析.先提前祝大家猴年新春快乐.万事如意.身体健康! 最近几年,在DDD的领域,我们经常会看到CQRS架构的概念.我个人也写了一个ENode框架,专门用来实现这个架构.CQRS架构本身的思想其实非常简单,就是读写分离.是一个很好理解的思想.就像我们用MySQL数据库的主备,数据写到主,然后查询从备来查,主备数据的同步由MySQL数据库自己负责,这是一种数据库层面的读写分离.关于CQRS架构的介绍其实已经非常

CQRS架构PPT分享

好久没有写文章了,最近工作比较忙.下周要到公司另一个部门做CQRS的分享,所以用一周时间整理了一个PPT.为了方便大家查看,我想直接贴到博客里最简单直接. CQRS是一个不错的架构,但是要真正实践,还是很难的.我虽然学习了很多的理论,框架也实践了不少.但要真正应用到实际项目中,还是不那么容易的.到目前为止我个人也只在一个项目中实践过,但当初实践的时候也没有采用本PPT所提到的最终一致性的技术.不过我想,有兴趣就要坚持,坚持就是胜利.最近我在做另一个CQRS的案例,就是微软的那个CQRS Conf

谈一下关于CQRS架构如何实现高性能

CQRS架构简介 前不久,看到博客园一位园友写了一篇文章,其中的观点是,要想高性能,需要尽量:避开网络开销(IO),避开海量数据,避开资源争夺.对于这3点,我觉得很有道理.所以也想谈一下,CQRS架构下是如何实现高性能的. 关于CQRS(Command Query Responsibility Segration)架构,大家应该不会陌生了.简单的说,就是一个系统,从架构上把它拆分为两部分:命令处理(写请求)+查询处理(读请求).然后读写两边可以用不同的架构实现,以实现CQ两端(即Command

enode框架入门:消息队列的设计思路

上一篇文章,简单介绍了enode框架内部的整体实现思路,用到了staged event-driven architecture的思 想.通过前一篇文章,我们知道了enode内部有两种队列:command queue.event queue:用户发送的command 会进入command queue排队,domain model产生的domain event会进入event queue,然后等待被dispatch到所 有的event handlers.本文介绍一下enode框架中这两种消息队列到底

enode框架入门:框架的总体目标

本文想介绍一下enode框架要实现的目标以及部分实现分析思路剖析.总体来说enode框架是一个基于cqrs 架构和消息驱动的应用开发框架.在说实现思路之前,我们先看一下enode框架希望实现的一些目标吧! 框架总体目标 高吞吐量(High Throughput).低延迟(Low Latency).高可用性(High Availability): 需要能充分利用CPU,即要允许方便配置需要使用的并行处理线程数,以提高单台机器的command处理能力 : 支持command的同步和异步处理,同步处理

enode框架入门:saga的思想与实现

因为enode框架的思想是,一次修改只能新建或修改一个聚合根:那么,如果一个用户请求要涉及多个聚合 根的新建或修改该怎么办呢?本文的目的就是要分析清楚这个问题在enode框架下是如何解决的.如果想直接 通过看代码的朋友,可以直接下载源代码,源码中共有三个例子,BankTransferSagaSample这个例子就是本文 所用的例子. saga的由来 saga这个术语,可能很多人都还很陌生.saga的提出,最早是为了解 决可能会长时间运行的分布式事务(long-running process)的问

enode框架入门:开篇

前言 今天是个开心的日子,又是周末,可以安心轻松的写写文章了.经过了大概3年的DDD理论积累 ,以及去年年初的第一个版本的event sourcing框架的开发以及项目实践经验,再通过今年上半年利用业余时 间的设计与开发,我的enode框架终于可以和大家见面了. 自从Eric Evan提出DDD领域驱动设计以来已 经过了很多年了,现在已经有很多人在学习或实践DDD.但是我发现目前能够支持DDD开发的框架还不多,至少 在国内还不多.据我所知道的java和.net平台,国外比较有名的有:基于java

ENode 1.0 - 框架的总体目标

开源地址:https://github.com/tangxuehua/enode 本文想介绍一下enode框架要实现的目标以及部分实现分析思路剖析.总体来说enode框架是一个基于cqrs架构和消息驱动的应用开发框架.在说实现思路之前,我们先看一下enode框架希望实现的一些目标吧! 框架总体目标 高吞吐量(High Throughput).低延迟(Low Latency).高可用性(High Availability): 需要能充分利用CPU,即要允许方便配置需要使用的并行处理线程数,以提高单

基于事件驱动的DDD领域驱动设计框架分享(附源代码)

补充:现在再回过头来看这篇文章,感觉当初自己偏激了,呵呵.不过没有以前的我,怎么会有现在的我和现在的enode框架呢?发现自己进步了真好! 从去年10月份开始,学了几个月的领域驱动设计(Domain Driven Design,简称DDD).主要是学习领域驱动设计之父Eric Evans的名著:<Domain-driven design:领域驱动设计:软件核心复杂性应对之道>,以及另外一本Martin Flower的<企业应用架构模式>,学习到了不少关于如何组织业务逻辑方面的知识.