C#实现异步消息队列

原文:C#实现异步消息队列

拿到新书《.net框架设计》,到手之后迅速读了好多,虽然这本书不像很多教程一样从头到尾系统的讲明一些知识,但是从项目实战角度告诉我们如何使用我们的知识,从这本书中提炼了一篇,正好符合我前几篇的“数据驱动框架”设计的问题;

消息队列

消息队列英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件贮列用来处理一系列的输入,通常是来自使用者。消息队列提供了异步通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

简单的说队列就是贮存了我们需要处理的Command但是并不是及时的拿到其处理结果;

实现

实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。

目前,有很多消息队列有很多开源的实现,包括JBoss MessagingJORAMApache ActiveMQSun Open Message QueueApache Qpid和HTTPSQS。

优点,缺点

消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。

信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。

读取队列消息

主要有两种(1)服务端的推;(2)客户端的拉;

拉:主要是客户端定时轮询拿走消息处理;

推:通过事件订阅方式主动通知订阅者进行处理;

消息的贮存

简单的是通过内存链表实现贮存;也可以借助DB,比如Redis;还可以持久到本地文件中;

如何保证异步处理的一致性

尽管队列主要目的是实现消息贮存,同时将调用与实现异步化。但是如果想达到处理消息一致性,好的方式是区别业务处理顺序,比如操作主从DB,主负责写,从负责读,我们没有机会在写之后立马从读数据库拿到你想要的结果;同时我们需要借助中间状态,当多个中间状态同时符合调用结果才到到业务时间被处理,否则将“异常消息”持久化,待下次操作;

上代码

建立消息对立核心队列

{
    public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message);

    public class MessageQueue:Queue<BaseMessage>
    {
        public static MessageQueue GlobalQueue = new MessageQueue();

        private Timer timer = new Timer();
        public MessageQueue() {
            this.timer.Interval = 5000;
            this.timer.Elapsed += Notify;
            this.timer.Enabled = true;
        }
        private void Notify(object sender, ElapsedEventArgs e) {
            lock (this) {
                if (this.Count > 0) {
                    //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue());
                    var message = this.Dequeue();
                    this.messageNotifyEvent(message);
                }
            }
        }

        private MessageQueueEventNotifyHandler messageNotifyEvent;
        public event MessageQueueEventNotifyHandler MessageNotifyEvent {
            add {
                this.messageNotifyEvent += value;
            }

            remove {
                if (this.messageNotifyEvent != null) {
                    this.messageNotifyEvent -= value;
                }
            }
        }
    }
}

事件处理

public const string OrderCodePrefix = "P";
        public void Submit(Message.BaseMessage message)
        {
            Order order = message.Body as Order;

            if (order.OrderCode.StartsWith(OrderCodePrefix))
            {
                System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode);
            }
            else {
                System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode);
            }
        }

可依据具体业务进行个性化处理;

通过Proxy向队列追加消息

public class OrderServiceProxy:IOrderService
    {
        public void Submit(Message.BaseMessage message)
        {
            MessageQueue.MessageQueue.GlobalQueue.Enqueue(message);
        }
    }

客户端调用

OrderService orderService = new OrderService();
            MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit;

            var orders = new List<Order>() {
                new Order(){OrderCode="P001"},
                new Order(){OrderCode="P002"},
                new Order(){OrderCode="B003"}
            };

            OrderServiceProxy proxy = new OrderServiceProxy();
            orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order}));

            Console.ReadLine();

这样就满足了事件的绑定与触发个性化处理,同时达到了消息异步化的目的,希望更细致的拓展用到后期的项目中。

时间: 2024-09-14 19:17:00

C#实现异步消息队列的相关文章

.NET C# 异步读取Windows消息队列无法实现

问题描述 场景:由于项目需求,在WCF与WindowsService之间需要用到Windows消息队列这个东西WindowsService可以正常写入队列消息,但是,在WCF服务中要去异步读取消息队列中的数据,却不能正常访问得到数据并处理.WindowsService在创建队列实例时已将将权限给到了最高级别Everyone了错误提示:消息队列系统的访问被拒绝.求各位大神,帮帮忙吧,很着急-- 解决方案 解决方案二:遇到同样的问题.求大神解答..

远程消息队列 异步回调

