一起谈.NET技术,NET下RabbitMQ实践 [WCF发布篇]

  在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写。今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布。
  注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵。首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:   
  Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples
  当然官方还提供了基本.NET 2.0 版本的示例版本,但其中只是一些简单的示例,并不包括WCF部分,这里只发个链接,感兴趣的朋友可自行研究。   
  Binary, compiled for .NET 2.0 (zip) - includes example code      
  下载基于.NET 3.0的版本源码之后,解压其中的projects\examples\wcf目录,可看到如下的项目:      几个文件夹分别对应如下应用场景:
  OneWay: 单向通信(无返回值)
  TwoWay: 双向通信(请求/响应)
  Session:会话方式
  Duplex: 双向通信(可以指定一个Callback回调函数)
  OneWay  
  在OneWayTest示例中,演示了插入日志数据,因为日志操作一般只是单纯的写入操作,不考虑返回值,所以使用OneWay方式。下面是其WCF接口声明和实例代码,如下:      

    [ServiceContract]
    public interface ILogServiceContract
    {
        [OperationContract(IsOneWay=true)]
        void Log(LogData entry);
    }
   
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    public class LogService : ILogServiceContract
    {
        public int m_i;
        public void Log(LogData entry)
        {
            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] {3} [{0,-6}] {1, 12}: {2}", entry.Level, entry.TimeStamp, entry.Message, m_i++);
        }
    }

  其只包含一个方法:Log(LogData entry) ---用于添加日志记录,可以看出它与我们以往写WCF代码没什么两样。不过这里要说明一下,在类属性InstanceContextMode枚举类型中,使用了“Single”模式,而该枚举提供了如下三种情况:       
  Single - 为所有客户端调用分配一个服务实例。
  PerCall – 为每个客户端调用分配一个服务实例。
  PerSession – 为每个客户端会话分配一个服务实例。每个Session内多线程操作实例的话会有并发问题。       
  InstanceContextMode 的默认设置为 PerSession
  这三个值通常是要与并发模式(ConcurrencyMode)搭配使用,以解决并发效率,共享资源等复杂场景下的问题的。下面是并发模式的说明:
  ConcurrencyMode 控制一次允许多少个线程进入服务。ConcurrencyMode 可以设置为以下值之一:   
  Single - 一次可以有一个线程进入服务。
  Reentrant - 一次可以有一个线程进入服务,但允许回调。
  Multiple - 一次可以有多个线程进入服务。       
  ConcurrencyMode 的默认设置为 Single。   
  InstanceContextMode 和 ConcurrencyMode 设置会相互影响,因此为了提升并发效能,必须协调这两项设置。   
  例如,将 InstanceContextMode 设置为 PerCall 时,会忽略 ConcurrencyMode 设置。这是因为,每个客户端调用都将路由到新的服务实例,因此一次只会有一个线程在服务实例中运行。对于PerCall的实例模型,每个客户端请求都会与服务端的一个独立的服务实例进行交互,就不会出现多个客户端请求争用一个服务实例的情况,也就不会出现并发冲突,不会影响吞吐量的问题。但对于实例内部的共享变量(static)还是会可能出现冲突。
  但对于当前Single设置,原因很多,可能包括:
  1. 创建服务实例需要大量的处理工作。当多个客户端访问服务时,仅允许创建一个服务实例可以降低所需处理量。
  2. 可以降低垃圾回收成本,因为不必为每个调用创建和销毁服务创建的对象。
  3. 可以在多个客户端之间共享服务实例。
  4. 避免对static静态属性的访问冲突。   
  但如果使用Single,问题也就出来了---就是性能,因为如果 ConcurrencyMode也同时设置成Single时,当前示例中的(唯一)服务实例不会同时处理多个(单线程客户端)请求。因为服务在处理请求时会对当前服务加锁,如果再有其它请求需要该服务处理的时候,需要排队等候。如果有大量客户端访问,这可能会导致较大的瓶颈。
  当然如果考虑到多线程客户端使用的情况,可能问题会更严重。 
  聊了这些,无非就是要结合具体应用场景来灵活搭配ConcurrencyMode,InstanceContextMode这两个枚举值。下面言归正传,来看一下如何将该服务与RabbitMQ进行绑定,以实现以WCF方式访问RabbitMQ服务的效果。这里暂且略过LogData数据结构信息类,直接看一下如果绑定服务代码(位于OneWayTest.cs):   

