ENode 2.0 - 第一个真实案例剖析-一个简易论坛(Forum)

前言

经过不断的坚持和努力,ENode 2.0的第一个真实案例终于出来了。这个案例是一个简易的论坛,开发这个论坛的初衷是为了验证用ENode框架来开发一个真实项目的可行性。目前这个论坛在UI上是使用了最终一致性,也就是说当我们发帖或回帖后不会立马显示你的帖子或回复。当我们下一次刷新页面时,会显示出来。这点貌似很多人向我反馈不太习惯,接受不了,呵呵。我之所以这样做也是想看看最终一致性大家的接受程度如何,看来UI层面上的最终一致性,大部分人接受不了。回头我改进下效果,改为立即可以看到帖子或回复吧!另外,关于ENode是什么,本文就不多介绍了,可以参考这篇文章的介绍。本文重点介绍一下ENode是如何帮助我们开发一个基于DDD+CQRS+Event Sourcing架构的应用程序的。这个论坛使用到了ENode, EQueue两个框架,EQueue是一个分布式的消息队列组件,该组件的主体思想是参考阿里的RocketMQ。当我们使用EQueue时,面向的不是Queue,而是Topic。EQueue也是完全用C#实现的,关于EQueue详细介绍,大家可以看一下这篇文章

ENode, EQueue, Forum 开源项目地址

  1. ENode开源地址:https://github.com/tangxuehua/enode
  2. EQueue开源地址:https://github.com/tangxuehua/equeue
  3. ECommon开源地址:https://github.com/tangxuehua/ecommon
  4. Forum开源地址:https://github.com/tangxuehua/forum
  5. Forum论坛线上地址(临时域名,以后会改为enode.me):http://www.enode.me

另外,项目中如果要开发引用程序集,可以通过Nuget来获取,输入关键字ENode就能看到所有相关的Package了,如下图所示:

Forum总体架构分析

Forum采用DDD+CQRS+Event Sourcing的架构。借助于ENode,使得Forum本身无须再做技术架构方面的设计了,直接使用ENode就能完成这种架构。所以我们只要明白了ENode的架构,就知道这个Forum的架构是怎样的了。以下是ENode的架构图(已经理解了这个图的朋友请直接跳过这一节):

上图是一个CQRS架构的数据流向图。UI请求会分为两类:Command和Query。

Command用于写数据,Query用于读数据,写数据和读数据完全采用不同的架构实现。写数据支持同步和异步的方式,读数据完全走简单高效思路来实现。当我们要对系统做写操作时,如果你是用ASP.NET MVC来开发站点,那就可以在Controller中创建并发送一个Command即可。该Command会被发送到消息队列(EQueue中),然后消息队列的订阅方,也就是处理Command的进程会拉取这些Command,然后调用Command Handler完成Command的处理;Command Handler处理Command时,是调用Domain的方法来完成相关的业务逻辑操作。Domain就是DDD中的领域层,负责实现整个系统的业务逻辑。 然后由于是Event Sourcing的架构,所以Domain中任何聚合根的修改都会产生相应的领域事件(Domain Event),领域事件会先被持久化到EventStore中,持久化如果没有遇到并发冲突,成功后,则会被发布(Publish)到消息队列(EQueue中),然后消息队列的订阅方,也就是处理Domain Event的进程会拉取这些Domain Event,然后调用相关的Event Handler做相关的更新,比如有些Event Handler是会更新读库(Read DB),有些是会产生新的Command,这种我把它叫做流程管理器(Process Manager,也有人叫做Saga)。当我们有时一个业务场景需要涉及到多个聚合根的修改时,我们会需要用到Process Manager。Process Manager负责对流程进行建模,它的原理是基于事件驱动的流程实现。Process Manager处理事件,然后产生响应的Command,从而完成聚合根之间的交互。一般一个流程,我们会设计一个流程聚合根以及其他的参与该流程的聚合根,Process Manager则是用于负责协调这些聚合根之间的交互。具体的例子可以看一下ENode源代码中的BankTransferSample。