问题描述 我现在要实现的一个功能就是远程服务器里面有个计算函数这个函数里面有个for循环计算,要把每次的计算结果都实时的传递给客户端,现在要满足同时几百个人来同时调用这个函数,我用消息队列来实现,我的思路是这样的,不同的客户调用和服务器连接起来后,在客户端通过指定服务器路径来生成(接受)消息队列对象,在服务器端我直接通过客户的ip传递给服务器端,然后服务器端创建一个和客户端一样的名称的消息队列,从而满足客户端和服务器端同时功用一个消息队列,应该就不会出现上面的问题吧.问题一在服务器端为什么不能够

消息队列入门(二)消息队列的规范和开源实现

1.AMQP规范 AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口. 目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,然后填上内容.路由KEY,最后发

消息队列和管道的区别(转载)

转载自:http://bbs.chinaunix.net/viewthread.php?tid=265266 作者:beginner-bj 请问管道和消息队列有什么不同  管道通信(PIPE) 管道通信方式的中间介质是文件,通常称这种文件为管道文件.两个进程利用管道文件进行通信时,一个 进程为写进程,另一个进程为读进程.写进程通过写端(发送端)往管道文件中写入信息:读进程通过读 端(接收端)从管道文件中读取信息.两个进程协调不断地进行写.读,便会构成双方通过管道传递信息 的流水线. 利用系统调用

J2EE的异步消息机制(下)

三.消息驱动豆简介 异步消息也可以由消息驱动豆来实现.在EJB 1.1规范中,定义了两种类型的EJB.分别是实体豆(Entity Bean)和会话豆(Session Bean).客户端通常是以同步的,阻塞方式来调用豆的方法.消息驱动豆将EJB和JMS的功能结合在一起. 正如前述,会话豆通常实现商务逻辑,客户端不能共享一个会话豆.实体豆通常和一些在永久存储中的一些实体条目相对应的.这两种豆通常都有REMOTE和HOME接口,用来与客户端交互.并且,这些交互都是同步的,阻塞方式进行的.比如,一个请求

J2EE的异步消息机制(上)

在分布式企业级应用程序中,异步消息机制用于有效地协调各个部分的工作. J2EE为我们提供了JMS和消息驱动豆(Message-Driven Bean),用来实现应用程序各个部件之间的异步消息传递. 一.什么是消息系统? 通常一个消息系统允许分开的未耦合的应用程序之间可靠地异步通信.在企业应用时,需要一种异步的,非阻塞的消息传递.比如,一个客户端可能希望给一个服务器发送一个请求后,不在乎是否马上能得到回应.这样,客户端没有理由必须等待服务器处理请求.客户端应用程序在递交一个请求之后,只需确保请求到

linux网络编程之POSIX消息队列和系列函数

一.在前面介绍了system v 消息队列的相关知识,现在来稍微看看posix 消息队列. 其实消息队列就是一个可 以让进程间交换数据的场所,而两个标准的消息队列最大的不同可能只是api 函数的不同,如system v 的系列函数是 msgxxx,而posix 是mq_xxx.posix 消息队列也有一些对消息长度等的限制,man 7 mq_overview: simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc

WCF分布式开发必备知识(1):MSMQ消息队列

学习WCF是不是就不需要学习.Net Remoting.ASMX.WSE和MSMQ了? 这个问题一直是很多开发者关注的问题.按照微软的说法,WCF是微软分布式应用程序开发的集大成者,学习WCF编程,就不需要了解其他的技术.这个说法有一定的道理.WCF的出现确实解决了很多问题,它整合了.Net平台下所有的和分布式系统有关的技术,例如.Net Remoting.ASMX.WSE和MSMQ.以通信(Communiation)范围而论,它可以跨进程.跨机器.跨子网.企业网乃至于 Internet:可以以

WCF分布式开发步步为赢(13):WCF服务离线操作与消息队列MSMQ

之前曾经写过一个关于MSMQ消息队列的文章:WCF分布式开发必备知识 (1):MSMQ消息队列 ,当时的目的也是用它来作为学习WCF 消息队列MSMQ编程的 基础文章.在那篇文章里,我们详细介绍了MSMQ消息队列的基本概念.安装.部 署.开发.调试等相关问题.今天我们来学习WCF分布式开发步步为赢(13):WCF 服务离线操作与消息队列MSMQ.在WCF框架下使用MSMQ消息队列服务编程. 这 里我会给出一个使用WCF MSMQ实现离线请求的DEMO示例程序. 全文结构是:[1]MSMQ基本概念