java nio 操作(2)异步阻塞 socket实现

一.同步阻塞IO

       
BIO就是阻塞式的IO,网络通信中对于多客户端的连入,服务器端总是与客户端数量一致的线程去处理每个客户端任务,即,客户端与线程数1:1,并且进行读写操作室阻塞的,当有你成千上完的客户端进行连接,就导致服务器不断的建立新的线程,最后导致低通资源不足,后面的客户端不能连接服务器,并且连接入的客户端并不是总是在于服务器进行交互,很可能就只是占用着资源而已。

二.伪异步IO

   伪异步IO对同步IO进行了优化,后端通过一个线程池和任务队列去处理所有客户端的请求,当用完后在归还给线程池,线程池的原理和数据库连接池的原理很像,他减少了线程创建和销毁的时间,无论多少客户端的连接都是这些固定的线程数量去处理,这在大量客户端与服务器信息交互量很少的情况下,是一种很好的处理方式,但是想一种情况,当有一个客户端的读取信息非常慢时,服务器对其的写操作时会很慢,甚至会阻塞很长时间,因为线程池中的线程是有限的,当有客户端需要分配线程时,就会导致新任务在队列中一直等待阻塞的客户端释放线程。当任务队列已经满时,就会有大量的用户发生连接超时。其实,伪异步IO也是同步阻塞IO。

三.javaNIO原理

       为了解决上面的两种阻塞IO的缺陷,Java在1.4版本开始加入NIO也称为new
IO,至于他是什么模型我们先来看这个博客(Java NIO BIO AIO全面剖析)。看完之后就会很清楚了。

目前在UNP划分的角度,异步IO只有AIO。网络通信中,NIO也提供了SocketChannel和ServerSocketChannel两种不同的套接字通道来实现,可以设置阻塞余非阻塞两种模式,为了实现高负载高并发都采取非阻塞的模式。NIO采用缓冲区BUFFER,实现对数据的读写操作,缓冲区是固定大小,并由内部状态记录有多少数据被放入或者取出。与阻塞IO不同,阻塞IO采用阻塞式流(Stream)的方式进行读写,流是单向的只能向一个方向读数据或者写数据,而通道是双向的,可以同时在通道上发送和读取数据,而且是非阻塞的,在没有数据可读可写时可以去做别的事情。

       NIO改进了上面的一对一或者M:N的模型。服务器仅采用一个一个线程去处理所有客户端线程,这就需要创建一个selector,并将其注册到想要监控的信道上(注意是通过channel的方法来实现),并返回一个selectorKey实例(包含通道和select以及感兴趣的操作),selector就好像是一个观察者,通过不断轮询所注册的一组通道上有没有等待的操作发生,当等待事件发生的时候可以做其他事情,当有信道出现感兴趣的操作,则该信道就进入就绪状态。

      Slector的select方法阻塞等待又没有就绪的通道,当出现就绪的信道或者等待超时返回,就绪信道的个数,若等待超时则返回-1,selectedKeys方法返回就绪的信道。

   下面附代码

这是处理感兴趣信道的接口,因为他可以放在多个服务器上所以把他做成了接口。

[java] view plain copy

  1. package Nio;  
  2.   
  3. import java.io.IOException;  
  4. import java.nio.channels.SelectionKey;  
  5.   
  6. public interface TCPProtocol {  
  7.     void handleAccept(SelectionKey key) throws IOException;  
  8.     void handleRead(SelectionKey key) throws IOException;  
  9.     void handleWrite(SelectionKey key) throws IOException;  
  10. }  

