<摘录>详谈高性能UDP服务器的开发

上一篇文章我详细介绍了如何开发一款高性能的TCP服务器的网络传输层.本章我将谈谈如何开发一个高性能的UDP服务器的网络层.UDP服务器的网络层开 发相对与TCP服务器来说要容易和简单的多,UDP服务器的大致流程为创建一个socket然后将其绑定到完成端口上并投递一定数量的recv操作.当有 数据到来时从完成队列中取出数据发送到接收队列中即可。
  测试结果如下:
    WindowsXP Professional,Intel Core Duo E4600 双核2.4G , 2G内存。同时30K个用户和该UDP服务器进行交互其CPU使用率为10%左右,内存占用7M左右。
  下面详细介绍该服务器的架构及流程:  
1. 首先介绍服务器的接收和发送缓存UDP_CONTEXT。

 1    class UDP_CONTEXT : protected NET_CONTEXT
 2    {
 3        friend class UdpSer;
 4    protected:
 5        IP_ADDR m_RemoteAddr;            //对端地址
 6
 7        enum
 8        {
 9            HEAP_SIZE = 1024 * 1024 * 5,
10            MAX_IDL_DATA = 10000,
11        };
12
13    public:
14        UDP_CONTEXT() {}
15        virtual ~UDP_CONTEXT() {}
16
17        void* operator new(size_t nSize);
18        void operator delete(void* p);
19
20    private:
21        static vector<UDP_CONTEXT* > s_IDLQue;
22        static CRITICAL_SECTION s_IDLQueLock;
23        static HANDLE s_hHeap;    
24    };

UDP_CONTEXT的实现流程和TCP_CONTEXT的实现流程大致相同,此处就不进行详细介绍。

2. UDP_RCV_DATA,当服务器收到客户端发来的数据时会将数据以UDP_RCV_DATA的形式放入到数据接收队列中,其声明如下:

 1    class DLLENTRY UDP_RCV_DATA
 2    {
 3        friend class UdpSer;
 4    public:
 5        CHAR* m_pData;                //数据缓冲区
 6        INT m_nLen;                    //数据的长度
 7        IP_ADDR m_PeerAddr;            //发送报文的地址
 8
 9        UDP_RCV_DATA(const CHAR* szBuf, int nLen, const IP_ADDR& PeerAddr);
10        ~UDP_RCV_DATA();
11
12        void* operator new(size_t nSize);
13        void operator delete(void* p);
14
15        enum
16        {
17            RCV_HEAP_SIZE = 1024 * 1024 *50,        //s_Heap堆的大小
18            DATA_HEAP_SIZE = 100 * 1024* 1024,    //s_DataHeap堆的大小
19            MAX_IDL_DATA = 250000,
20        };
21
22    private:
23        static vector<UDP_RCV_DATA* > s_IDLQue;
24        static CRITICAL_SECTION s_IDLQueLock;
25        static HANDLE s_DataHeap;        //数据缓冲区的堆
26        static HANDLE s_Heap;            //RCV_DATA的堆
27    };

UDP_RCV_DATA的实现和TCP_RCV_DATA大致相同, 此处不在详细介绍.