关于Query端,由于都是查询,这些查询都是用于UI展示数据或者为第三方接口提供数据为目的,查询对系统无副作用。我们可以用我们自己任意喜欢的方式来实现Query端。查询面向的是Read DB。上面提到,Read DB中的数据是通过Event Handler(老外叫Denormalizer)来更新的。

所以我们可以看到,整个架构中,Command端和Query端的数据源是完全分离的。Command端最后的结果就是Domain Event,Domain Event是持久化在Event Store中的;Query端的数据源就是Read DB,一般可以用关系型数据库来作为存储。CQ两端的数据同步通过Domain Event来实现。

上图的CQRS架构最大的好处是在架构级别以及数据存储级别,把读写都分离了。这样我们可以方便的对读或写单独做优化。另外由于使用了Event Sourcing的架构,使得我们的Command端只要持久化了Domain Event,就意味着保存了这个Domain的所有状态。这个特性,可以让我们的框架有很多设计余地,比如不必考虑Domain Event和业务数据要强一致等问题,因为Domain Event本身就是业务数据本身了,我们通过Domain Event随时可以还原出任意时刻的Domain的状态。当我们要查询Domain的当前最新数据时,就走Query端即可。当然,由于Query端是异步更新的,所以Query端的数据可能会有一点点延迟。这点也就是我们平时一直讲到的最终一致性(CQ两端的数据最终会一致)。

通过上面的架构图,我们知道,一个Command发出后会经过两个阶段的处理:1)先被某个Command Service处理(调用Domain完成业务逻辑产生Domain Event);2)再被Event Service处理(响应Domain Event,完成Read DB的更新或者产生新的Command);理解这两个阶段对理解下面的Forum的项目结构很有用处。

Forum项目结构分析

 

以上是Forum的项目工程结构,项目中包含四个宿主工程,分别是:

Forum.BrokerService:

这个工程用于宿主EQueue的Broker,整个论坛中所有的Command,Domain Event的消息,都会被放在Broker上。比如Controller发送的Command会被发送到Broker,同样Domain产生的Domain Event也会被发送到Broker;然后消费者消费消息则都是从BrokerService拉取消息。由于该宿主工程不需要和用户交互,所以我部署为Windows Service。

Forum.CommandService:

这个工程就是用于处理Command的进程,同样也部署为Windows Service。

Forum.EventService:

这个工程就是用于处理Domain Event的进程,同样也部署为Windows Service。

Forum.Web:

这个就是论坛的Web站点了,不用多讲了;这个Web站点做的事情就是发送Command或者调用Query端的查询服务查询数据;Web站点只需要依赖于Forum.Commands和Forum.QueryServices即可,因为它只需要发送Command和查询数据即可。

Forum.CommandHandlers:

所有的Command Handler都在这个工程,Command Handler的职责是处理Command,调用Domain的方法完成业务逻辑;

Forum.Commands:

所有的Command都在这个工程中,每个Command都是一个DTO,会被封装为消息发送到EQueue。

Forum.Domain:

就是论坛的领域层了,所有的聚合以、工厂、领域服务,以及领域事件等都在这个工程中。这个工程是整个Forum最有价值的地方,是业务逻辑所在的工程。

Forum.Domain.Dapper:

由于Domain中可能会定义一些接口,这些接口背后的持久化需要在外部实现;如果按照经典DDD的架构,比如仓储接口是在Domain层定义,而实现则是在基础层(Infrastructure)中。而从经典DDD的分层架构图上来看,Domain层是依赖于Infrastructure层的,但是Infrastructure层中又有一些仓储的实现类要依赖于Domain层;虽然我能理解这种双向依赖,但很容易会给不少学DDD的人带来困惑,所以我更加倾向于,把Domain看做是架构的核心,其他一切都是Domain的外围。这个思想其实和六边形架构是一个思路。就是从架构上来看,不是上层依赖于下层,而是外层依赖于内层;内层通过定义出接口,外层实现接口,内层只要面向自己定义的接口即可。所以基于这个思路,我会把Forum.Domain中定义的接口,如果用Dapper来实现,那我就定义一个Forum.Domain.Dapper这样的工程,意思是实现Forum.Domain.Dapper依赖于内层的Forum.Domain。假如以后我们有一个基于EntityFramework的实现,则只要再创建一个Forum.Domain.EntityFramework这样的工程即可。所以可以看出,Forum.Domain.Dapper这种工程实际上是Forum.Domain对外部的适配器,Forum.Domain里定义好适配接口,Forum.Domain.Dapper这种工程实现这些适配接口。基于这种思想,我们的架构就没有了上层依赖下层的概念了,而是替换为内外层的关系,内层不依赖外层,外层依赖于内层,内层与外层直接通过适配器接口来交互,或者通过Domain Event也可以。这样我们就不用再去纠结经典DDD中看似双向依赖的问题了。