[java] view plain copy

  1. <span style="font-family:FangSong_GB2312;">这是对上面接口的具体实现</span>  
  2. package Nio;  
  3.   
  4. import java.io.IOException;  
  5. import java.nio.ByteBuffer;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.ServerSocketChannel;  
  8. import java.nio.channels.SocketChannel;  
  9.   
  10. public class SelectorProtocol implements TCPProtocol {  
  11.     private int bufSize ;  
  12.     public SelectorProtocol(int buffsize){  
  13.         this.bufSize = buffsize;  
  14.     }  
  15.   
  16.      //服务端信道已经准备好了接收新的客户端连接    
  17.     public void handleAccept(SelectionKey key) throws IOException {    
  18.         SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();    
  19.         clntChan.configureBlocking(false);    
  20.         //将选择器注册到连接到的客户端信道,并指定该信道key值的属性为OP_READ,同时为该信道指定关联的附件    
  21.         clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));    
  22.     }    
  23.     
  24.     //客户端信道已经准备好了从信道中读取数据到缓冲区    
  25.     public void handleRead(SelectionKey key) throws IOException{    
  26.         SocketChannel clntChan = (SocketChannel) key.channel();    
  27.         //获取该信道所关联的附件,这里为缓冲区    
  28.         ByteBuffer buf = (ByteBuffer) key.attachment();    
  29.         long bytesRead = clntChan.read(buf);    
  30.         //如果read()方法返回-1,说明客户端关闭了连接,那么客户端已经接收到了与自己发送字节数相等的数据,可以安全地关闭    
  31.         if (bytesRead == -1){     
  32.             clntChan.close();    
  33.         }else if(bytesRead > 0){    
  34.         //如果缓冲区总读入了数据,则将该信道感兴趣的操作设置为为可读可写    
  35.         key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);    
  36.         }    
  37.     }    
  38.         
  39.     //客户端信道已经准备好了将数据从缓冲区写入信道    
  40.     public void handleWrite(SelectionKey key) throws IOException {    
  41.     //获取与该信道关联的缓冲区,里面有之前读取到的数据    
  42.     ByteBuffer buf = (ByteBuffer) key.attachment();    
  43.     //重置缓冲区,准备将数据写入信道    
  44.     buf.flip();     
  45.     SocketChannel clntChan = (SocketChannel) key.channel();    
  46.     //将数据写入到信道中    
  47.     clntChan.write(buf);    
  48.     if (!buf.hasRemaining()){     
  49.     //如果缓冲区中的数据已经全部写入了信道,则将该信道感兴趣的操作设置为可读    
  50.       key.interestOps(SelectionKey.OP_READ);    
  51.     }    
  52.     //为读入更多的数据腾出空间    
  53.     buf.compact();     
  54.   }    
  55. }  

客户端

[java] view plain copy

  1. package Nio;  
  2. import java.net.InetSocketAddress;  
  3. import java.net.SocketException;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.channels.SocketChannel;  
  6.   
  7. public class TCPEchoClientNonblocking {  
  8.     public static void main(String args[]) throws Exception{  
  9.         //第一个参数作为要连接的服务端的主机名或IP  
  10.         String server = "localhost";   
  11.         //第二个参数为要发送到服务端的字符串  
  12.         byte[] argument = "nihaopengyou".getBytes();  
  13.         //如果有第三个参数,则作为端口号,如果没有,则端口号设为7  
  14.         int servPort = 2002;  
  15.         //创建一个信道,并设为非阻塞模式  
  16.         SocketChannel clntChan = SocketChannel.open();  
  17.         clntChan.configureBlocking(false);  
  18.         //向服务端发起连接  
  19.         if (!clntChan.connect(new InetSocketAddress(server, servPort))){  
  20.             //不断地轮询连接状态,直到完成连接  
  21.             while (!clntChan.finishConnect()){  
  22.                 //在等待连接的时间里,可以执行其他任务,以充分发挥非阻塞IO的异步特性  
  23.                 //这里为了演示该方法的使用,只是一直打印"."  
  24.                 System.out.print(".");    
  25.             }  
  26.         }  
  27.         //为了与后面打印的"."区别开来,这里输出换行符  
  28.         System.out.print("\n");  
  29.         //分别实例化用来读写的缓冲区  
  30.         ByteBuffer writeBuf = ByteBuffer.wrap(argument);  
  31.         ByteBuffer readBuf = ByteBuffer.allocate(argument.length);  
  32.         //接收到的总的字节数  
  33.         int totalBytesRcvd = 0;   
  34.         //每一次调用read()方法接收到的字节数  
  35.         int bytesRcvd;   
  36.         //循环执行,直到接收到的字节数与发送的字符串的字节数相等  
  37.         while (totalBytesRcvd < argument.length){  
  38.             //如果用来向通道中写数据的缓冲区中还有剩余的字节,则继续将数据写入信道  
  39.             if (writeBuf.hasRemaining()){  
  40.                 clntChan.write(writeBuf);  
  41.             }  
  42.             //如果read()接收到-1,表明服务端关闭,抛出异常  
  43.             if ((bytesRcvd = clntChan.read(readBuf)) == -1){  
  44.                 throw new SocketException("Connection closed prematurely");  
  45.             }  
  46.             //计算接收到的总字节数  
  47.             totalBytesRcvd += bytesRcvd;  
  48.             //在等待通信完成的过程中,程序可以执行其他任务,以体现非阻塞IO的异步特性  
  49.             //这里为了演示该方法的使用,同样只是一直打印"."  
  50.             System.out.print(".");   
  51.         }  
  52.         //打印出接收到的数据  
  53.         System.out.println("Received: " +  new String(readBuf.array(), 0, totalBytesRcvd));  
  54.         //关闭信道  
  55.         clntChan.close();  
  56.     }  
  57. }  

