.NET平台下可复用的Tcp通信层实现(续)

     上一篇主要讲到了Tcp通信层中的核心组件――Tcp组件的实现,Tcp组件是整个通信层的消息驱动源,甚至,可以将Tcp组件看作是我们整个服务器系统的消息驱动源,消息处理过程从这里引发。类似的消息驱动源还有发布的WebService接口、Remoting接口等。今天我们需要关注的是Tcp通信层中的“中央”组件――消息分派器组件ITcpReqStreamDispatcher,大家已经从前文的组件关系图中看到了消息分派器的大致位置和作用了,它是Tcp通信组件和消息处理器之间的“桥梁”。我们再对前文描述的通信层组件之间关系的一段话回顾一下:
    “当网络(Tcp)组件从某个Tcp连接上接收到一个请求时,会将请求转发给消息分派器,消息分派器通过IDataStreamHelper组件获取请求消息的类型,然后根据此类型要求处理器工厂创建对应类型的请求处理器,请求处理器处理请求并返回结果。接下来再由网络组件把结果返回给终端用户。在消息分派器进行请求消息分派之前,可能涉及一系列的操作,像消息加密/解密、消息分裂/重组、消息验证等。”
    上面的描述中已经体现出了消息分派器的主要职责,在理解了消息分派器职责的基础上,我们可以进一步来看看消息分派器的定义和实现了。 

二.消息分派器组件

1.消息分派器组件接口的定义
    消息分派器的接口很简单:

    public interface ITcpReqStreamDispatcher : IReqestStreamDispatcher 
    {        
        ArrayList DealRequestMessage(RequestData requestData ,out byte[] leftData ,ref RequestValidation validation) ;//同步回复
        bool      DealRequestMessage(RequestData requestData , NetworkStream userStream ,out byte[] leftData) ; //异步回复        
    }

    这个接口只有两个方法,第二个方法用于异步发送回复(即绕开Tcp组件发送回复),该方法的核心部分可以由第一个方法实现,我们把注意力放在第一个方法上,而Tcp组件与消息分派器进行交互的也正是第一个方法。我先解释一下这个方法的几个参数的含义:
    RequestData是对请求消息的封装:

    //从网络接收到的原始数据的封装
    public class RequestData
    {
        public int  ConnectID = 0 ;
        public bool IsFirstMsg = false ; //标志是否为连接建立后的第一条消息
        public byte[] Buff     = null ; //接收数据缓冲区 ,可能其头部包含上次未处理完的数据
        public int ValidCount  = 0 ; //缓冲区中有效字节的个数 >= 本次接收的字节数        
    }

    前面已经提到过,ConnectID用于标志每一个Tcp连接,IsFirstMsg用于表明是否为tcp连接建立后的第一个消息,因为我们可能需要对第一个消息进行额外的验证,比如,果第一个消息不是登录请求,就关闭该Tcp连接。
    第二个参数leftData,表示RequestData.Buff中的数据经过消息分裂器分裂之后余下的数据(一条非完整的消息),这些数据被Tcp组件用来放在下一次收到的数据的头部进行消息重组。
    第三个参数validation,是个ref参数,用于通知Tcp组件对消息验证的结果,如果验证失败,Tcp组件将关闭对应的Tcp连接。
    该方法的返回值是回复的集合,每一个回复对应一个请求,而RequestData.Buff中的数据可能分裂成多个请求。另外要注意,有些请求可能是没有回复消息的。
    在我们的Tcp组件的两种实现中,都可以看到类似下面的与消息分派器交互的语句:

                //处理请求    
                byte[] leftData = null ;                
                ArrayList repondList = this.messageDispatcher.DealRequestMessage(key.RequestData  ,out leftData , ref key.Validation) ;
                
                if(this.validateRequest)
                {
                    if(key.Validation.gotoCloseConnection)
                    {
                        this.DisposeOneConnection(streamHashCode ,key.Validation.cause) ;
                    }
                }

  
2.消息分派器组件基本元素的实现
    正如在实现Tcp组件之前需要构建一些基本元素,在实现消息分派器之前也是如此,用于支持消息分派器实现的基本元素包括:IDataStreamHelper、消息分裂器、消息处理器工厂、ITcpStreamDispatcherHook等。