Forum.Domain.Tests:

这个工程就是对Forum.Domain的一个测试工程。每个测试用例会模拟Controller发起Command,然后最后检查Domain中的状态是否正确修改。

Forum.QueryServices:

这个工程定义了Query端的所有查询接口,Forum.Web站点依赖于这个工程中的查询服务接口;然后这些查询接口的实现则是放在Forum.QueryServices.Dapper中。Forum.QueryServices与Forum.QueryServices.Dapper之间的关系和Forum.Domain与Forum.Domain.Dapper之间的关系类似,这里就不在重复了。

Forum.Denormalizers.Dapper:

这个工程中的就是所有的Denormalizer,Denormalizer就是负责处理Domain Event,然后更新读库。然后由于目前使用Dapper实现数据持久化,所以工程名以Dapper结尾。

Forum.Infrastructure:

这是一个基础工程,存放所有基础的公共的东西,比如一些业务无关的服务或配置信息或全局变量等东西;需要强调的是:这里的Forum.Infrastructure和经典DDD中的Infrastructure不是同一个概念。DDD中的Infrastructure是一个逻辑上的分层,领域层中所有的技术支撑实现都在Infrastructure中;而这里的Infrastructure,则仅仅只是一些Common的基础的公用的东西,Infrastructure不是为了为其它哪一层服务的,它可以被其他任何项目使用;

好了,以上简单介绍了每个工程的作用和设计目的。下面我们来看看Forum的领域模型的设计吧!

Forum的Domain Model的设计

核心功能需求分析

1. 提供用户注册、登录、注销三个功能;注册用户时需要验证用户名是否唯一;

2. 提供发帖、回帖、修改帖子、修改回复,以及回复的回复这些基本核心功能;

3. 系统管理员可以对论坛版块进行维护;

聚合识别

识别出来的聚合有:论坛账号、帖子、回复、版块这四个。

账号的最少信息应该有:账号名称+密码;

版块要有名称即可;

帖子要有标题、内容、发帖人、发帖时间、所属版块;

回复要有回复内容、回复时间、回复人、所属版块,父回复(可以为空);

场景走查

注册就是创建账号(账号唯一性的设计后面在详细分析);

登录本质就是调用Query端的查询服务查找账号是否存在,所以不需要Domain做什么处理,注销也是;

发帖就是创建帖子;

回帖就是创建回复;

修改帖子就是对帖子聚合根做修改;

修改回复就是对回复聚合根做修改;

版块添加就是创建一个版块聚合根;

关键业务规则识别

1)账号名称不能重复;2)帖子必须要有所属版块和发帖人;3)回复必须要有一个对应的帖子和回复人;

