这里用到了一些技术点,比如平台调用、反射,多线程等,当然还有iocp和winsock的api,及 GCHandle,SafeHandle,Marshal类的使用等,不过相当多的东西,我上篇帖子讲的都很细了,如果对 winsock api不了解可以查阅MSDN。也没什么技术难点,说几个细节的地方吧。
1、.net自带的System.Threading.NativeOverlapped类型是完全按照win32的Overlapped结构实现的, 因为我们在WSASend和WSAReceive的时候想要传递更多的数据,而不只是一个重叠结构,所以我自己定义 了一个WaOverlapped,在原有结构的末尾加了一个指针,指向一个自定义类的GC句柄,这样在工作线程里 就可以拿到自定义的单IO数据了,这个是我想了N种办法不行后的一个可行的办法。
2、注意GCHandle在取到数据后不用的话记着Free掉,否则就有可能造成内存泄漏。
3、如果调用WSASend或者WSAReceive返回6的话,多半是你准备的单IO数据不对,6表示无效的句柄。
4、如果传递给WSASend或者WSAReceive的Overlapped没pin住,会抛异常的,等不到 GetLastWin32Error,所以用GCHandle.Alloc(PerIoData.Overlapped, GCHandleType.Pinned)把它pin住 。
5、这个类还没有进行各方面的优化,其中的单IO数据,socket等都可以做成对象池来重用,Accept还 可以替换成AcceptEx来用一个现成的Socket来接受新的连接,而不是自动创建一个新的,还有缓冲区可以 做成环状的,关于性能方面的优化,下次有机会再给大家做实验。
完整代码如下,windows2008打开不安全代码进行编译,然后可以用telnet进行测试。
using System;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.ConstrainedExecution;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;
namespace WawaSocket.Net.Iocp
{
用IOCP和winsock api实现一个echo服务器#region 用IOCP和winsock api实现一个echo服务器
class IocpTest
{
private static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1); //无 效句柄
const int PORT = 5150; //要监听的端口
const int DATA_BUFSIZE = 8192; //默认缓冲区
const int ERROR_IO_PENDING = 997; //表示数据正在接受或者发送中
const uint INIFINITE = 0xffffffff; //表示等待无限时长
private static readonly Logger _logger = Logger.GetLogger(typeof (IocpTest));
单IO数据#region 单IO数据
[StructLayout(LayoutKind.Sequential)]
class PerIoOperationData
{
public WaOverlapped Overlapped;
public WSABuffer DataBuf;
public readonly byte[] Buffer = new byte[DATA_BUFSIZE];
public uint BytesSEND;
public uint BytesRECV;
}
#endregion
单句柄数据#region 单句柄数据
[StructLayout(LayoutKind.Sequential)]
class PerHandleData
{
public SafeSocketHandle Socket;
}
#endregion
public static void Run()
{
WSAData wsaData;
SocketError Ret;
初始化套接字#region 初始化套接字
_logger.Log("初始化socket");
if ((Ret = Win32Api.WSAStartup(0x0202, out wsaData)) != SocketError.Success)
{
_logger.Error("WSAStartup failed with error {0}\n", Ret);
return;
}
#endregion
创建一个完成端口内核对象#region 创建一个完成端口内核对象
_logger.Log("创建完成端口");
// Setup an I/O completion port.
SafeFileHandle CompletionPort = Win32Api.CreateIoCompletionPort (INVALID_HANDLE_VALUE, IntPtr.Zero, IntPtr.Zero, 0);
if (CompletionPort.IsInvalid)
{
_logger.Error("CreateIoCompletionPort failed with error: {0}\n", Marshal.GetLastWin32Error());
Marshal.ThrowExceptionForHR(Marshal.GetLastWin32Error ());
return;
}
#endregion
创建工作线程#region 创建工作线程
int processorCount = Environment.ProcessorCount;
_logger.Log("创建{0}个工作线程", processorCount);
for (int i = 0; i < processorCount; i++)
{
// Create a server worker thread and pass the completion port to the thread.
var thread = new Thread(ThreadProc);
thread.Start(CompletionPort);
}
#endregion
创建监听用的套接字#region 创建监听用的套接字
_logger.Log("创建监听套接字");
// Create a listening socket
SafeSocketHandle Listen = Win32Api.WSASocket (AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp, IntPtr.Zero, 0, SocketConstructorFlags.WSA_FLAG_OVERLAPPED);
if (Listen.IsInvalid)
{
Listen.SetHandleAsInvalid();
_logger.Error("WSASocket() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
return;
}
#endregion
将套接字与本地端口绑定#region 将套接字与本地端口绑定
IPEndPoint InternetAddr = new IPEndPoint(IPAddress.Any, PORT);
SocketAddress socketAddress = InternetAddr.Serialize();
byte[] adress_buffer;
int adress_size;
_logger.Log("进行套接字绑定");
if (!DoBind(Listen, socketAddress, out adress_buffer, out adress_size))
{
_logger.Error("bind() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
return;
}
#endregion
开始监听端口#region 开始监听端口
_logger.Log("开始监听:{0}-{1}", InternetAddr.Address, InternetAddr.Port);
// Prepare socket for listening
if (Win32Api.listen(Listen, 5) == SocketError.SocketError)
{
_logger.Error("listen() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
return;
}
#endregion
起一个循环来接受新连接#region 起一个循环来接受新连接
// Accept connections and assign to the completion port.
while (true)
unsafe
{
接受新连接#region 接受新连接
_logger.Log("开始接受入站连接");
SafeSocketHandle Accept = Win32Api.accept (Listen.DangerousGetHandle(), adress_buffer, ref adress_size);
if (Accept.IsInvalid)
{
_logger.Error("WSAAccept() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR (Win32Api.WSAGetLastError());
}
_logger.Log("有新连接进入:{0}", Accept.GetHashCode ());
#endregion
创建单句柄数据#region 创建单句柄数据
// Create a socket information structure to associate with the socket
PerHandleData PerHandleData = new PerHandleData ();
GCHandle gch_PerHandleData = GCHandle.Alloc (PerHandleData);
// Associate the accepted socket with the original completion port.
PerHandleData.Socket = Accept;
#endregion
把新接受的套接字与完成端口绑定#region 把新接受的套 接字与完成端口绑定
SafeFileHandle iocp = Win32Api.CreateIoCompletionPort(Accept.DangerousGetHandle(),
CompletionPort.DangerousGetHandle(),
GCHandle.ToIntPtr(gch_PerHandleData), 0);
if (iocp == null)
{
_logger.Error("CreateIoCompletionPort failed with error {0}\n", Marshal.GetLastWin32Error());
Marshal.ThrowExceptionForHR (Marshal.GetLastWin32Error());
return;
}
#endregion
准备单IO数据#region 准备单IO数据
// Create per I/O socket information structure to associate with the
// WSARecv call below.
PerIoOperationData PerIoData = new PerIoOperationData();
GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
GCHandle gcHandle = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
PerIoData.BytesSEND = 0;
PerIoData.BytesRECV = 0;
PerIoData.DataBuf.Length = DATA_BUFSIZE;
PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(PerIoData.Buffer, 0);
#endregion
开始投递异步接受数据的请求#region 开始投递异步接受 数据的请求
SocketFlags Flags = SocketFlags.None;
_logger.Log("开始异步接受数据");
int RecvBytes;
SocketError error = Win32Api.WSARecv(Accept, ref PerIoData.DataBuf,
1, out RecvBytes, ref Flags, gcHandle.AddrOfPinnedObject(),
IntPtr.Zero);
if (error == SocketError.SocketError)
{
if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
{
_logger.Error("WSARecv() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR (Win32Api.WSAGetLastError());
//其实在主线程退出之前都应该用 PostQueuedCompletionStatus通知工作线程退出
return;
}
}
#endregion
}
#endregion
}