下面将主要介绍UdpSer类, 该类主要用来管理UDP服务.其定义如下:

 1    class DLLENTRY UdpSer
 2    {
 3    public:
 4        UdpSer();
 5        ~UdpSer();
 6
 7        /************************************************************************
 8        * Desc : 初始化静态资源,在申请UDP实例对象之前应先调用该函数, 否则程序无法正常运行
 9        ************************************************************************/
10        static void InitReource();
11
12        /************************************************************************
13        * Desc : 在释放UDP实例以后, 掉用该函数释放相关静态资源
14        ************************************************************************/
15        static void ReleaseReource();
16
17        //用指定本地地址和端口进行初始化
18        BOOL StartServer(const CHAR* szIp = "0.0.0.0", INT nPort = 0);
19
20        //从数据队列的头部获取一个接收数据, pCount不为null时返回队列的长度
21        UDP_RCV_DATA* GetRcvData(DWORD* pCount);
22
23        //向对端发送数据
24        BOOL SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen);
25
26        /****************************************************
27        * Name : CloseServer()
28        * Desc : 关闭服务器
29        ****************************************************/
30        void CloseServer();
31
32    protected:
33        SOCKET m_hSock;
34        vector<UDP_RCV_DATA* > m_RcvDataQue;                //接收数据队列
35        CRITICAL_SECTION m_RcvDataLock;                        //访问m_RcvDataQue的互斥锁
36        long volatile m_bThreadRun;                                //是否允许后台线程继续运行
37        BOOL m_bSerRun;                                            //服务器是否正在运行
38
39        HANDLE *m_pThreads;                //线程数组
40        HANDLE m_hCompletion;                    //完成端口句柄
41
42        void ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
43
44        /****************************************************
45        * Name : WorkThread()
46        * Desc : I/O 后台管理线程
47        ****************************************************/
48        static UINT WINAPI WorkThread(LPVOID lpParam);
49    };

1. InitReource() 主要对相关的静态资源进行初始化.其实大致和TcpServer::InitReource()大致相同.在UdpSer实例使用之前必须调用该函数进行静态资源的初始化, 否则服务器无法正常使用.

2.ReleaseReource() 主要对相关静态资源进行释放.只有在应用程序结束时才能调用该函数进行静态资源的释放.

3. StartServer() 
该函数的主要功能启动一个UDP服务.其大致流程为先创建服务器UDP socket, 将其绑定到完成端口上然后投递一定数量的recv操作以接收客户端的数据.其实现如下:

 1    BOOL UdpSer::StartServer(const CHAR* szIp /* =  */, INT nPort /* = 0 */)
 2    {
 3        BOOL bRet = TRUE;
 4        const int RECV_COUNT = 500;
 5        WSABUF RcvBuf = { NULL, 0 };
 6        DWORD dwBytes = 0;
 7        DWORD dwFlag = 0;
 8        INT nAddrLen = sizeof(IP_ADDR);
 9        INT iErrCode = 0;
10
11        try
12        {
13            if (m_bSerRun)
14            {
15                THROW_LINE;
16            }
17
18            m_bSerRun = TRUE;
19            m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
20            if (INVALID_SOCKET == m_hSock)
21            {
22                THROW_LINE;
23            }
24            ULONG ul = 1;
25            ioctlsocket(m_hSock, FIONBIO, &ul);
26
27            //设置为地址重用,优点在于服务器关闭后可以立即启用
28            int nOpt = 1;
29            setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));
30
31            //关闭系统缓存,使用自己的缓存以防止数据的复制操作
32            INT nZero = 0;
33            setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));
34            setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));
35
36            IP_ADDR addr(szIp, nPort);
37            if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr, sizeof(addr)))
38            {
39                closesocket(m_hSock);
40                THROW_LINE;
41            }
42
43            //将SOCKET绑定到完成端口上
44            CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0);
45
46            //投递读操作
47            for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++)
48            {
49                UDP_CONTEXT* pRcvContext = new UDP_CONTEXT();
50                if (pRcvContext && pRcvContext->m_pBuf)
51                {
52                    dwFlag = 0;
53                    dwBytes = 0;
54                    nAddrLen = sizeof(IP_ADDR);
55                    RcvBuf.buf = pRcvContext->m_pBuf;
56                    RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
57
58                    pRcvContext->m_hSock = m_hSock;
59                    pRcvContext->m_nOperation = OP_READ;            
60                    iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
61                        , &nAddrLen, &(pRcvContext->m_ol), NULL);
62                    if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
63                    {
64                        delete pRcvContext;
65                        pRcvContext = NULL;
66                    }
67                }
68                else
69                {
70                    delete pRcvContext;
71                }
72            }
73        }
74        catch (const long &lErrLine)
75        {            
76            bRet = FALSE;
77            _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine);            
78        }
79
80        return bRet;
81    }