下面我们分析一下每个业务规则的实现。

  1. 如何实现账号名称不能重复?首先它是一条业务规则,所以必须在Domain里实现,而不应该在Command Handler里。然后由于Event Sourcing的架构,天生有一个缺陷就是无法实现唯一性约束这种需求。所以我们需要在Domain中显式设计出可以表达聚合根索引的东西,我把它们叫做IndexStore,表示是一种聚合根索引的存储。这个思路非常类似于在经典DDD中,我们有仓储(Repository)的概念,仓储维护了所有的聚合根;而我这里的IndexStore则是维护了聚合根的索引信息。有了这个索引信息后,我们就能在注册新账号时,在Domain中设计一个RegisterAccountService这样的领域服务,领域服务里通过AccountIndexStore来检查账号名称是否重复,如果不重复,则将当前账号名称添加到AccountIndexStore中,如果重复,则报异常。另外一个非业务的点需要考虑,那就是如何实现并发注册用户的处理。我们可以在Command Handler中实现db级别的锁(但不不需要锁整个账号表,而是锁一个其他表中的某一条记录),确保同一时刻,不会有两个Account名称添加到AccountIndexStore中;我们通过RegisterAccountService把“账号名称不能重复”的这个业务规则显式的表达出来,从而在代码级别体现领域内实现了这个业务规则。以前,如果没有用Event Sourcing,我们可能会依赖db的唯一索引来实现这个唯一性,虽然功能上也可以实现,但实际上账号名称不能重复的这个业务规则没有体现在领域内。这点也是我这次通过实现基于Event Sourcing而实现的唯一性验证而想到的点。
  2. 帖子必须要有所属版块和发帖人,这条业务规则很容易保证,只要在帖子聚合根上,对版块和发帖子判断是否为空就行了;
  3. 回复必须要有一个对应的帖子和回复人,也是同理,只要在构造函数中判断是否为空即可;

以注册新用户为例,展示代码实现

客户端JS通过angularJS提交注册信息

    $scope.submit = function () {
        if (isStringEmpty($scope.newAccount.accountName)) {
            $scope.errorMsg = '请输入账号。';
            return false;
        }
        if (isStringEmpty($scope.newAccount.password)) {
            $scope.errorMsg = '请输入密码。';
            return false;
        }
        if (isStringEmpty($scope.newAccount.confirmPassword)) {
            $scope.errorMsg = '请输入密码确认。';
            return false;
        }
        if ($scope.newAccount.password != $scope.newAccount.confirmPassword) {
            $scope.errorMsg = '密码输入不一致。';
            return false;
        }

        $http({
            method: 'POST',
            url: '/account/register',
            data: $scope.newAccount
        })
        .success(function (result, status, headers, config) {
            if (result.success) {
                $window.location.href = '/home/index';
            } else {
                $scope.errorMsg = result.errorMsg;
            }
        })
        .error(function (result, status, headers, config) {
            $scope.errorMsg = result.errorMsg;
        });
    };

Controller处理请求

[HttpPost]
[AjaxValidateAntiForgeryToken]
[AsyncTimeout(5000)]
public async Task<ActionResult> Register(RegisterModel model, CancellationToken token)
{
    var command = new RegisterNewAccountCommand(model.AccountName, model.Password);
    var result = await _commandService.Execute(command, CommandReturnType.EventHandled);

    if (result.Status == CommandStatus.Failed)
    {
        if (result.ExceptionTypeName == typeof(DuplicateAccountException).Name)
        {
            return Json(new { success = false, errorMsg = "该账号已被注册,请用其他账号注册。" });
        }
        return Json(new { success = false, errorMsg = result.ErrorMessage });
    }

    _authenticationService.SignIn(result.AggregateRootId, model.AccountName, false);
    return Json(new { success = true });
}

CommandHandler处理Command

[Component(LifeStyle.Singleton)]
public class AccountCommandHandler : ICommandHandler<RegisterNewAccountCommand>
{
    private readonly ILockService _lockService;
    private readonly RegisterAccountService _registerAccountService;

    public AccountCommandHandler(ILockService lockService, RegisterAccountService registerAccountService)
    {
        _lockService = lockService;
        _registerAccountService = registerAccountService;
    }

    public void Handle(ICommandContext context, RegisterNewAccountCommand command)
    {
        _lockService.ExecuteInLock(typeof(Account).Name, () =>
        {
            context.Add(_registerAccountService.RegisterNewAccount(command.Id, command.Name, command.Password));
        });
    }
}

