问题描述
写了个服务,用多线程来接收消息队列,但是每次测试20万条数据,接收过来都要少几百条数据,百思不得其解……我的业务逻辑:1.定期扫描msmq2.如果有消息,开多线程循环接收3.每个线程接收到消息后放在内存里4.等所有线程结束,用bulkcopy入库现在问题出在第3步,假设消息队列里有20万条数据,每次线程全部结束后内存里只有19万9千条左右,总要少1000条左右……请问有什么可能导致这个问题?下面是代码///<summary>///间隔一定时间(ExecuteTime)调用Receive方法///</summary>publicvoidExecute(objectsource,System.Timers.ElapsedEventArgse){try{System.Timers.Timert=(System.Timers.Timer)source;t.Enabled=false;ReceiveWithTransactional();if(ce.CurrentCount==1){Insert();}t.Enabled=true;}catch(Exceptionex){EventLog.WriteEntry(ex.Message);}}///<summary>///带事务接收消息队列///</summary>privatevoidReceiveWithTransactional(){//onExec=true;varmessageQueue=newMessageQueue(".\private$\testQueue"){Formatter=newXmlMessageFormatter(newType[]{typeof(string)})};//messageQueue.PeekCompleted+=newPeekCompletedEventHandler(OnPeekCompleted);using(varmessageEnumerator=messageQueue.GetMessageEnumerator2()){while(messageEnumerator.MoveNext()){if(ce.CurrentCount<=2000){Threadthread=newThread(newThreadStart(delegate{MultiReceive(messageQueue);})){IsBackground=true};thread.Start();}}}}privatevoidMultiReceive(MessageQueuemessageQueue){MessageQueueTransactionmessageQueueTransaction=newMessageQueueTransaction();try{ce.AddCount();messageQueueTransaction.Begin();Messagemessage=messageQueue.Receive(MessageQueueTransactionType.Automatic);messageQueueTransaction.Commit();if(message!=null){stringmessageObj=message.Body.ToString();stringdesc=string.Empty;Newtonsoft.Json.Linq.JObjectjObj=null;if(!Check(messageObj,refdesc,refjObj)){LY.Frame.LogHandle.Write("QueueError","接收消息队列失败1:"+desc+"|"+messageObj);}}else{LY.Frame.LogHandle.Write("QueueError","接收消息队列失败:message为null");}}catch(Exceptionex){LY.Frame.LogHandle.Write("QueueError","接收消息队列失败2:"+ex.Message);messageQueueTransaction.Abort();ce.Signal();//onExec=false;}finally{ce.Signal();}}///<summary>///检查json串///</summary>privateboolCheck(stringmessage,refstringdescription,refNewtonsoft.Json.Linq.JObjectjObj){try{jObj=Newtonsoft.Json.JsonConvert.DeserializeObject<Newtonsoft.Json.Linq.JObject>(message);}catch(Exceptione){description="json串格式错误";returnfalse;}if(jObj==null){description="json串反序列化为null";returnfalse;}if(!jObj.HasValues){description="json串内容为空";returnfalse;}if(jObj["queuetype"]==null){description="缺少queuetype字段";returnfalse;}try{switch(jObj["queuetype"].ToString().Trim().ToLower()){case"register":if(jObj["xxx"]==null){description="register接口缺少必要字段";returnfalse;}Entity.Registerreg=newEntity.Register();reg.xxx=jObj["xxx"].ToString();Entity.Register.RegisterList.Add(reg);break;default:description="未知类型";returnfalse;}}catch(Exceptione){description="请求JSON串转换失败,"+e.Message;returnfalse;}returntrue;}///<summary>///批量入库///</summary>publicvoidInsert(){try{DataTabledtReg=ToDataTable<Entity.Register>(Entity.Register.RegisterList);using(SqlBulkCopysqlbulkcopy=newSqlBulkCopy(connstr,SqlBulkCopyOptions.UseInternalTransaction)){if(dtReg!=null&&dtReg.Rows.Count>0){sqlbulkcopy.DestinationTableName="T_xxx";sqlbulkcopy.ColumnMappings.Add("xxx","xxx");sqlbulkcopy.WriteToServer(dtReg);Entity.Register.RegisterList.Clear();}}}catch(Exceptionex){EventLog.WriteEntry(ex.Message);LY.Frame.LogHandle.Write("QueueError","入库失败:"+ex.Message);}}
解决方案
本帖最后由 specializedcaiguai 于 2015-11-27 20:33:51 编辑
解决方案二:
啊啊啊求大神关注
解决方案三:
你就问问自己代码里有地方保证一定能接收到发送来的数据吗?
解决方案四:
引用2楼shingoscar的回复:
你就问问自己代码里有地方保证一定能接收到发送来的数据吗?
接收到一条少一条,消息队列里空了,那肯定是调用了足够多的receive方法啊
解决方案五:
把你的Thread去掉,直接顺序调用MultiReceive(messageQueue)就可以了。不要滥用线程。