之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布。今天就介绍一下我们产品中如何使用RabbitMQ的!
在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减。
在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==》RabbitMQ客户端===>RabbitMQ服务端。好了,闲话少说,开始正文!
首先是程序入口,也就是WCF+RabbitMQ客户端实现:因为Discuz!NT使用了HttpModule方式来接管HTTP链接请求,而在.NET的HttpModule模板中,可以通过如下方法来接管程序运行时发生的ERROR,如下:
context.Error += new EventHandler(Application_OnError);
而“记录错误日志"的功能入口就在这里:
public void Application_OnError(Object sender, EventArgs e)
{
string requestUrl = DNTRequest.GetUrl();
HttpApplication application = (HttpApplication)sender;
HttpContext context = application.Context;#if EntLib
if (RabbitMQConfigs.GetConfig() != null && RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)//当开启errlog错误日志记录功能时
{
RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(new HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString()));//异步方式
//RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!"));//同步方式
return;
}
#endif
...
}
当然从代码可以看出,记录日志的工作基本是通过配置文件控制的,即“HttpModuleErrLog.Enable”。而RabbitMQClientHelper是一个封装类,主要用于反射生成IHttpModuleErrlogClient接口实例,该实例就是“基于WCF发布的RabbitMQ”的客户端访问对象。
/// <summary>
/// RabbitMQ
/// </summary>
public class RabbitMQClientHelper
{
static IHttpModuleErrlogClient ihttpModuleErrLogClient;
private static object lockHelper = new object();
public static IHttpModuleErrlogClient GetHttpModuleErrLogClient()
{
if (ihttpModuleErrLogClient == null)
{
lock (lockHelper)
{
if (ihttpModuleErrLogClient == null)
{
try
{
if (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)
{
ihttpModuleErrLogClient = (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(
"Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client", false, true));
}
}
catch
{
throw new Exception("请检查 Discuz.EntLib.RabbitMQ.dll 文件是否被放置到了bin目录下!");
}
}
}
}
return ihttpModuleErrLogClient;
}
}
可以看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient对象(注:使用反射的原因主要是解决企业版代码与普遍版代码在项目引用上的相互依赖),下面就是其接口和具体要求实现:
/// <summary>
/// IHttpModuleErrlogClient 客户端接口类,用于反射实例化绑定
/// </summary>
public interface IHttpModuleErrlogClient
{
void AddLog(HttpModuleErrLogData httpModuleErrLogData);
void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);
}
public class HttpModuleErrLogClient : IHttpModuleErrlogClient
{
public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
{
try
{
//((RabbitMQBinding)binding).OneWayOnly = true;
Channel
Factory<IHttpModuleErrLogService> m_factory = new ChannelFactory<IHttpModuleErrLogService>(GetBinding(), "soap.amqp:///HttpModuleErrLogService");
m_factory.Open();
IHttpModuleErrLogService m_client = m_factory.CreateChannel();
m_client.AddLog(httpModuleErrLogData);
((IClientChannel)m_client).Close();
m_factory.Close();
}
catch (System.Exception e)
{
string msg = e.Message;
}
}
private delegate void delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);
public void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)
{
delegateAddLog AddLog_aysncallback = new delegateAddLog(AddLog);
AddLog_aysncallback.BeginInvoke(httpModuleErrLogData, null, null);
}
public Binding GetBinding()
{
return new RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);
}
}
可以看出,AddLog方法与上一篇中的客户端内容基本上没什么太大差别,只不过它提供了同步和异步访问两种方式,这样做的目的主要是用户可根据生产环境来灵活配置。
下面就来看一下RabbitMQ的服务端实现,首先看一下其运行效果,如下图:
接着看一下启动rabbitmq服务的代码:
public void StartService(System.ServiceModel.Channels.Binding binding)
{
m_host = new ServiceHost(typeof(HttpModuleErrLogService), new Uri("soap.amqp:///"));
//((RabbitMQBinding)binding).OneWayOnly = true;
m_host.AddServiceEndpoint(typeof(IHttpModuleErrLogService), binding, "HttpModuleErrLogService");
m_host.Open();
m_serviceStarted = true;
}
上面代码会添加IHttpModuleErrLogService接口实现类HttpModuleErrLogService 的Endpoint,并启动它,下面就是该接口声明:
/// <summary>
/// IHttpModuleErrLogService 接口类
/// </summary>
[ServiceContract]
public interface IHttpModuleErrLogService
{
/// <summary>
/// 添加 httpModuleErrLogData日志信息
/// </summary>
/// <param name="httpModuleErrLogData"></param>
[OperationContract]
void AddLog(HttpModuleErrLogData httpModuleErrLogData);
}
代码很简单,就是定义了一个添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)
下面就是接口的具体实现,首先是类声明及初始化代码:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] //Single - 为所有客户端调用分配一个服务实例。
public class HttpModuleErrLogService : IHttpModuleErrLogService
{
/// <summary>
///
获取 HttpModuleErrLogInfo配置文件对象实例
/// </summary>
private static HttpModuleErrLogInfo httpModuleErrorLogInfo = RabbitMQConfigs.GetConfig().HttpModuleErrLog;
/// <summary>
/// 定时器对象
/// </summary>
private static System.Timers.Timer _timer;
/// <summary>
/// 定时器的时间
/// </summary>
private static int _elapsed = 0;
public static void Initial(System.Windows.Forms.RichTextBox msgBox, int elapsed)
{
_msgBox = msgBox;
_elapsed = elapsed;
//初始定时器
if (_elapsed > 0)
{
_timer = new System.Timers.Timer() { Interval = elapsed * 1000, Enabled = true, AutoReset = true };
_timer.Elapsed += new System.Timers.ElapsedEventHandler(Timer_Elapsed);
_timer.Start();
}
}
/// <summary>
/// 时间到时执行出队操作
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
Dequeue();
}
可以看出,这里使用了静态定时器对象,来进行定时访问队列信息功能(“非同步出队”操作),这样设计的原因主要是为用户提供适合的配置方式,即如果不使用定时器(为0时),则系统会在日志入队后,就立即启动出队(“同步出队”)操作获取日志信息并插入到MongoDB数据库中。
下面介绍一下入队操作实现:
/// <summary>
/// 添加 httpModuleErrLogData日志信息
/// </summary>
/// <param name="httpModuleErrLogData"></param>
public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
{
Enqueue(httpModuleErrLogData);
if (_elapsed <=0) //如果使用定时器(为0 时),则立即执行出队操作
Dequeue();
}
/// <summary>
/// 交换机名称
/// </summary>
private const string EXCHANGE = "ex1";
/// <summary>
/// 交换方法,更多内容参见:http://melin.javaeye.com/blog/691265
/// </summary>
private const string EXCHANGE_TYPE = "direct";
/// <summary>
/// 路由key,更多内容参见:http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/
/// </summary>
private const string ROUTING_KEY = "m1";
/// <summary>
/// 日志入队
/// </summary>
/// <param name="httpModuleErrLogData"></param>
public static void Enqueue(HttpModuleErrLogData httpModuleErrLogData)
{
Uri uri = new Uri(httpModuleErrorLogInfo.RabbitMQAddress);
ConnectionFactory cf = new ConnectionFactory()
{
UserName = httpModuleErrorLogInfo.UserName,
Password = httpModuleErrorLogInfo.PassWord,
VirtualHost = "dnt_mq",
RequestedHeartbeat = 0,
Endpoint = new AmqpTcpEndpoint(uri)
};
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (EXCHANGE_TYPE != null)
{
ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);//,true,true,false,false, true,null);
ch.QueueDeclare(httpModuleErrorLogInfo.QueueName, true);