RegisterAccountService领域服务

    /// <summary>提供账号注册的领域服务,封装账号注册的业务规则,比如账号唯一性检查
    /// </summary>
    [Component(LifeStyle.Singleton)]
    public class RegisterAccountService
    {
        private readonly IIdentityGenerator _identityGenerator;
        private readonly IAccountIndexStore _accountIndexStore;
        private readonly AggregateRootFactory _factory;

        public RegisterAccountService(IIdentityGenerator identityGenerator, AggregateRootFactory factory, IAccountIndexStore accountIndexStore)
        {
            _identityGenerator = identityGenerator;
            _factory = factory;
            _accountIndexStore = accountIndexStore;
        }

        /// <summary>注册新账号
        /// </summary>
        /// <param name="accountIndexId"></param>
        /// <param name="accountName"></param>
        /// <param name="accountPassword"></param>
        /// <returns></returns>
        public Account RegisterNewAccount(string accountIndexId, string accountName, string accountPassword)
        {
            //首先创建一个新账号
            var account = _factory.CreateAccount(accountName, accountPassword);

            //先判断该账号是否存在
            var accountIndex = _accountIndexStore.FindByAccountName(account.Name);
            if (accountIndex == null)
            {
                //如果不存在,则添加到账号索引
                _accountIndexStore.Add(new AccountIndex(accountIndexId, account.Id, account.Name));
            }
            else if (accountIndex.IndexId != accountIndexId)
            {
                //如果存在但和当前的索引ID不同,则认为是账号有重复
                throw new DuplicateAccountException(accountName);
            }

            return account;
        }
    }

EventHandler处理Domain Event

[Component(LifeStyle.Singleton)]
public class AccountEventHandler : BaseEventHandler, IEventHandler<NewAccountRegisteredEvent>
{
    public void Handle(IEventContext context, NewAccountRegisteredEvent evnt)
    {
        using (var connection = GetConnection())
        {
            connection.Insert(
                new
                {
                    Id = evnt.AggregateRootId,
                    Name = evnt.Name,
                    Password = evnt.Password,
                    CreatedOn = evnt.Timestamp,
                    UpdatedOn = evnt.Timestamp,
                    Version = evnt.Version
                }, Constants.AccountTable);
        }
    }
}

结束语

好了,大概就这些吧。好久没写文章了,都不知道该怎么写了,呵呵。接下来准备再好好分享下ENode,EQueue最近几个月在不断完善中我遇到的一些技术问题。对了,大家可以去体验下这个论坛的功能哦,虽然很简单,但基本的功能还是有的。http://www.enode.me

下一篇文章准备仔细介绍下ENode中各个环节的幂等的处理,比如Command的幂等处理,Domain Event持久化的幂等处理,Event Handler的幂等处理,Saga Process Manager的幂等处理。

时间: 2024-10-26 09:46:21

ENode 2.0 - 第一个真实案例剖析-一个简易论坛(Forum)的相关文章

电商网站产品优化:打造爆款的真实案例

文章描述:打造爆款的真实案例. 2010年11月底,接到一个门户网站老总的小舅子的电话,让我帮他把1万多件长袖T恤库存在处理掉,"大冬天让我帮你卖T恤,这不是傻B吗",但不敢得罪他,只能硬着头皮告诉他试试看吧.可这家伙又问我能不能打照爆款,气的我差点晕过去,"这个小子一定被傻子吻过".但去淘宝浏览一下T恤的数据与他店铺的T恤数据后,发现这些T恤还真有打照爆款的可能,对方提出的条件是,这款长袖T恤进价是18元,只要18元能处理了就可以,于是决定拟定一个计划试一下. 计

ENode 2.0 - 整体架构介绍

前言 今天是个开心的日子,又是周末,可以轻轻松松的写写文章了.去年,我写了ENode 1.0版本,那时我也写了一个分析系列.经过了大半年的时间,我对第一个版本做了很多架构上的改进,最重要的就是让ENode实现了分布式,通过新增一个分布式消息队列EQueue来实现.之所以要设计一个分布式的消息队列是因为在enode 1.0版本中,某个特定的消息队列只能被某个特定的消费者消费.这样就会导致一个问题,就是如果这个消费者挂了,那这个消费者对应的消息队列就不能自动被其他消费者消费了.这个问题会直接导致系统

【数据泵】EXPDP导出表结构(真实案例)

