在上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容。今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。
首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html。下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用:
RabbitMQ.
Client.dll //基于的发布订阅消息的功能类
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类
如下图: 接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下:
首先是ProducerMQ:
public class ProducerMQ
{
public static void InitProducerMQ()
{
Uri uri = new Uri("amqp://10.0.4.85:5672/");
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";
bool persistMode = true;
Connection
Factory cf = new ConnectionFactory();
cf.UserName = "daizhj";
cf.Password = "617595";
cf.VirtualHost = "dnt_mq";
cf.RequestedHeartbeat = 0;
cf.Endpoint = new AmqpTcpEndpoint(uri);
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (exchangeType != null)
{
ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);
ch.QueueDeclare("q1", true);//true, true, true, false, false, null);
ch.QueueBind("q1", "ex1", "m1", false, null);
}
IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "hello world";
IDictionary targetBody = b.Body;
targetBody["body"] = "daizhj";
if (persistMode)
{
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
}
ch.BasicPublish(exchange, routingKey,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());
}
}
}
}
下面对上面代码进行说明:
1. 定义要链接的rabbitmq-server地址(基于amqp协议):
Uri uri = new Uri("amqp://10.0.4.85:5672/");
2. 定义交换方式
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";
说明:rabbitmq交换方式分为三种,分别是:
Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
更多内容参见:RabbitMQ 三种Exchange
3. 是否对消息队列持久化保存
bool persistMode = true;
4. 使用ConnectionFactory创建连接,虽然创建时指定了多个server address,但每个connection只与一个物理的server进行连接。
ConnectionFactory cf = new ConnectionFactory();
//使用前文的配置环境信息
cf.UserName = "daizhj";
cf.Password = "617595";
cf.VirtualHost = "dnt_mq";
cf.RequestedHeartbeat = 0;
cf.Endpoint = new AmqpTcpEndpoint(uri);
5. 实例化IConnection对象,并设置交换方式:
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (exchangeType != null)
{
ch.ExchangeDeclare(exchange, exchangeType);//,true,true,false,false, true,null);
ch.QueueDeclare("q1", true);//true, true, true, false, false, null);
ch.QueueBind("q1", "ex1", "m1", false, null);
}
....
6. 构造消息实体对象并发布到消息队列上:
IMapMessageBuilder b = new MapMessageBuilder(ch);
IDictionary target = b.Headers;
target["header"] = "hello world";
IDictionary targetBody = b.Body;
targetBody["body"] = "daizhj";
if (persistMode)
{
((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;
}
//简单发布方式
ch.BasicPublish(exchange, routingKey,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());
这样就完成了单条消息的发布。 下面是CustmerMq.cs(用于消费消息)实例代码:
public class CustmerMq
{
public static int InitCustmerMq()
{
string exchange = "ex1";
string exchangeType = "direct";
string routingKey = "m1";
string serverAddress = "10.0.4.85:5672";
ConnectionFactory cf = new ConnectionFactory();
cf.Address = serverAddress;
cf.UserName = "daizhj";
cf.Password = "617595";
cf.VirtualHost = "dnt_mq";
cf.RequestedHeartbeat = 0;
可以看出上面的代码与 ProducerMQ的开头代码类似,下面使用ConnectionFactory来构造链接并接收队列消息:
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
//普通使用方式BasicGet
//noAck = true,不需要回复,接收到消息后,queue上的消息
就会清除
//noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除