4. GetRcvData(), 从接收队列中取出一个数据包.

 1    UDP_RCV_DATA *UdpSer::GetRcvData(DWORD* pCount)
 2    {
 3        UDP_RCV_DATA* pRcvData = NULL;
 4
 5        EnterCriticalSection(&m_RcvDataLock);
 6        vector<UDP_RCV_DATA* >::iterator iterRcv = m_RcvDataQue.begin();
 7        if (iterRcv != m_RcvDataQue.end())
 8        {
 9            pRcvData = *iterRcv;
10            m_RcvDataQue.erase(iterRcv);
11        }
12
13        if (pCount)
14        {
15            *pCount = (DWORD)(m_RcvDataQue.size());
16        }
17        LeaveCriticalSection(&m_RcvDataLock);
18
19        return pRcvData;
20    }

5. SendData() 发送指定长度的数据包.

 1    BOOL UdpSer::SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen)
 2    {
 3        BOOL bRet = TRUE;
 4        try
 5        {
 6            if (nLen >= 1500)
 7            {
 8                THROW_LINE;
 9            }
10
11            UDP_CONTEXT* pSendContext = new UDP_CONTEXT();
12            if (pSendContext && pSendContext->m_pBuf)
13            {
14                pSendContext->m_nOperation = OP_WRITE;
15                pSendContext->m_RemoteAddr = PeerAddr;        
16
17                memcpy(pSendContext->m_pBuf, szData, nLen);
18
19                WSABUF SendBuf = { NULL, 0 };
20                DWORD dwBytes = 0;
21                SendBuf.buf = pSendContext->m_pBuf;
22                SendBuf.len = nLen;
23
24                INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1, &dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr), &(pSendContext->m_ol), NULL);
25                if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
26                {
27                    delete pSendContext;
28                    THROW_LINE;
29                }
30            }
31            else
32            {
33                delete pSendContext;
34                THROW_LINE;
35            }
36        }
37        catch (const long &lErrLine)
38        {
39            bRet = FALSE;
40            _TRACE("Exp : %s -- %ld ", __FILE__, lErrLine);            
41        }
42
43        return bRet;
44    }

6. CloseServer() 关闭服务

1    void UdpSer::CloseServer()
2    {
3        m_bSerRun = FALSE;
4        closesocket(m_hSock);
5    }

7. WorkThread() 在完成端口上工作的后台线程

 1    UINT WINAPI UdpSer::WorkThread(LPVOID lpParam)
 2    {
 3        UdpSer *pThis = (UdpSer *)lpParam;
 4        DWORD dwTrans = 0, dwKey = 0;
 5        LPOVERLAPPED pOl = NULL;
 6        UDP_CONTEXT *pContext = NULL;
 7
 8        while (TRUE)
 9        {
10            BOOL bOk = GetQueuedCompletionStatus(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);
11
12            pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol);
13            if (pContext)
14            {
15                switch (pContext->m_nOperation)
16                {
17                case OP_READ:
18                    pThis->ReadCompletion(bOk, dwTrans, pOl);
19                    break;
20                case OP_WRITE:
21                    delete pContext;
22                    pContext = NULL;
23                    break;
24                }
25            }
26
27            if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))
28            {
29                break;
30            }
31        }
32
33        return 0;
34    }

8.ReadCompletion(), 接收操作完成后的回调函数

 1    void UdpSer::ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
 2    {
 3        UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, UDP_CONTEXT, m_ol);
 4        WSABUF RcvBuf = { NULL, 0 };
 5        DWORD dwBytes = 0;
 6        DWORD dwFlag = 0;
 7        INT nAddrLen = sizeof(IP_ADDR);
 8        INT iErrCode = 0;
 9