服务器

[java] view plain copy

  1. package Nio;  
  2. import java.io.IOException;  
  3. import java.net.InetSocketAddress;  
  4. import java.nio.channels.SelectionKey;  
  5. import java.nio.channels.Selector;  
  6. import java.nio.channels.ServerSocketChannel;  
  7. import java.util.Iterator;  
  8.   
  9. public class TCPServerSelector{  
  10.     //缓冲区的长度  
  11.     private static final int BUFSIZE = 256;   
  12.     //select方法等待信道准备好的最长时间  
  13.     private static final int TIMEOUT = 3000;   
  14.     public static void main(String[] args) throws IOException {  
  15.         if (args.length < 1){  
  16.             throw new IllegalArgumentException("Parameter(s): <Port> ...");  
  17.         }  
  18.         //创建一个选择器  
  19.         Selector selector = Selector.open();  
  20.         for (String arg : args){  
  21.             //实例化一个信道  
  22.             ServerSocketChannel listnChannel = ServerSocketChannel.open();  
  23.             //将该信道绑定到指定端口  
  24.             listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));  
  25.             System.out.println("启动服务器"+Integer.parseInt(arg));  
  26.             //配置信道为非阻塞模式  
  27.             listnChannel.configureBlocking(false);  
  28.             //将选择器注册到各个信道  
  29.             listnChannel.register(selector, SelectionKey.OP_ACCEPT);  
  30.         }  
  31.         //创建一个实现了协议接口的对象  
  32.         TCPProtocol protocol = new SelectorProtocol(BUFSIZE);  
  33.         //不断轮询select方法,获取准备好的信道所关联的Key集  
  34.         while (true){  
  35.             //一直等待,直至有信道准备好了I/O操作  
  36.             if (selector.select(TIMEOUT) == 0){  
  37.                 //在等待信道准备的同时,也可以异步地执行其他任务,  
  38.                 //这里只是简单地打印"."  
  39.                 System.out.print(".");  
  40.                 continue;  
  41.             }  
  42.             //获取准备好的信道所关联的Key集合的iterator实例  
  43.             Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();  
  44.             //循环取得集合中的每个键值  
  45.             while (keyIter.hasNext()){  
  46.                 SelectionKey key = keyIter.next();   
  47.                 //如果服务端信道感兴趣的I/O操作为accept  
  48.                 if (key.isAcceptable()){  
  49.                     protocol.handleAccept(key);  
  50.                 }  
  51.                 //如果客户端信道感兴趣的I/O操作为read  
  52.                 if (key.isReadable()){  
  53.                     protocol.handleRead(key);  
  54.                 }  
  55.                 //如果该键值有效,并且其对应的客户端信道感兴趣的I/O操作为write  
  56.                 if (key.isValid() && key.isWritable()) {  
  57.                     protocol.handleWrite(key);  
  58.                 }  
  59.                 //这里需要手动从键集中移除当前的key  
  60.                 keyIter.remove();   
  61.             }  
  62.         }  
  63.     }  
  64. }  

运行结果:

时间: 2024-11-23 04:20:09

java nio 操作(2)异步阻塞 socket实现的相关文章

java的nio之:java的bio流下实现的socket服务器同步阻塞模型和socket的伪异步的socket服务器的通信模型

同步I/O模型的弊端===>每一个线程的创建都会消耗服务端内存,当大量请求进来,会耗尽内存,导致服务宕机 伪异步I/O的弊端分析===>当对Socket的输入流进行读取操作的时候,它会一直阻塞下去,知道发生如下三件事情(1)有数据可读(2)可用数据已经读取完毕(3)发生空指针或者I/O异常===>这意味着当对方发送请求或应答消息比较缓慢,或者网络传输比较慢时候,读取输入流的一方的通信线程将被长时间阻塞.在阻塞期间,其他接入的消息只能在消息队列中排队.===>伪异步I/O实际上仅仅只

java nio socket 异步接收数据