(1)IDataStreamHelper消息分裂器
    IDataStreamHelper,前文中已经提到,IDataStreamHelper用于从请求/回复消息中提取消息的“元数据”,并提供一些辅助方法,每个特定的应用,它们对IDataStreamHelper的实现可能是不一样的。IDataStreamHelper接口定义如下:

     /// <summary>
    /// IDataStreamHelper 通信协议的面向流辅助设施。
    /// </summary>
    public interface IDataStreamHelper :IStringEncoder
    {
        int MaxRecieveBuffSize{get ;} //接收缓冲区的大小
        int MessageHeaderLength{get ;} //消息头的长度
        int OffsetOfLengthField{get ;} //表示消息长度的字段在消息头中的偏移

        IDataStreamHeader ParseMessageHeader(byte[] data ,int offset) ; //解析消息头

        LengthTypeInHeader LengthTypeInHeader{get ;} 
        
        byte[] GetRespondWhenFailure(byte[] reqData ,ServiceFailureType failType) ;    //根据服务失败类型获取失败回复消息
        byte[] GetRespondWhenFailure(byte[] reqData ,string errorMsg) ;            
    }

    /// <summary>
    /// StringEncoder 限定字符串编码格式
    /// </summary>
    public interface IStringEncoder
    {
        string GetStrFromStream(byte[] stream ,int offset ,int len) ;
        byte[] GetBytesFromStr(string ss) ;
    }

    /// <summary>
    /// ServiceFailureType 服务失败类型
    /// </summary>
    public enum ServiceFailureType
    {
        InvalidMessge ,ParseFailure ,HandleFailure ,ServiceStopped ,ServiceIsNotExit ,ServerIsBusy 
    }

    IDataStreamHeader即是我们所说的消息的“元数据”,如其名所示,它也是消息的“消息头”。请让我补充说明一下,依照我的经验,消息由消息头Header和消息主体Body组成,消息头用于存放消息的“元数据”等信息,而消息主体用于存放与特定请求相关的数据。消息头的长度固定,比如都是64字节或都是128字节。请求消息和回复消息公用相同格式的消息头。我们来看看消息头接口IDataStreamHeader的定义:

    public interface IDataStreamHeader
    {
        int MessageLength    {get ;set ;} //本消息长度
        int TypeKey            {get ;set ;} //请求的目录类型
        int ServiceKey        {get ;set ;} //请求类型
        int ServiceItemIndex{get ;set ;} //请求细分索引
        int RandomNum        {get ;set ;} //用于将回复与请求一一对应起来        
        int Result            {get ;set ;} //服务结果    
    
        string UserID        {get ;set ;} //发出请求的用户编号

        byte[] ToDataStream() ;              //将消息头转化为流,流的长度位消息头的长度
        void   ToDataStream(byte[] buff ,int offset);    
    }

    需要解释一下TypeKey、ServiceKey、ServiceItemIndex,我们实际上将服务类型分为三级,可以举个不太恰当的例子让大家有个感性的认识。比如,生活中的衣、食、住、行可以作为不同的TypeKey,而“衣”中的春装、冬装可作为ServiceKey,而“春装”中的T恤、夹克可作为ServiceItemIndex。对于服务的类型,你可以根据自己的意愿分成任意层级,但据我的经验,通常情况下,三层已经够用了。 

(2)消息分裂器
    前面已经多次提到消息分裂器MessageSplitter,它用于将接收缓冲区中的数据分裂成一个个完整的消息,并且把余下的非完整数据返回,其接口定义如下:

public interface IMessageSplitter
    {
        void Initialize(int maxBuffSize ,int headerLen ,int offSetLenField ,LengthTypeInHeader lenType) ;
        ArrayList SplitRequestMsgs(byte[] buff ,int validCount , out byte[] leftData) ;//ArrayList 中每条记录都是是byte[],表示一个完整的请求
    }

    //消息头中的长度是body长度还是总长度
    public enum LengthTypeInHeader
    {
        TotalLen ,BodyLen 
    }

    其中,Initialize方法中的参数都可以由IDataStreamHeader提供。leftData是余下的非完整消息的数据。SplitRequestMsgs方法返回的集合中是一条条完整的请求消息。