private ServiceHost m_host;
    
public void StartService(Binding binding)
{

    m_host = new ServiceHost(typeof(LogService), new Uri("soap.amqp:///"));
    ((RabbitMQBinding)binding).OneWayOnly = true;    
    m_host.AddServiceEndpoint(typeof(ILogServiceContract), binding, "LogService");
    m_host.Open();
    m_serviceStarted = true;  
}

  StartService方法的主体与我们平时启动WCF服务的方法差不多,只不过是将其中的URL协议部分换成了“soap.amqp”形式,而其中的传入参数binding则是RabbitMQBinding类型,该类型是rabbitmq客户端类库提供的用于对应Binding类的RabbitMQBinding实现。下面就是其类实始化代码:   

     return new RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings["manual-test-broker-uri"],RabbitMQ.Client.Protocols.FromConfiguration("manual-test-broker-protocol"));

    其包括两个参数,一个是rabbitmq服务地址,一个是所用的协议,其对应示例app.config文件中的如下结点:    

<add key="manual-test-broker-uri" value="amqp://10.0.4.79:5672/"/> <!--本系列第一篇中的环境设置-->
<add key="manual-test-broker-protocol" value="AMQP_0_8"/>

  这样,我们就完成了初始化服务实例工作。接着来构造客户端代码,如下:   

private ChannelFactory<ILogServiceContract> m_factory;

private ILogServiceContract m_client;                          
    