10        if (TRUE == bSuccess && dwNumberOfBytesTransfered <= UDP_CONTEXT::S_PAGE_SIZE)
11        {
12#ifdef _XML_NET_
13            EnterCriticalSection(&m_RcvDataLock);
14
15            UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr);
16            if (pRcvData && pRcvData->m_pData)
17            {
18                m_RcvDataQue.push_back(pRcvData);
19            }    
20            else
21            {
22                delete pRcvData;
23            }
24
25            LeaveCriticalSection(&m_RcvDataLock);
26#else
27            if (dwNumberOfBytesTransfered >= sizeof(PACKET_HEAD))
28            {
29                EnterCriticalSection(&m_RcvDataLock);
30
31                UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfered, pRcvContext->m_RemoteAddr);
32                if (pRcvData && pRcvData->m_pData)
33                {
34                    m_RcvDataQue.push_back(pRcvData);
35                }    
36                else
37                {
38                    delete pRcvData;
39                }
40
41                LeaveCriticalSection(&m_RcvDataLock);
42            }
43#endif
44
45            //投递下一个接收操作
46            RcvBuf.buf = pRcvContext->m_pBuf;
47            RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
48
49            iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
50                , &nAddrLen, &(pRcvContext->m_ol), NULL);
51            if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
52            {
53                ATLTRACE("\r\n%s -- %ld dwNumberOfBytesTransfered = %ld, LAST_ERR = %ld"
54                    , __FILE__, __LINE__, dwNumberOfBytesTransfered, WSAGetLastError());
55                delete pRcvContext;
56                pRcvContext = NULL;
57            }
58        }
59        else
60        {
61            delete pRcvContext;
62        }
63    }

时间: 2024-08-03 08:51:31

<摘录>详谈高性能UDP服务器的开发的相关文章