(3)消息处理器工厂
    消息处理器工厂根据消息的类型(TypeKey、ServiceKey)创建对应的消息处理器来出来该消息,其接口定义如下:

    public interface IRequestDealerFactory
    {
        IRequestDealer CreateDealer(int requestType ,int serverTypeKey)  ;//serverTypeKey 比如城市代号

        event CbackRequestRecieved RequestRecieved ;
    }

    CreateDealer方法返回的IRequestDealer就是消息处理器,每一个消息处理器用于处理某种特定类型(ServiceKey)的所有请求。通常,可以将消息处理器封装成插件DLL,以实现功能服务的“热插拔”。 

(4)消息处理器
    消息处理器IRequestDealer定义如下:

    public interface IRequestDealer
    {        
        byte[]  DealRequestMessage(RoundedRequestMsg reqMsg ) ;//同步回复

        event CbackRequestRecieved RequestRecieved ;
    }

    public delegate void CbackRequestRecieved(RoundedRequestMsg roundedMsg) ;

    /// <summary>
    /// RoundedRequestMsg 对应于一条完整的请求
    /// </summary>
    public struct RoundedRequestMsg
    {
        public int ConnectID ; //请求所对应的Tcp连接
        public byte[] Data ;
    }

    RoundedRequestMsg.Data是经消息分裂器分裂得到的一个完整的请求消息,一个字节不多、一个字节也不少。 

(5)ITcpStreamDispatcherHook
    ITcpStreamDispatcherHook是一个Hook,它为用户提供了一个自定义的对请求/回复消息进行操作的插入点。ITcpStreamDispatcherHook由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理或转换这些消息,比如常用的处理/转换操作包括:加密/解密、消息验证等等。ITcpStreamDispatcherHook定义如下:

    /// <summary>
    /// ITcpStreamDispatcherHook 由TcpStreamDispatcher使用,用于对请求消息和回复消息进行截获,然后处理转换这些消息,
    /// 比如加密/解密。  
    /// </summary>
    public interface ITcpStreamDispatcherHook
    {
        //转换消息
        byte[] CaptureRequestMsg(byte[] roundedMsg) ;
        byte[] CaptureRespondMsg(byte[] roundedMsg) ;

        //验证消息,以下验证的消息是还没有被捕获的消息
        bool VerifyFirstMsgOfUser(byte[] roundedMsg ,ref RequestValidation validation) ;
        bool VerifyOtherMessage(byte[]   roundedMsg ,ref RequestValidation validation) ;
    }

    关于这个接口中各方法的含义可以在消息分派器的实现中更好的领会! 

3.  消息分派器实现
    在前述的基本元素的基础上,实现消息分派器非常简单,我们来看其核心方法DealRequestMessage的实现源码:

      private IMessageSplitter               curMsgSplitter = new MessageSpliter() ;
      private IDataStreamHelper            curMsgHelper ;  //必须设置
      private IRequestDealerFactory       curDealerFactory ;  //必须设置
      private ITcpStreamDispatcherHook tcpStreamDispatcherHook ;

       public ArrayList DealRequestMessage(RequestData requestData, out byte[] leftData, ref RequestValidation validation)
        {
            //消息分裂
            ArrayList respondList = new ArrayList() ;
            ArrayList reqList = this.curMsgSplitter.SplitRequestMsgs(requestData.Buff ,requestData.ValidCount ,out leftData) ;
            if(reqList == null)
            {
                return respondList ;
            }                

            bool verified = true ;

            for(int i=0; i<reqList.Count ;i++)
            {        
                byte[] theData = (byte[])reqList[i] ;

                #region 验证消息                
                if(requestData.IsFirstMsg && (i == 0))
                {                        
                    verified = this.tcpStreamDispatcherHook.VerifyFirstMsgOfUser(theData ,ref validation) ;                    
                }
                else
                {                            
                    verified = this.tcpStreamDispatcherHook.VerifyOtherMessage(theData ,ref validation ) ;                    
                }

                if(! verified)
                {
                    if(validation.gotoCloseConnection)
                    {
                        return null ;
                    }

                    this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(theData ,ServiceFailureType.InvalidMessge)) ;
                    continue ;
                }
                #endregion
                
                //接插,捕获/转换请求消息
                byte[] reqData = this.tcpStreamDispatcherHook.CaptureRequestMsg(theData) ;                

                #region 处理消息
                //处理消息
                IDataStreamHeader header = this.curMsgHelper.ParseMessageHeader(reqData ,0);                
                IRequestDealer dealer = this.curDealerFactory.CreateDealer(header.ServiceKey ,header.TypeKey) ;
                if(dealer == null)
                {
                    this.AddRespondToList(respondList ,this.curMsgHelper.GetRespondWhenFailure(reqData ,ServiceFailureType.ServiceIsNotExit)) ;
                    continue ;
                }

                RoundedRequestMsg roundReqMsg = new RoundedRequestMsg();
                roundReqMsg.ConnectID = requestData.ConnectID ;
                roundReqMsg.Data = reqData ;    
                try
                {
                    byte[] respondData = dealer.DealRequestMessage(roundReqMsg) ;
                    
                    if(respondData != null)
                    {
                        this.AddRespondToList(respondList ,respondData) ;
                    }
                }
                catch(Exception ee)
                {                    
                    this.AddRespondToList(respondList , this.curMsgHelper.GetRespondWhenFailure(reqData ,ee.Message)) ;
                }    
                #endregion
            }

            return respondList;
        }

        //将回复消息加密后放入list
        private void AddRespondToList(ArrayList list ,byte[] theRespondData)
        {
            //接插,捕获/转换回复消息
            byte[] respondData = this.tcpStreamDispatcherHook.CaptureRespondMsg(theRespondData) ;    

            list.Add(respondData) ;
        }

    如果你是一直按顺序读下来的,理解上面的实现一定不成什么问题。到这里,Tcp通信层的所有重要的设施基本都已介绍完毕,最后,给出了提示,即,在你的应用中,如何使用这个可复用的Tcp通信层。步骤如下:
(1)实现IDataStreamHelper接口。
(2)实现IReqestStreamDispatcher接口,如果采用的是Tcp协议,则可直接使用参考实现TcpStreamDispatcher
(3)实现各种请求处理器,这些处理器实现IRequestDealer接口。
(4)实现IRequestDealerFactory接口。 

    接下来,还有什么?其实,还有很多,都可以提高到框架的层次,以便复用。比如,前面我们处理消息都是基于流(byte[])的形式,在此基础上,我们可以更上一层,采用基于对象的形式――即,将请求消息和回复消息都封装成类,这就涉及了流的解析(流=>对象)和对象序列化(消息对象=>流)问题;另外,我们甚至可以将Tcp用户管理纳入到框架的高度,以进行复用,比如,通常基于Tcp服务的系统都需要管理在线的Tcp用户,并记录Tcp用户请求服务的具体信息、在线时间等,这些经过良好的分析概括都可以提高到复用的高度。以后有时间,我会将这样的经验和大家分享。

    最后,把EnterpriseServerBase类库中的Network命名空间中的源码和大家共享,希望对大家有所帮助!(另,该命名空间中已经包含了上述的基于对象的消息和Tcp用户管理的可复用组件)。点击下载

 

 

 

时间: 2024-10-23 00:02:19

.NET平台下可复用的Tcp通信层实现(续)的相关文章

.NET平台下可复用的Tcp通信层实现

    2006年已经来临,回首刚走过的2005,心中感慨万千.在人生和生活的目标上,有了清晰明确的定位,终于知道了自己喜欢什么样的生活,喜欢什么样的生活方式:在技术上,成熟了不少,眼界也开阔的不少,从面向对象到组件.从.Net到J2EE.从微软到开源,颇有收获.特别值得一提的是,认识了Rod Johnson这个大牛人,也终于在自己的项目中正式使用Spring.net框架来开发了,这确实是一个优秀的框架.而在已经到来的2006年,我有一个主要目标就是B/S应用开发,来填补自己在企业级开发上的另一

.NET下可复用的TCP通信层实现之TCP组件

    2006年已经来临,回首刚走过的2005,心中感慨万千.在人生和生活的目标上,有了清晰明确的定位,终于知道了自己喜欢什么样的生活,喜欢什么样的生活方式:在技术上,成熟了不少,眼界也开阔的不少,从面向对象到组件.从.Net到J2EE.从微软到开源,颇有收获.特别值得一提的是,认识了Rod Johnson这个大牛人,也终于在自己的项目中正式使用Spring.net框架来开发了,这确实是一个优秀的框架.而在已经到来的2006年,我有一个主要目标就是B/S应用开发,来填补自己在企业级开发上的另一

.NET可复用TCP通信层之消息分派器组件

