NET 下RabbitMQ实践 [实战篇]

  之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布。今天就介绍一下我们产品中如何使用RabbitMQ的!
  在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减。    
  在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==》RabbitMQ客户端===>RabbitMQ服务端。好了,闲话少说,开始正文!    
  首先是程序入口,也就是WCF+RabbitMQ客户端实现:因为Discuz!NT使用了HttpModule方式来接管HTTP链接请求,而在.NET的HttpModule模板中,可以通过如下方法来接管程序运行时发生的ERROR,如下:

  context.Error += new EventHandler(Application_OnError);

  而“记录错误日志"的功能入口就在这里:

public void Application_OnError(Object sender, EventArgs e)

{

string requestUrl = DNTRequest.GetUrl();

HttpApplication application = (HttpApplication)sender;

HttpContext context = application.Context;#if EntLib

if (RabbitMQConfigs.GetConfig() != null && RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)//当开启errlog错误日志记录功能时

{

RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(new HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString()));//异步方式

//RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!"));//同步方式

return;

}

#endif

...

}

  当然从代码可以看出,记录日志的工作基本是通过配置文件控制的,即“HttpModuleErrLog.Enable”。而RabbitMQClientHelper是一个封装类,主要用于反射生成IHttpModuleErrlogClient接口实例,该实例就是“基于WCF发布的RabbitMQ”的客户端访问对象。

/// <summary>

/// RabbitMQ

/// </summary>

public class RabbitMQClientHelper

{

static IHttpModuleErrlogClient ihttpModuleErrLogClient;

private static object lockHelper = new object();

public static IHttpModuleErrlogClient GetHttpModuleErrLogClient()

{

if (ihttpModuleErrLogClient == null)

{

lock (lockHelper)

{

if (ihttpModuleErrLogClient == null)

{

try

{

if (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)

{

ihttpModuleErrLogClient = (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(

"Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client", false, true));

}

}

catch

{

throw new Exception("请检查 Discuz.EntLib.RabbitMQ.dll 文件是否被放置到了bin目录下!");

}

}

}

}

return ihttpModuleErrLogClient;

}

}

  可以看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient对象(注:使用反射的原因主要是解决企业版代码与普遍版代码在项目引用上的相互依赖),下面就是其接口和具体要求实现:

/// <summary>

/// IHttpModuleErrlogClient 客户端接口类,用于反射实例化绑定

/// </summary>

public interface IHttpModuleErrlogClient

{

void AddLog(HttpModuleErrLogData httpModuleErrLogData);

void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);

}

public class HttpModuleErrLogClient : IHttpModuleErrlogClient

{

public void AddLog(HttpModuleErrLogData httpModuleErrLogData)

{

try

{

//((RabbitMQBinding)binding).OneWayOnly = true;

Channel
Factory<IHttpModuleErrLogService> m_factory = new ChannelFactory<IHttpModuleErrLogService>(GetBinding(), "soap.amqp:///HttpModuleErrLogService");

m_factory.Open();

IHttpModuleErrLogService m_client = m_factory.CreateChannel();

m_client.AddLog(httpModuleErrLogData);

((IClientChannel)m_client).Close();

m_factory.Close();

}

catch (System.Exception e)

{

string msg = e.Message;

}

}

private delegate void delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);

public void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)

{

delegateAddLog AddLog_aysncallback = new delegateAddLog(AddLog);

AddLog_aysncallback.BeginInvoke(httpModuleErrLogData, null, null);

}

public Binding GetBinding()

{

return new RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);

}

}

  可以看出,AddLog方法与上一篇中的客户端内容基本上没什么太大差别,只不过它提供了同步和异步访问两种方式,这样做的目的主要是用户可根据生产环境来灵活配置。

  下面就来看一下RabbitMQ的服务端实现,首先看一下其运行效果,如下图:

  接着看一下启动rabbitmq服务的代码:

public void StartService(System.ServiceModel.Channels.Binding binding)

{

m_host = new ServiceHost(typeof(HttpModuleErrLogService), new Uri("soap.amqp:///"));

//((RabbitMQBinding)binding).OneWayOnly = true;

m_host.AddServiceEndpoint(typeof(IHttpModuleErrLogService), binding, "HttpModuleErrLogService");

m_host.Open();

m_serviceStarted = true;

}

  上面代码会添加IHttpModuleErrLogService接口实现类HttpModuleErrLogService 的Endpoint,并启动它,下面就是该接口声明:

/// <summary>

/// IHttpModuleErrLogService 接口类

/// </summary>

[ServiceContract]

public interface IHttpModuleErrLogService

{

/// <summary>

/// 添加 httpModuleErrLogData日志信息

/// </summary>

/// <param name="httpModuleErrLogData"></param>

[OperationContract]

void AddLog(HttpModuleErrLogData httpModuleErrLogData);

}

  代码很简单,就是定义了一个添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)

  下面就是接口的具体实现,首先是类声明及初始化代码:

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] //Single - 为所有客户端调用分配一个服务实例。

public class HttpModuleErrLogService : IHttpModuleErrLogService

{

/// <summary>

///
获取 HttpModuleErrLogInfo配置文件对象实例

/// </summary>

private static HttpModuleErrLogInfo httpModuleErrorLogInfo = RabbitMQConfigs.GetConfig().HttpModuleErrLog;

/// <summary>

/// 定时器对象

/// </summary>

private static System.Timers.Timer _timer;

/// <summary>

/// 定时器的时间

/// </summary>

private static int _elapsed = 0;

public static void Initial(System.Windows.Forms.RichTextBox msgBox, int elapsed)

{

_msgBox = msgBox;

_elapsed = elapsed;

//初始定时器

if (_elapsed > 0)

{

_timer = new System.Timers.Timer() { Interval = elapsed * 1000, Enabled = true, AutoReset = true };

_timer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);

_timer.Start();

}

}

/// <summary>

/// 时间到时执行出队操作

/// </summary>

/// <param name="sender"></param>

/// <param name="e"></param>

private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)

{

Dequeue();

}

  可以看出,这里使用了静态定时器对象,来进行定时访问队列信息功能(“非同步出队”操作),这样设计的原因主要是为用户提供适合的配置方式,即如果不使用定时器(为0时),则系统会在日志入队后,就立即启动出队(“同步出队”)操作获取日志信息并插入到MongoDB数据库中。

  下面介绍一下入队操作实现:

/// <summary>

/// 添加 httpModuleErrLogData日志信息

/// </summary>

/// <param name="httpModuleErrLogData"></param>

public void AddLog(HttpModuleErrLogData httpModuleErrLogData)

{

Enqueue(httpModuleErrLogData);

if (_elapsed <=0) //如果使用定时器(为0 时),则立即执行出队操作

Dequeue();

}

/// <summary>

/// 交换机名称

/// </summary>

private const string EXCHANGE = "ex1";

/// <summary>

/// 交换方法,更多内容参见:http://melin.javaeye.com/blog/691265

/// </summary>

private const string EXCHANGE_TYPE = "direct";

/// <summary>

/// 路由key,更多内容参见:http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/

/// </summary>

private const string ROUTING_KEY = "m1";

/// <summary>

/// 日志入队

/// </summary>

/// <param name="httpModuleErrLogData"></param>

public static void Enqueue(HttpModuleErrLogData httpModuleErrLogData)

