求教多线程接收消息队列的问题

问题描述

写了个服务,用多线程来接收消息队列,但是每次测试20万条数据,接收过来都要少几百条数据,百思不得其解……我的业务逻辑:1.定期扫描msmq2.如果有消息,开多线程循环接收3.每个线程接收到消息后放在内存里4.等所有线程结束,用bulkcopy入库现在问题出在第3步,假设消息队列里有20万条数据,每次线程全部结束后内存里只有19万9千条左右,总要少1000条左右……请问有什么可能导致这个问题?下面是代码///<summary>///间隔一定时间(ExecuteTime)调用Receive方法///</summary>publicvoidExecute(objectsource,System.Timers.ElapsedEventArgse){try{System.Timers.Timert=(System.Timers.Timer)source;t.Enabled=false;ReceiveWithTransactional();if(ce.CurrentCount==1){Insert();}t.Enabled=true;}catch(Exceptionex){EventLog.WriteEntry(ex.Message);}}///<summary>///带事务接收消息队列///</summary>privatevoidReceiveWithTransactional(){//onExec=true;varmessageQueue=newMessageQueue(".\private$\testQueue"){Formatter=newXmlMessageFormatter(newType[]{typeof(string)})};//messageQueue.PeekCompleted+=newPeekCompletedEventHandler(OnPeekCompleted);using(varmessageEnumerator=messageQueue.GetMessageEnumerator2()){while(messageEnumerator.MoveNext()){if(ce.CurrentCount<=2000){Threadthread=newThread(newThreadStart(delegate{MultiReceive(messageQueue);})){IsBackground=true};thread.Start();}}}}privatevoidMultiReceive(MessageQueuemessageQueue){MessageQueueTransactionmessageQueueTransaction=newMessageQueueTransaction();try{ce.AddCount();messageQueueTransaction.Begin();Messagemessage=messageQueue.Receive(MessageQueueTransactionType.Automatic);messageQueueTransaction.Commit();if(message!=null){stringmessageObj=message.Body.ToString();stringdesc=string.Empty;Newtonsoft.Json.Linq.JObjectjObj=null;if(!Check(messageObj,refdesc,refjObj)){LY.Frame.LogHandle.Write("QueueError","接收消息队列失败1:"+desc+"|"+messageObj);}}else{LY.Frame.LogHandle.Write("QueueError","接收消息队列失败:message为null");}}catch(Exceptionex){LY.Frame.LogHandle.Write("QueueError","接收消息队列失败2:"+ex.Message);messageQueueTransaction.Abort();ce.Signal();//onExec=false;}finally{ce.Signal();}}///<summary>///检查json串///</summary>privateboolCheck(stringmessage,refstringdescription,refNewtonsoft.Json.Linq.JObjectjObj){try{jObj=Newtonsoft.Json.JsonConvert.DeserializeObject<Newtonsoft.Json.Linq.JObject>(message);}catch(Exceptione){description="json串格式错误";returnfalse;}if(jObj==null){description="json串反序列化为null";returnfalse;}if(!jObj.HasValues){description="json串内容为空";returnfalse;}if(jObj["queuetype"]==null){description="缺少queuetype字段";returnfalse;}try{switch(jObj["queuetype"].ToString().Trim().ToLower()){case"register":if(jObj["xxx"]==null){description="register接口缺少必要字段";returnfalse;}Entity.Registerreg=newEntity.Register();reg.xxx=jObj["xxx"].ToString();Entity.Register.RegisterList.Add(reg);break;default:description="未知类型";returnfalse;}}catch(Exceptione){description="请求JSON串转换失败,"+e.Message;returnfalse;}returntrue;}///<summary>///批量入库///</summary>publicvoidInsert(){try{DataTabledtReg=ToDataTable<Entity.Register>(Entity.Register.RegisterList);using(SqlBulkCopysqlbulkcopy=newSqlBulkCopy(connstr,SqlBulkCopyOptions.UseInternalTransaction)){if(dtReg!=null&&dtReg.Rows.Count>0){sqlbulkcopy.DestinationTableName="T_xxx";sqlbulkcopy.ColumnMappings.Add("xxx","xxx");sqlbulkcopy.WriteToServer(dtReg);Entity.Register.RegisterList.Clear();}}}catch(Exceptionex){EventLog.WriteEntry(ex.Message);LY.Frame.LogHandle.Write("QueueError","入库失败:"+ex.Message);}}

解决方案

本帖最后由 specializedcaiguai 于 2015-11-27 20:33:51 编辑
解决方案二:
啊啊啊求大神关注
解决方案三:
你就问问自己代码里有地方保证一定能接收到发送来的数据吗?
解决方案四:
引用2楼shingoscar的回复:

你就问问自己代码里有地方保证一定能接收到发送来的数据吗?

接收到一条少一条,消息队列里空了,那肯定是调用了足够多的receive方法啊
解决方案五:
把你的Thread去掉,直接顺序调用MultiReceive(messageQueue)就可以了。不要滥用线程。