[数据泵]EXPDP导出表结构(真实案例) BLOG文档结构图         因工作需要现需要把一个生产库下的元数据(表定义,索引定义,函数定义,包定义,存储过程)导出到测试库上,本来以为很简单的,可是做的过程发现很多的问题,现记录如下,希望有同样需要的朋友不要再走弯路了.     一.1  导读 各位技术爱好者,看完本文后,你可以掌握如下的技能,也可以学到一些其它你所不知道的知识,~O(∩_∩)O~: ① EXPDP和IMPDP如何导出导入元数据,包括表定义,索引定义,函数定义,包定义,存储

梭子鱼WEB应用访问安全案例剖析:没有安全何来应用!

日前在北京召开的2009年系统架构师大会,无疑是今年最为重头的技术研讨会之一.不仅参会人员大多是公司IT运维的精英,演讲人也是各大公司高级系统架构设计者. 不过谈架构必然要谈及 整体架构的安全策略实施,如何才能确保网络运维在一个安全高效的环境中,也是 各位系统管理员所要面对的问题.梭子鱼网络有限公司华北区销售总监郑爽对此也有自己的看法.498)this.width=498;' onmousewheel = 'javascript:return big(this)' height=235 alt=

Windows Azure客户真实案例:Associated Press (美联社)

世界范围的新闻提供商通过托管服务创建了新的内容通道  Associated Press (AP,美联社)是世界最大的新闻机构,在全球97个国家设有办事处. 平均而言,每天全世界一半新闻都来自AP. AP 想要通过鼓励开发者在他们的应用程序中融入AP的内容,来拓展它的覆盖面.通过使用  Microsoft托管的 Windows AzureTM 平台 , AP创建了高度可拓展的解决方案,它简化了方案,创造了新的商业机会. 业务需求: Associated Press (AP) 每周7天,每天24小时

孤星:吐血分享网站降权的原因真实案例分析

众所周知,网站优化运营过程中有很多影响排名的关键因素,其中包括频繁修改网站标题,外部链接大起大落,常见的黑冒手法等等,这些关键指标如果侥幸运用可能会逃过搜索引擎一时的注意,但是这样的做法八九不离十会引起网站的降权,笔者今天就以惨痛的教训和大家分享服务器因素导致网站降权的真实案例.好了咱们闲话短续进入今天主题,服务器因素导致网站降权的原因分析. 第一,首先简要介绍下笔者的网站,笔者的网站主要从事信息安全方面计算机软硬件产品的研发和销售的一个企业站,主做关键词是内网安全(http://www.xki

Centos vpn服务器间ip隧道跳转多ip路由走向分流的真实案例

本文系统Centos6.0:这里的vpn服务以pptpd为例:其中底层涉及到pptpd+freeradius+mysql认证: 相关url:http://lansgg.blog.51cto.com/5675165/1225461 本文系真实案例:leader需求大 多是这样的,节约成本还要达到所需要的效果:没办法,总的做个效果出来: 需求,国内一台vpn机器 (server1),多ip,如:10.10.10.1-5(5个ip):国外一台vpn机器(server2),多ip,如:20.20.20.

【impdp】IMPDP中的TRANSFORM参数--【数据泵】EXPDP导出表结构(真实案例)后传

        在上一篇文章[数据泵]EXPDP导出表结构(真实案例) 中:http://blog.itpub.net/26736162/viewspace-1657828/  ,由于表的storage参数存储很大,导致不能导入到测试库,我提出了2种办法,但是今天经过网友selectshen (http://blog.itpub.net/28539951/)的提醒说是有一个参数TRANSFORM可以解决这个问题,于是就研究了一下这个参数,发现大牛的一篇文章,http://blog.itpub.n

vpn之ip隧道多对一模式实现跳转功能的真实案例

本文又是一真实案例:对于真实ip只能显示一部分,望见谅 - -~ 话说leader的需求永远是无止境的:他想要的,你就需要去想办法实现 前几天刚做了,国内机器vpn跳转到国外机器的路由分流:今天突然心血来潮,改变方案:如果说上次是节省成本,那么这次的方案就是更加节省成本:赶快埋头去做吧:测试吧 先看下拓扑如下: 很明了,国内的三台跳转到海外的一个机器上:海外机器上的三个ip还要可以实现直接连接:大概节省了多少成本呢? 这里先来分配下ip吧:这里以pptp服务为例:首先实现每台机器都可以单独进行正