public ILogServiceContract GetClient(Binding binding)
{
    ((RabbitMQBinding)binding).OneWayOnly = true;
    m_factory = new ChannelFactory<ILogServiceContract>(binding, "soap.amqp:///LogService");
    m_factory.Open();
    return m_factory.CreateChannel();
}

  与平时写的代码相似,但传入参数就是上面提到的那个RabbitMQBinding实例,这样通过下面代码访问WCF中的LOG方法:

    m_client = GetClient(Program.GetBinding());
    m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));
    m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));
    ....

  到这里,我们可以看出,它的实现还是很简单的。我们只要把10.0.4.79:5672上的rabbitmq的环境跑起来,就可以看出最终的效果了。 之后我将C#的服务端(startservice)与客户端(getclient)分开布署到不同IP的主机上,也实现了示例中的结果。   
  TwoWay   
  下面介绍一下 TwoWay双向通信示例,首先是WCF接口声明和实现:     

    [ServiceContract]
    public interface ICalculator
    {
        [OperationContract]
        int Add(int x, int y);
        
        [OperationContract]
        int Subtract(int x, int y);
    }
    
   [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*为每个客户端调用分配一个服务实例*/
    public sealed class Calculator : ICalculator
    {
        public int Add(int x, int y)
        {
            return x + y;
        }

        public int Subtract(int x, int y)
        {
            return x - y;
        }
    }

  因为其服务的启动startservice和客户端实例构造与oneway方法类似,为了节约篇幅,这时就略过了,下面是其最终调用代码(位于TwoWayTest.cs):

  public void Run()
 {
     StartService(Program.GetBinding());

     ICalculator calc = GetClient(Program.GetBinding());

     int result = 0, x = 3, y = 4;
     Util.WriteLine(ConsoleColor.Magenta, "  {0} + {1} = {2}", x, y, result = calc.Add(x, y));
     if (result != x + y)
         throw new Exception("Test Failed");

    ......
 }   

  与普通的WCF TWOWAY 返回调用方式相同,就不多说了。   
  Session   
  下面是基于Session会话方式的代码,WCF接口声明和实现: 

    [ServiceContract(SessionMode= SessionMode.Required)]
    public interface ICart
    {
        [OperationContract]
        void Add(CartItem item);

        [OperationContract]
        double GetTotal();
        
        Guid Id { [OperationContract]get; }
    }
    
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class Cart : ICart
    {
        public Cart()
        {
            Items = new List<CartItem>();
            m_id = Guid.NewGuid();
        }
        
        private Guid m_id;
        private List<CartItem> m_items;

        private List<CartItem> Items {
            get { return m_items; }
            set { m_items = value; }
        }

        public void Add(CartItem item)
        {
            Items.Add(item);
        }
        
        public double GetTotal()
        {
            double total = 0;
            foreach (CartItem i in Items)
                total += i.Price;

            return total;
        }

        public Guid Id { get { return m_id; } }
    }

  该接口实现一个购物车功能,可以添加商品并计算总价,考虑到并发场景,这里将其实例为PerSession枚举类型,即为每个客户端会话分配一个服务实例。这样就可以在用户点击购买一件商品里,为其购物车商品列表List<CartItem>添加一条信息,而不会与其它用户的购物车商品列表相冲突。其最终的调用方法如下:   

public void Run()
{
    StartService(Program.GetBinding());

    ICart cart = GetClient(Program.GetBinding());

    AddToCart(cart, "Beans", 0.49);//添加商品到购物车
    AddToCart(cart, "Bread", 0.89);
    AddToCart(cart, "Toaster", 4.99);

    double total = cart.GetTotal();//计算总价
    if (total != (0.49 + 0.89 + 4.99))
        throw new Exception("Incorrect Total");
    ......
}

  Duplex   
  最后,再介绍一下如何基于Duplex双向通信模式进行开发,DuplexTest这是个“PIZZA订单”的场景,用户下单之后,等待服务端将PIZZA加工完毕,然后服务端用callback方法通知客户端PIZZA已做好,相应WCF接口声明和实现如下: 

   [ServiceContract(CallbackContract=typeof(IPizzaCallback))] /*绑定回调接口*/
    public interface IPizzaService
    {
        [OperationContract(IsOneWay=true)]
        void PlaceOrder(Order order);
    }

    [ServiceContract]
    public interface IPizzaCallback
    {
        [OperationContract(IsOneWay=true)]
        void OrderReady(Guid id); /*用于通知客户端*/
    }    
    
    public class PizzaService : IPizzaService
    {
        public void PlaceOrder(Order order)
        {
            foreach (Pizza p in order.Items)
            {
                Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);
            }
            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Order {0} is Ready!", order.Id);

            Callback.OrderReady(order.Id);
        }

        IPizzaCallback Callback
        {
            get { return OperationContext.Current.GetCallbackChannel<IPizzaCallback>(); } //当前上下文中调用客户端绑定的回调方法
        }
    }

  这里要说明的是IPizzaCallback接口的OrderReady方法被绑定了IsOneWay=true属性,主要是因为如果使用“请求-响应”模式,客户端必须等服务端“响应”完成上一次“请求”后才能发出下一步“请求”。因此虽然客户端可以使用多线程方式来调用服务,但最后的执行结果仍然表现出顺序处理(效率低)。要想使服务端能够并行处理客户端请求的话,那我们就不能使用“请求-响应”的调用模式,所以这里使用One-Way的方式来调用服务。
  下面是客户端回调接口实现:      

    public class PizzaClient : DuplexClientBase<IPizzaService>, IPizzaService
    {
        public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)
            : base(context, binding, remoteAddress) { }

        public void PlaceOrder(Order order)
        {
            Channel.PlaceOrder(order);
        }
    }

  最终客户端实例化(startservice)略过,因与之前示例类似。   

    public IPizzaService GetClient(Binding binding)
    {
        PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));
        client.Open();
        return client;
    }

  上面的方法中将当前客户端实例this(实现了IServiceTest<IPizzaService>, IPizzaCallback接口)注册到上下文中,目的是为了将其方法的回传调用传递到服务端(还记得服务端的这行代码吗?=>Callback.OrderReady(order.Id))

public void OrderReady(Guid id)
{
    Util.WriteLine(ConsoleColor.Magenta, "  [CLI] Order {0} has been delivered.",id);
    mre.Set();
}

  这样,服务端完成pizza时,就可以调用客户端的OrderReady方法来实现通知功能了。下面就是一个整个的下单流程实现:

public void Run()
{
       ......
       StartService(Program.GetBinding());

       IPizzaService client = GetClient(Program.GetBinding());
       Order lunch = new Order();
       lunch.Items = new List<Pizza>();
       lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));
       client.PlaceOrder(lunch);
       ......
}

  好了,今天的主要内容就先到这里了,在接下来的文章中,将会介绍一个rabbitmq的实际应用场景,也是我们Discuz!NT企业版中的一个功能:记录系统运行的错误日志。     

时间: 2024-10-23 08:05:45

一起谈.NET技术,NET下RabbitMQ实践 [WCF发布篇]的相关文章

NET下RabbitMQ实践 [WCF发布篇]

在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写.今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布. 注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵.首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:    Binary, compiled for .NET 3.0 and newer (zi

一起谈.NET技术,NET下RabbitMQ实践 [配置篇]

     这个系列目前计划写四篇,分别是配置,示例,WCF发布,实战.当然不排除加餐情况.      介绍:      rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器.      RabbitMQ的官方站:http://www.rabbitmq.com/          AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路

NET 下RabbitMQ实践 [实战篇]

之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布.今天就介绍一下我们产品中如何使用RabbitMQ的! 在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减.     在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==>RabbitMQ客户端===>RabbitMQ服务端.好了,闲话少说,开始正文!     首先是程序入

NET下RabbitMQ实践 [配置篇]

这个系列目前计划写四篇,分别是配置,示例,WCF发布,实战.当然不排除加餐情况. 介绍: rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. RabbitMQ的官方站:http://www.rabbitmq.com/          AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),AM

NET下RabbitMQ实践 [示例篇]

在上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容.今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置.         首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html.下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并

中间件技术及双十一实践·稳定性平台篇

稳定性平台--系统稳定运行的保障者 综述 大多数互联网公司都会根据业务对自身系统做一些拆分,大变小,1变n,系统的复杂度也n倍上升.当面对几十甚至几百个应用的时候,再熟悉系统的架构师也显得无能为力.稳定性平台从2011年就开始了依赖治理方面的探索,目前实现了应用级别和接口级别的依赖自动化治理.在2013的双11稳定性准备中,为共享交易链路的依赖验证和天猫破坏性测试都提供了支持,大幅度减小了依赖治理的成本和时间.另一方面,线上容量规划的一面是稳定性,另一面是成本.在稳定性和成本上找到一个最佳的交汇

中间件技术及双十一实践·软负载篇

软负载--分布式系统的引路人 综述 软负载是分布式系统中极为普遍的技术之一.在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,以达到对等服务.而软负载就像一个引路人,帮助服务的消费者在这些对等的服务中合理地选择一个来执行相关的业务逻辑. 1.1.ConfigServer ConfigServer主要提供非持久配置的发布和订阅.07/08年间在淘宝内部开发使用的时候,由于ZooKeeper还没有开源,不然可能会基于ZooKeeper来进行改造.主要使用场景是为分布式

中间件技术及双十一实践·服务框架篇

分布式服务框架--分布式服务的组织者 综述 06/07年以后,随着淘宝用户数量和网站流量的增长,应用系统的数量和复杂程度也急剧增加.诸多前台系统都需要使用一些公共的业务逻辑,这些业务逻辑通常具有共性的东西,比如,获取用户信息或查询宝贝详情等.如果将这些业务逻辑在各个系统内部都实现一遍,则大大增加了开发成本和后期维护成本.于是,像服务框架这类的中间件产品就应运而生.服务框架帮助各个系统将那些相似的业务逻辑抽离出来,单独部署,而前台系统在需要调用这些业务逻辑时,只需要通过服务框架远程调用即可,大大节

一起谈.NET技术,NHibernate3.0剖析:Query篇之NHibernate.Linq标准查询

系列引入 NHibernate3.0剖析系列分别从Configuration篇.Mapping篇.Query篇.Session策略篇.应用篇等方面全面揭示NHibernate3.0新特性和应用及其各种应用程序的集成,基于NHibernte3.0版本.如果你还不熟悉NHibernate,可以快速阅读NHibernate之旅系列文章导航系列入门,如果你已经在用NHibernate了,那么请跟上NHibernate3.0剖析系列吧. NHibernate专题:http://kb.cnblogs.com