时间: 2024-10-03 00:29:33

求教多线程接收消息队列的问题的相关文章

消息队列简记

消息队列简记 在了解了信号量和共享内存之后,消息队列自然就比较容易理解了. 之前提到共享内存的操作不是原子的, 那么便可以结合信号量来进行控制. 消息队列是另外一种进程间通信的手段, 使用以下几个函数调用.         #include <sys/types.h>         #include <sys/ipc.h>         #include <sys/msg.h>//一般上述两个头文件都被此文件包含         int msgget(key_t k

java-Java httpserver 多线程接收问题

问题描述 Java httpserver 多线程接收问题 我使用了java 自带的com.sun.net.httpserver,使用多线程接收消息,但是当我在一个线程中加入死循环,客户端第二个请求就接收不到,求大神指点 HttpServerProvider provider = HttpServerProvider.provider(); HttpServer httpserver =provider.createHttpServer(new InetSocketAddress(port), r

消息队列(一)——消息的简单发送与接收

    背景           开发者经常遇到需要异步执行操作的情况(即过程不等到操作完成就开始).消息队列提供一个中心位置或池,您可以在其中放置或从中提取数据,从而满足了这一要求.一个应用程序能够把消息存放在队列中,然后继续自己的业务,另一个应用程序在运行时再提取这些数据.   简单理解                             感觉这里的消息队列还是一个典型的"buffer"思想:即就像喝水一样,如果有一杯水,我可能就直接喝掉了:但是如果有一壶水,我可能要先把水倒进

深入浅出Win32多线程设计之MFC的多线程-线程与消息队列(经典)

1.创建和终止线程 在MFC程序中创建一个线程,宜调用AfxBeginThread函数.该函数因参数不同而具有两种重载版本,分别对应工作者线程和用户接口(UI)线程. 工作者线程 CWinThread *AfxBeginThread( AFX_THREADPROC pfnThreadProc, //控制函数 LPVOID pParam, //传递给控制函数的参数 int nPriority = THREAD_PRIORITY_NORMAL, //线程的优先级 UINT nStackSize =

消息队列入门(四)ActiveMQ的应用实例

部署和启动ActiveMQ 去官网下载:http://activemq.apache.org/ 我下载的是apache-activemq-5.12.0-bin.tar.gz, 解压到本地目录,进入到bin路径下, 运行activemq启动ActiveMQ. 运行方式:启动 ./activemq start ActiveMQ默认使用的TCP连接端口是61616, 5.0以上版本默认启动时,开启了内置的Jetty服务器,可以进入控制台查看管理. 启动ActiveMQ以后,登陆:http://loca

消息队列入门(二)消息队列的规范和开源实现

1.AMQP规范 AMQP 是 Advanced Message Queuing Protocol,即高级消息队列协议.AMQP不是一个具体的消息队列实现,而 是一个标准化的消息中间件协议.目标是让不同语言,不同系统的应用互相通信,并提供一个简单统一的模型和编程接口. 目前主流的ActiveMQ和RabbitMQ都支持AMQP协议. AMQP相关的角色和职责 Producer 消息生产者 一个给exchange发送消息的程序,发送方式大致是:它首先创建一个空消息,然后填上内容.路由KEY,最后发

PHP高级编程之消息队列

PHP高级编程之消息队列 http://netkiller.github.io/journal/php.mq.html Mr. Neo Chen (陈景峰), netkiller, BG7NYT 中国广东省深圳市龙华新区民治街道溪山美地518131+86 13113668890+86 755 29812080<netkiller@msn.com> 版权声明 转载请与作者联系,转载时请务必标明文章原始出处和作者信息及本声明. 文档出处: http://netkiller.github.io ht

EQueue - 一个C#写的开源分布式消息队列的总体介绍

前言 本文想介绍一下前段时间在写enode时,顺便实现的一个分布式消息队列equeue.这个消息队列的思想不是我想出来的,而是通过学习阿里的rocketmq后,自己用c#实现了一个轻量级的简单版本.一方面可以通过写这个队列让自己更深入的掌握消息队列的一些常见问题:另一方面也可以用来和enode集成,为enode中的command和domain event的消息传递提供支持.目前在.net平台,比较好用的消息队列,最常见的是微软的MSMQ了吧,还有像rabbitmq也有.net的client端.这

C#分布式消息队列 EQueue 2.0 发布啦

前言 最近花了我几个月的业余时间,对EQueue做了一个重大的改造,消息持久化采用本地写文件的方式.到现在为止,总算完成了,所以第一时间写文章分享给大家这段时间我所积累的一些成果. EQueue开源地址:https://github.com/tangxuehua/equeue EQueue相关文档:http://www.cnblogs.com/netfocus/category/598000.html EQueue Nuget地址:http://www.nuget.org/packages/eq