EventBus In eShop -- 解析微软微服务架构Demo(四)

引言

大家好像对分析源码厌倦了,说实在我也会厌倦,不过不看是无法分析其后面的东西,从易到难是一个必要的过程。

今天说下EventBus,前几天园里的大神已经把其解刨,我今天就借着大神的肩膀,分析下在eShop项目中EventBus的实现。

最近发觉转发文章不写出处的,特此加上链接:http://inday.cnblogs.com

解析源码

我们知道使用EventBus是为了解除Publisher和Subscriber之间的依赖性,这样我们的Publisher就不需要知道有多少Subscribers,只需要通过EventBus进行注册管理就好了,在eShop项目中,有一个这样的接口IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)

public interface IEventBus
{
    void Subscribe<T, TH>(Func<TH> handler)
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;
    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;

    void Publish(IntegrationEvent @event);
}

我们可以看到这个接口定义了EventBus所需的一些操作, 对比大神的EventBus,相关功能都是一致的,我们看下它的实现类:EventBusRabbitMQ,从名字上可以看出,这是一个通过RabbitMQ来进行管理的EventBus,我们可以看到它使用了IEventBusSubscriptionsManager进行订阅存储,也就是大神文中的:

private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;

微软在Demo中把其提取出了接口,把一些常用方法给提炼了出来,但是核心还是Dictionary<string, List<Delegate>>, 使用Dictionary进行Map映射。通过Subscribe和UnSubscribe进行订阅和取消,使用Publish方法进行发布操作。

public void Subscribe<T, TH>(Func<TH> handler)
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = typeof(T).Name;
    var containsKey = _subsManager.HasSubscriptionsForEvent<T>();
    if (!containsKey)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueBind(queue: _queueName,
                                exchange: BROKER_NAME,
                                routingKey: eventName);
        }
    }

    _subsManager.AddSubscription<T, TH>(handler);

}

我们看到在订阅的时候,EventBus会检查下在Map中是否有相应的注册,如果没有的话首先回去RabbitMQ中创建一个新的channel进行绑定,随后在Map中进行注册映射。

UnSubscribe则直接从Map中取消映射,通过OnEventRemoved事件判断Map下此映射的subscriber是否为空,为空则从RabbitMQ中关闭channel。

在RabbitMQ的构造方法中,我们看到这样一个创建:CreateConsumerChannel(),这里创建了一个EventingBasicConsumer,当Queue中有新的消息时会通过ProcessEvent执行Map中注册的handler(subscribers),看图可能更清晰些:

 

在ProcessEvent方法中,回去Map中找寻subscribers,然后通过动态反射进行执行:

private async Task ProcessEvent(string eventName, string message)
{

    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        var eventType = _subsManager.GetEventTypeByName(eventName);
        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
        var handlers = _subsManager.GetHandlersForEvent(eventName);

        foreach (var handlerfactory in handlers)
        {
            var handler = handlerfactory.DynamicInvoke();
            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
        }
    }
}

微软通过简单的代码解耦了Publisher和Subscribers之间的依赖关系,我们引用大神的总结:

应用

在catalog.api中,微软出现了EventBus,我在上一篇中也提到了,这是我的一个疑惑,因为在catalog中并没有订阅操作,直接执行了Publish操作,原先以为是一个空操作,后来看了Basket.Api我才知道为何微软要用RabbitMQ。

使用RabbitMQ,我们不仅是从类之间的解耦,更可以跨项目,跨语言,跨平台的解耦,publisher仅仅需要把消息体(IntegrationEvent)传送到RabbitMQ,Consumer从Queue中获取消息体,然后推送到Subscribers执行相应的操作。我们看下Basket.Api.Startup.cs:

protected virtual void ConfigureEventBus(IApplicationBuilder app)
{
    var catalogPriceHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>();

    var orderStartedHandler = app.ApplicationServices
        .GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>();

    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();

    eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>());

    eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler>
        (() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>());
}

在这个方法里,我们看到了Subscribe操作,想想之前的提问有点搞笑,不过研究明白了也不错,对吧!

总结

今天我们看了EventBus在Demo中的应用,总结一下。

1、EventBus可以很好的解耦订阅者和发布者之间的依赖

2、使用RabbitMQ能够跨项目、跨平台、跨语言的解耦订阅者和发布者

虽然在Demo中我们看到对订阅者的管理是通过Dictionary内存的方式,所以我们的Subscribe仅仅只在Basket.Api中看到,但微软是通过IEventBusSubscriptionsManager接口定义的,我们可以通过自己的需求来进行定制,可以做成分布式的,比如使用memcached。

写在最后

每个月到下旬就会比较忙,所以文章发布会比较慢,但我也会坚持学习完eShop的,为了学习,我建了个群,大家可以进来一起学习,有什么建议和问题都可以进来哦。

eShop虽好,但不建议大家放到生产环境,毕竟是一个Demo,而且目前还是ALPHA版本,用来学习是一个很好的教材,这就是一个大杂烩,学习中你会学到很多新的东西,大家如果看好core的发展,可以一起研究下。

QQ群:376248054

时间: 2024-09-22 21:45:44

EventBus In eShop -- 解析微软微服务架构Demo(四)的相关文章

Health Check in eShop -- 解析微软微服务架构Demo(五)