{

Uri uri = new Uri(httpModuleErrorLogInfo.RabbitMQAddress);

ConnectionFactory cf = new ConnectionFactory()

{

UserName = httpModuleErrorLogInfo.UserName,

Password = httpModuleErrorLogInfo.PassWord,

VirtualHost = "dnt_mq",

RequestedHeartbeat = 0,

Endpoint = new AmqpTcpEndpoint(uri)

};

using (IConnection conn = cf.CreateConnection())

{

using (IModel ch = conn.CreateModel())

{

if (EXCHANGE_TYPE != null)

{

ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);//,true,true,false,false, true,null);

ch.QueueDeclare(httpModuleErrorLogInfo.QueueName, true);

时间: 2024-09-13 09:31:00

NET 下RabbitMQ实践 [实战篇]的相关文章

NET下RabbitMQ实践 [配置篇]

这个系列目前计划写四篇,分别是配置,示例,WCF发布,实战.当然不排除加餐情况. 介绍: rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器. RabbitMQ的官方站:http://www.rabbitmq.com/          AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路层协议,而不是API(例如JMS),AM

一起谈.NET技术,NET下RabbitMQ实践 [配置篇]

     这个系列目前计划写四篇,分别是配置,示例,WCF发布,实战.当然不排除加餐情况.      介绍:      rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统.他遵循Mozilla Public License开源协议.采用 Erlang 实现的工业级的消息队列(MQ)服务器.      RabbitMQ的官方站:http://www.rabbitmq.com/          AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,作为线路

NET下RabbitMQ实践 [示例篇]

在上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容.今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置.         首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html.下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并

NET下RabbitMQ实践 [WCF发布篇]

在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写.今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布. 注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵.首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:    Binary, compiled for .NET 3.0 and newer (zi

一起谈.NET技术,NET下RabbitMQ实践 [WCF发布篇]

在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写.今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布. 注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵.首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:    Binary, compiled for .NET 3.0 and newer (zi

关于下载GAE High Replication Datastore数据[实战篇]下

通过bulk loader可以批量上传下载数据,GAE支持xml,csv格式数据批量上传,以及xml,csv和文本 格式下载. 你可以选择自动生成一个bulkloader.yaml,或者手动编码来写一个bulk loader.谷歌不推荐手动编 写,个人也觉得自动生成才是王道. 如何自动生成bulkloader.yaml bulkloader.yaml是一个描述数据格式的配置文件,格式例如xml或csv.bulk loader进行数据导入时 需要这个配置文件来将外部数据(xml或csv)转换为中间

[PhalApi实战篇(1)]Redis队列处理异步任务

[PhalApi实战篇(1)]Redis队列处理异步任务 前言 先在这里感谢phalapi框架创始人@dogstar,为我们提供了这样一个优秀的开源框架. 哈喽大家好呀!之前编写的PhalApi入门篇和进阶篇已经过去了好久了,在此之间也回答了很多小伙伴各种各样的问题,这里也希望吧里面一些问的比较多的和比较有趣的以及笔者在使用PhalApi一些新的体会,都提取出来为大家带来一些能够在实际开发中可以使用的技术或思想,那么我们就开始我们实战篇中的第一节 Redis队列处理异步任务 大家希望喵咪在Pha

JBuilder9+Weblogic7实战篇之工具篇(JDBC 1)

JBuilder9+Weblogic7实战篇<?xml:namespace prefix = o ns = "urn:schemas-microsoft-com:office:office" /> 工具篇(配置JDBC 1)                                                                              作者:黄 凯         E_mail:hk_sz@163.com Weblogic7中配置J

工作经常使用的SQL整理,实战篇(二)

原文:工作经常使用的SQL整理,实战篇(二)[原创] 工作经常使用的SQL整理,实战篇,地址一览: 工作经常使用的SQL整理,实战篇(一) 工作经常使用的SQL整理,实战篇(二) 工作经常使用的SQL整理,实战篇(三)   接着上一篇"工作经常使用的SQL整理,实战篇(一)"继续讨论,这一篇中主要讨论增删改查,连接,分组和排序,通配符,视图,存储过程和事务,游标,触发器这些东西. 6.增删改查 插入 --插入用户表数据 insert into Tse_User(UserID, User