Netty实现高性能RPC服务器

  在本人写的前一篇文章中,谈及有关如何利用Netty开发实现,高性能RPC服务器的一些设计思路.设计原理,以及具体的实现方案(具体参见:谈谈如何使用Netty开发实现高性能的RPC服务器).在文章的最后提及到,其实基于该方案设计的RPC服务器的处理性能,还有优化的余地.于是利用周末的时间,在原来NettyRPC框架的基础上,加以优化重构,本次主要优化改造点如下: 1.NettyRPC中对RPC消息进行编码.解码采用的是Netty自带的ObjectEncoder.ObjectDecoder(对象

SSDB:高性能数据库服务器

SSDB是一个开源的高性能数据库服务器, 使用Google LevelDB作为存储引擎, 支持T级别的数据, 同时支持类似Redis中的zset和hash等数据结构, 在同时需求高性能和大数据的条件下, 作为Redis的替代方案. 因为SSDB的最初目的是替代Redis, 所以SSDB会经常和Redis进行比较. 我们知道, Redis是经常的"主-从"架构, 虽然可以得到负载均衡以及数据跨地域备份的功能, 但无法实现高可用性. 考虑这种情况, Redis的主和从分别在两个IDC机房,

基于Oracle的高性能动态SQL程序开发_oracle

正在看的ORACLE教程是:基于Oracle的高性能动态SQL程序开发. 摘要:对动态SQL的程序开发进行了总结,并结合笔者实际开发经验给出若干开发技巧. 关键词:动态SQL,PL/SQL,高性能 1. 静态SQLSQL与动态SQL Oracle编译PL/SQL程序块分为两个种:其一为前期联编(early binding),即SQL语句在程序编译期间就已经确定,大多数的编译情况属于这种类型:另外一种是后期联编(late binding),即SQL语句只有在运行阶段才能建立,例如当查询条件为用户输

宝德可控双路和高性能四路服务器通过现场评审和鉴定

9月16日,由深圳市科技中介同业工会主持,中国科学院院士兼中国科学技术大学软件学院院长陈国良和来自清华.北大.南开等全国六所高校的专家组成的鉴定委员会对宝德科技的安全可控双路服务器(PR2920L)和高性能四路服务器(PR4840R)进行了现场评审和鉴定,认为其产品和性能达到了国内领先水平,一致通过了这两个项目的科技成果鉴定,宝德科技集团副总裁马竹茂先生和宝德研究院院长张栩先生带领宝德科技的项目主要研制人员参与了鉴定会. 鉴定委员会认为宝德安全可控双路服务器PR2920L是一款低功耗且面向国家重

第十八章-Delphi客户服务器应用开发(二)(2)

BDE在设计上是面向对象的.在运行时,数据库应用通过建立各种类型的BDE 对象与BDE交互,这些运行的对象用于操作数据库实体如数据库表.查询.BDE的扩展的API支持C.C++.Delphi等对数据库引擎的访问. 在Delphi应用程序中访问数据库是通过调BDE的API函数.Delphi在库单元BDE中提供了大约三十多个API函数和各种BDE消息和结构.由于Delphi应用程序的开发是基于部件的,有关BDE API的调用都嵌入了Delphi可视部件类库,因此,建立数据库应用时可以不必管BDE A

第十八章-Delphi客户服务器应用开发(一)(1)

客户/服务器的开发工作涉及定义客户/服务器的体系结构, 然后再将该结构与其它一些对于客户/服务器的实现至关重要的系统结构和技术集成起来.Delphi 2.0的Client/Sever版支持用户开发客户/服务器结构的应用程序.本章中我们将阐述客户服务器体系结构原理.如何用Delphi构建客户/服务器的环境和Delphi存取远程SQL服务器的编程和注意事项. 18.1 Delphi客户/服务器应用开发原理 18.1.1 客户/服务器体系结构 18.1.1.1 体系结构概述 客户/服务器系统的体系结构

udp通信 接收不到包-win7 下进行UDP通信测试发现UDP服务器无法接收到CLIENT的发送数据这是啥原因?

问题描述 win7 下进行UDP通信测试发现UDP服务器无法接收到CLIENT的发送数据这是啥原因? 进行UDP数据通信的时候,UDP的客户端用WIRESHARK抓包,能很明显的看到数据发出来了,数据格式也是正确的,但是服务那边的就是接受不到数据,接受卡在接受哪里不动了.查看防火墙,防火墙也是关了的:该程序在WINXP下通信都是正确的,哪位大侠能给下解决方案?总觉得是WIN7在哪里设置有问题:另外我查看QQ的设置,开放该应用程序的UDP协议为开放所有协议,还是不行. 解决方案 WIN7下UDP需

《高性能响应式Web开发实战》一导读

前 言 高性能响应式Web开发实战 为什么写这样一本书 作为一名程序员,写书也好,写博客也罢,其实都和写开源程序的性质是一样的,都是想要把自己的知识分享出去.分享是一件非常有成就感同时也是很快乐的事情,因为我们在此过程中会有很多新的想法,会迫不及待地想去实现,也会有很多人来和我们进行交流,探讨其他的一些可能性.最重要的是,对于做分享的人而言,做好分享很难!首先,分享者要对自己讲解的技术有足够的了解,不仅仅是了解如何用它,还要了解它的过去和未来:其次,分享者要能够娓娓道来,要站在受众的立场上考虑他

甲骨文宣布将不再为Intel安腾服务器平台开发任何软件

世界上最大的商业软件开发企业甲骨文(Oracle)今天宣布,将不再为Intel Itanium安腾服务器平台开发任何软件,让这套纯粹IA64架构平台的未来再次蒙上了阴影. 甲骨文进一步指出,微软和红帽此前都已经停止了安腾http://www.aliyun.com/zixun/aggregation/14734.html">平台软件的开发,而作为安腾最忠实拥趸的惠普也没有在其长远细节规划中提到安腾,至于戴尔.IBM,早在2005年就和安腾说拜拜了. 在本月23日,甲骨文(Oracle)曾向媒