上一篇主要讲到了Tcp通信层中的核心组件――Tcp组件的实现,Tcp组件是整个通信层的消息驱动源,甚至,可以将Tcp组件看作是我们整个服务器系统的消息驱动源,消息处理过程从这里引发.类似的消息驱动源还有发布的WebService接口.Remoting接口等.今天我们需要关注的是Tcp通信层中的"中央"组件――消息分派器组件ITcpReqStreamDispatcher,大家已经从前文的组件关系图中看到了消息分派器的大致位置和作用了,它是Tcp通信组件和消息处理器之间的"桥梁&

C# tcp 通信时,KeepALive 时自动断开 问题 急,急,急求大侠帮助

问题描述 在.net平台下使用c#进行tcp通信,客户端发送命令服务器端收到后执行命令,服务器端执行时间大概几个小时,然后返回给客户端结果.在此期间客户端使用阻塞的方式Read方法读取,为了防止TCP在长链接时断开,我启用了c#tcp中的KeepAlive机制,每三分钟发一次心跳包.但是过了一个多小时read()方法触发异常,"无法从传输连接中读取数据:你的主机中的软件终止了一个已建立的连接.",我用wireshark查看,发现触发异常时候心跳包还能正常接收.不知道什么原因导致read

两地局域网都接在互联网上,怎么实现两地udp/tcp通信

问题描述 求高手解答,需要在路由上做端口映射吗? 解决方案 解决方案二:应该需要的,6个字真麻烦解决方案三:两端都需要端口映射解决方案四:引用2楼zhouqinghe24的回复: 两端都需要端口映射 映射端口号一致是吧,然后使用winsock控件绑定的IP就是内网IP,端口号是映射端口吗?还是采用其他方法,我看他们FTP通信的.求思路解决方案五:引用1楼xdashewan的回复: 应该需要的,6个字真麻烦 除了winsock控件的tcp/udp还有别的通信方式吗解决方案六:引用1楼xdashew

不同WINDOWS平台下磁盘逻辑扇区的直接读写

不同WINDOWS平台下磁盘逻辑扇区的直接读写 关键字:VWIN32.中断.DeviceIoControl 一.概述 在DOS操作系统下,通过BIOS的INT13.DOS的INT25(绝对读).INT26(绝对写)等功能调用实现对磁盘逻辑扇区或物理扇区的读写是很方便的,C语言中还有对应上述功能调用的函数:biosdisk.absread和abswrite等.但在WINDOWS操作系统下编写WIN32应用程序时却再也不能直接使用上述的中断调用或函数了.那么,在WINDOWS操作系统下能不能实现磁盘

连接-TCP通信的数据格式该怎么定义???

问题描述 TCP通信的数据格式该怎么定义??? 请问TCP长连接该怎么定义什么样的数据格式才不会粘包呢?大牛们请指教!谢谢! 解决方案 粘包没关系,只要能正确拆包就行.比如约定: 发送,先发4个字节表述数据的长度,再发数据. 接受,先接受4个取得长度,再按长度读取数据.剩下的就是下个包的. 解决方案二: 自己定义包的结构,里面定义字段定义包的长度等,收到数据后,根据协议解析数据包等

postgresql在windows平台下的安装

window 经过了一天一夜的折磨,终于让postgresql正常的运行在我的计算机上了,尽管还有些不稳定,但总算是可以用了,废话少说,下面就说说我的配置过程: 1.搞来最新的postgresql for windows版本的,我用的是7.31(***,这个怎么象鬼子的那个细菌部队?打倒日本帝国主义!!!),开始默认安装.不知道为什么这个鸟玩艺儿为什么不能选择安装路径,也许是我没有找到?不过我前前后后安装了二十几遍也没有发现,如果那位大虾发现了请告诉我一声,^O^.安装完成了呢,系统会提示你重新

.Net平台下开发中文语音应用程序

程序|中文 .Net平台下开发中文语音应用程序 -------------------------------------------------------------------------------- 摘要:语音是人类最自然的交互方式,也是现阶段软件用户界面发展的最高目标.微软公司一直积极推动语音技术的发展,并且公布了语音开发平台Speech SDK帮助开发人员实现语音应用.随着.net技术深入人心,越来越多的程序员开始转到.net平台上进行开发.然而,在新发布的.net speech