问题描述 nio socket 异步接收数据,如何确定收接的数据,就是发送的返回的?有谁研究过没. 问题补充:如果不用id,nio客户端是否可以同步接收数据呢?是把socketChannel.configureBlocking(true)吗? 解决方案 在消息中增加UUID,在调用端记录UUID,并将UUID和消息一起发送到服务端,服务端的回传消息附件上UUID,调用端根据收到的消息包含的UUID确定给哪个调用者.解决方案二:异步调用的常见问题.因为没有办法实施返回,只有打标志了.要你把通信协议

java nio 关闭客户端 服务器端的selector.select(TimeOut)方法不阻塞

问题描述 java nio 关闭客户端 服务器端的selector.select(TimeOut)方法不阻塞 代码如下: // 反复循环,等待IO while (true) { // 等待某信道就绪(或超时) if (selector.select(TimeOut) == 0) {// 监听注册通道,当其中有注册的 IO // 操作可以进行时,该函数返回,并将对应的 // SelectionKey 加入 selected-key // set log.info("服务器独自等待.");

《Java NIO文档》非阻塞式服务器

即使你知道Java NIO 非阻塞的工作特性(如Selector,Channel,Buffer等组件),但是想要设计一个非阻塞的服务器仍然是一件很困难的事.非阻塞式服务器相较于阻塞式来说要多上许多挑战.本文将会讨论非阻塞式服务器的主要几个难题,并针对这些难题给出一些可能的解决方案. 查找关于非阻塞式服务器设计方面的资料实在不太容易,所以本文提供的解决方案都是基于本人工作和想法上的.如果各位有其他的替代方案或者更好的想法,我会很乐意听取这些方案和想法!你可以在文章下方留下你的评论,或者发邮件给我(

关于java阻塞socket和非阻塞socket的应用区别

问题描述 关于java阻塞socket和非阻塞socket的应用区别 最近在学习NIO,在学习非阻塞Socket的时候 很困惑,不知道他相对于阻塞的Socket的优势 在哪,希望大神指点一二,在线等. 解决方案 阻塞就是一直等待结果返回,非阻塞就是立即返回,等有了结果了以后,再回调,事件通知你 解决方案二: 传统的阻塞式,每个连接必须要开一个线程来处理,并且没处理完线程不能退出. 非阻塞式,由于基于反应器模式,用于事件多路分离和分派的体系结构模式,所以可以利用线程池来处理.事件来了就处理,处理完

java nio 如何实现 阻塞读 不阻塞写

问题描述 java nio 如何实现 阻塞读 不阻塞写 java nio 如何实现 阻塞读 不阻塞写java nio 如何实现 阻塞读 不阻塞写 解决方案 java NIO 及 阻塞和非阻塞IO 解决方案二: 用selector可以实现不

java NIO

这次是关于java nio,有一些重复的发的地方.本文中的源代码可以在此处下载,下载链接为:http://115.com/file/cltlj10i#nio-src.zip 本文简介:  JDK 1.4 中引入的新输入输出 (NIO) 库在标准 Java 代码中提供了高速的.面向块的 I/O.本实用教程从高级概念到底层的编程细节,非常详细地介绍了 NIO 库.您将学到诸如缓冲区和通道这样的关键 I/O 元素的知识,并考察更新后的库中的标准 I/O 是如何工作的.您还将了解只能通过 NIO 来完成

理解Java NIO

基础概念 • 缓冲区操作 缓冲区及操作是所有I/O的基础,进程执行I/O操作,归结起来就是向操作系统发出请求,让它要么把缓冲区里的数据排干(写),要么把缓冲区填满(读).如下图 • 内核空间.用户空间  上图简单描述了数据从磁盘到用户进程的内存区域移动的过程,其间涉及到了内核空间与用户空间.这两个空间有什么区别呢?  用户空间就是常规进程(如JVM)所在区域,用户空间是非特权区域,如不能直接访问硬件设备.内核空间是操作系统所在区域,那肯定是有特权啦,如能与设备控制器通讯,控制用户区域的进程运行状

Java NIO 系列教程

Java NIO(New IO)是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API.本系列教程将有助于你学习和理解Java NIO.感谢并发编程网的翻译和投递.  (关注ITeye官微,随时随地查看最新开发资讯.技术文章.)  [本文转载于 Java NIO 系列教程] Java NIO提供了与标准IO不同的IO工作方式:  Channels and Buffers(通道和缓冲区):标准的IO基于字节流和字符流进行操作的,而NIO是基于通道(Channe