引言 What is the Health Check     Health Check(健康状态检查)不仅是对自己应用程序内部检测各个项目之间的健康状态(各项目的运行情况.项目之间的连接情况等),还包括了应用程序对外部或者第三方依赖库的状态检测. Why use Health Check     现在我们的项目越来越多的从单体多层架构转换成多项目多层架构即现在流行的微服务架构.     原来我们的App把各个模块分层分项目处理,比如Users项目仅仅处理User的一些业务需求,但在整个项目使用

开篇有益-解析微软微服务架构eShopOnContainers(一)

为了推广.Net Core,微软为我们提供了一个开源Demo-eShopOnContainers,这是一个使用Net Core框架开发的,跨平台(几乎涵盖了所有平台,windows.mac.linux.android.ios)的,基于微服务架构的,运行在容器中的小型应用,其不仅展示了.Net Core的跨平台性,更展示了VS2017的强大,所有代码都在VS2017下开发.从名字上可以看出,这是一个运行在容器上的电子店铺应用,利用Docker的跨平台性,使我们可以"build once, run

Identity Service - 解析微软微服务架构eShopOnContainers(二)

接上一篇,众所周知一个网站的用户登录是非常重要,一站式的登录(SSO)也成了大家讨论的热点.微软在这个Demo中,把登录单独拉了出来,形成了一个Service,用户的注册.登录.找回密码等都在其中进行. 这套service是基于IdentityServer4开发的, 它是一套基于 .Net Core的OAuth2和OpenID框架,这套框架目前已经很完善了,我们可以把它使用到任何项目中. 我们先看下目录结构: 从目录结构可以看出它是一套MVC架构的网站,我们可以单独进行运行和调试,当然,我们也可

Catalog Service - 解析微软微服务架构eShopOnContainers(三)

上一篇我们说了Identity Service,因为其基于IdentityServer4开发的,所以知识点不是很多,今天我们来看下Catalog Service,今后的讲解都会把不同的.重点的拿出来讲,希望大家明白. 源码分析 我们先看下它的目录结构,很标准的webapi目录: 首先看下Program,跟IdentityService类似,多了一个UseWebRoot("Pics"),把pics这个目录设置成了webroot,其他都一样. 在Startup的构造方法中,我们也看到了使用

基于Nginx搭建一个安全的、快速的微服务架构

本文讲的是基于Nginx搭建一个安全的.快速的微服务架构[编者的话]本文改编自Chris Stetson发表在nginx.conf 2016上的一个有关如今的微服务以及如何使用Nginx构建一个快速的.安全的网络系统的演讲,大家可以在YourTube上回看此次演讲. 0:00 - 自我介绍 Chris Stetson:Hi,我的名字是Chris Stetson,我在Nginx带领专业服务部门,同时也领导微服务实践. 今天我们要谈论微服务以及如何使用Nginx构建一个快速的.安全的网络系统.在我们

《Spring Cloud与Docker微服务架构实战》配套代码

不才写了本使用Spring Cloud玩转微服务架构的书,书名是<Spring Cloud与Docker微服务架构实战> - 周立,已于2017-01-12交稿.不少朋友想先看看源码,现将代码放出. 本次放出的代码: 共计70+个DEMO 覆盖Eureka.Ribbon.Feign.Hystrix.Zuul.Spring Cloud Config.Spring Cloud Bus.Spring Cloud Sleuth.Docker.Docker Compose等. 1-11章代码地址: ht

微服务架构的理论基础 - 康威定律

概述 关于微服务的介绍,可以参考微服务那点事. 微服务是最近非常火热的新概念,大家都在追,也都觉得很对,但是似乎没有很充足的理论基础说明这是正确的,给人的感觉是 不明觉厉 .前段时间看了Mike Amundsen <远距离条件下的康威定律--分布式世界中实现团队构建>(是Design RESTful API的作者)在InfoQ上的一个分享,觉得很有帮助,结合自己的一些思考,整理了该演讲的内容. 可能出乎很多人意料之外的一个事实是,微服务很多核心理念其实在半个世纪前的一篇文章中就被阐述过了,而且

基于微服务架构,实解容器级DevOps平台的建设

导读:本文以"实践过程中问题与思考"为主体,与大家分享其中的过程和经验,希望大家在后续的工作中能够避免相关问题,形成更佳实践. 首先简单说下我们要做什么,不谈理念,不谈哲学,我们要做一款基于微服务架构,可以同时运行在公有云和私有云上的容器云平台,以DevOps为目标,提升协作效率,快速交付. 为什么选择阿里云 现在的公有云如雨后春笋,国外如AWS.Azure.Bluemix,国内如阿里云.腾讯云.DaoCloud.goodrain等,都可以给大家提供丰富的云基础设施和上层服务,那为什么

如何构建微服务架构

本文讲的是如何构建微服务架构[编者的话]"微服务"的概念兴起于四五年前,近几年尤其火热,各大厂都在进行微服务化改造和微服务建设.最近一年来我们也参与了微服务化的改造大军,这里写下一些做微服务系统设计和开发时的切身感受. [3 天烧脑式基于Docker的CI/CD实战训练营 | 北京站]本次培训围绕基于Docker的CI/CD实战展开,具体内容包括:持续集成与持续交付(CI/CD)概览:持续集成系统介绍:客户端与服务端的 CI/CD 实践:开发流程中引入 CI.CD:Gitlab 和 C