虽然Java平台有不少相当不错的一步socket的框架,比如Netty,naga等,但是我们应该知其然并知其所以然。
nio是java New IO 的简称,从jdk1.4里提供的新api,Sun官方标榜的特性如下:
为所有的原始类型提供 (Buffer) 缓存支持
字符集编码解码解决方案
Channel:一个新的原始 I/O 抽象
支持锁和内存映射文件的文件访问接口
提供多路 (non-bloking) 非阻塞式的高伸缩性网络 I/O
基本原理
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的SocketChannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的SocketChannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。
Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容
基于nio写起来的代码有点罗嗦,不如用框架写起来那么爽。这个服务端实现的功能比较简单,还是只把客户端发过来的数据再返回去。
服务端
初始化Selector
代码如下 | 复制代码 |
private Selector initSelector() throws IOException { Selector socketSelector = SelectorProvider.provider().openSelector(); this.serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port); serverChannel.socket().bind(isa); serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); return socketSelector; } |
接受传入的连接
代码如下 | 复制代码 |
private void accept(SelectionKey key) throws IOException { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = serverSocketChannel.accept(); Socket socket = socketChannel.socket(); socketChannel.configureBlocking(false); socketChannel.register(this.selector, SelectionKey.OP_READ); LOGGER.info("ACCEPT: " + socket.getInetAddress().toString()); } |
接受数据
代码如下 | 复制代码 |
private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); this.readBuffer.clear(); int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { key.cancel(); socketChannel.close(); return; } if (numRead == -1) { key.channel().close(); key.cancel(); return; } // Hand the data off to our worker thread this.worker.processData(this, socketChannel, this.readBuffer.array(), numRead); } |
发送数据
代码如下 | 复制代码 |
private void write(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); List<ByteBuffer> queue = this.pendingData.get(socketChannel); while (!queue.isEmpty()) { ByteBuffer buf = queue.get(0); socketChannel.write(buf); if (buf.remaining() > 0) { break; } queue.remove(0); } if (queue.isEmpty()) { key.interestOps(SelectionKey.OP_READ); } } |
客户端
客户端的初始化相当简单
代码如下 | 复制代码 |
private Selector initSelector() throws IOException { return SelectorProvider.provider().openSelector(); } |
接受数据
代码如下 | 复制代码 |
private void read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); this.readBuffer.clear(); int numRead; try { numRead = socketChannel.read(this.readBuffer); } catch (IOException e) { key.cancel(); socketChannel.close(); return; } if (numRead == -1) { key.channel().close(); key.cancel(); return; } // Handle the response this.handleResponse(socketChannel, this.readBuffer.array(), numRead); } |
发送数据
代码如下 | 复制代码 |
private void write(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); List<ByteBuffer> queue = this.pendingData.get(socketChannel); while (!queue.isEmpty()) { ByteBuffer buf = queue.get(0); socketChannel.write(buf); if (buf.remaining() > 0) { break; } queue.remove(0); } if (queue.isEmpty()) { key.interestOps(SelectionKey.OP_READ); } } |
辅助类
EchoWorker负责把接受到的客户端数据再发送回去,你具体的业务逻辑可以通过类似的方法来实现。
代码如下 | 复制代码 |
import java.nio.channels.SocketChannel; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class EchoWorker implements Runnable { private BlockingQueue<ServerDataEvent> queue = new LinkedBlockingQueue<ServerDataEvent>(); public void processData(NioServer server, SocketChannel socket, byte[] data, int count) { byte[] dataCopy = new byte[count]; System.arraycopy(data, 0, dataCopy, 0, count); try { queue.put(new ServerDataEvent(server, socket, dataCopy)); } catch (InterruptedException e) { e.printStackTrace(); } } public void run() { while (true) { try { ServerDataEvent dataEvent = queue.take(); // Return to sender dataEvent.server.send(dataEvent.socket, dataEvent.data); } catch (InterruptedException e) { e.printStackTrace(); } } } } |
ChangeRequest负责处理SocketChannel的状态
代码如下 | 复制代码 |
import java.nio.channels.SocketChannel; public class ChangeRequest { public static final int REGISTER = 1; public static final int CHANGEOPS = 2; public SocketChannel socket; public int type; public int ops; public ChangeRequest(SocketChannel socket, int type, int ops) { this.socket = socket; this.type = type; this.ops = ops; } } |
RspHandler是负责处理客户端数据,在实际的案例中,可以在此类中处理你自己的业务逻辑。
代码如下 | 复制代码 |
public class RspHandler { private byte[] rsp = null; public synchronized boolean handleResponse(byte[] rsp) { this.rsp = rsp; this.notify(); return true; } public synchronized void waitForResponse() { while (this.rsp == null) { try { this.wait(); } catch (InterruptedException e) { } } System.out.println("PROCESS MESSAGE: " + new String(this.rsp)); } } |
启动
代码如下 | 复制代码 |
// 客户端启动 public static void main(String[] args) throws UnknownHostException, IOException { for (int i = 0; i < 10; i++) { try { NioClient client = new NioClient(InetAddress.getLocalHost(), 9090); Thread t = new Thread(client); t.setDaemon(true); t.start(); RspHandler handler = new RspHandler(); client.send(("TEST MESSAGE " + i + 10).getBytes(), handler); handler.waitForResponse(); } catch (Exception e) { e.printStackTrace(); } } } // 服务端启动 public static void main(String[] args) { try { EchoWorker worker = new EchoWorker(); new Thread(worker).start(); new Thread(new NioServer(null, 9090, worker)).start(); } catch (IOException e) { e.printStackTrace(); } } |