《Netty 权威指南》—— AIO创建的TimeClient源码分析

声明:本文是《Netty 权威指南》的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文。

异步非阻塞IO版本的时间服务器服务端已经介绍完毕,下面我们继续看客户端的实现。

首先看下客户端主函数的实现,AIO时间服务器客户端  TimeClient:

01 public class TimeClient {
02  
03     /**
04      * @param args
05      */
06     public static void main(String[] args) {
07     int port = 8080;
08     if (args != null && args.length > 0) {
09         try {
10         port = Integer.valueOf(args[0]);
11         } catch (NumberFormatException e) {
12         // 采用默认值
13         }
14     }
15     new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
16         "AIO-AsyncTimeClientHandler-001").start();
17  
18     }
19 }

第15行我们通过一个独立的IO线程创建异步时间服务器客户端handler,在实际项目中,我们不需要独立的线程创建异步连接对象,因为底层都是通过JDK的系统回调实现的,在后面运行时间服务器程序的时候,我们会抓取线程调用堆栈给大家展示。继续看代码, AsyncTimeClientHandler的实现类源码如下:

001 public class AsyncTimeClientHandler implements
002     CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {
003  
004     private AsynchronousSocketChannel client;
005     private String host;
006     private int port;
007     private CountDownLatch latch;
008  
009     public AsyncTimeClientHandler(String host, int port) {
010     this.host = host;
011     this.port = port;
012     try {
013         client = AsynchronousSocketChannel.open();
014     } catch (IOException e) {
015         e.printStackTrace();
016     }
017     }
018  
019     @Override
020     public void run() {
021     latch = new CountDownLatch(1);
022     client.connect(new InetSocketAddress(host, port), this, this);
023     try {
024         latch.await();
025     } catch (InterruptedException e1) {
026         e1.printStackTrace();
027     }
028     try {
029         client.close();
030     } catch (IOException e) {
031         e.printStackTrace();
032     }
033     }
034  
035     @Override
036     public void completed(Void result, AsyncTimeClientHandler attachment) {
037     byte[] req = "QUERY TIME ORDER".getBytes();
038     ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
039     writeBuffer.put(req);
040     writeBuffer.flip();
041     client.write(writeBuffer, writeBuffer,
042         new CompletionHandler<Integer, ByteBuffer>() {
043             @Override
044             public void completed(Integer result, ByteBuffer buffer) {
045             if (buffer.hasRemaining()) {
046                 client.write(buffer, buffer, this);
047             } else {
048                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
049                 client.read(
050                     readBuffer,
051                     readBuffer,
052                     new CompletionHandler<Integer, ByteBuffer>() {
053                     @Override
054                     public void completed(Integer result,
055                         ByteBuffer buffer) {
056                         buffer.flip();
057                         byte[] bytes = new byte[buffer
058                             .remaining()];
059                         buffer.get(bytes);
060                         String body;
061                         try {
062                         body = new String(bytes,
063                             "UTF-8");
064                         System.out.println("Now is : "
065                             + body);
066                         latch.countDown();
067                         } catch (UnsupportedEncodingException e) {
068                         e.printStackTrace();
069                         }
070                     }
071  
072                     @Override
073                     public void failed(Throwable exc,
074                         ByteBuffer attachment) {
075                         try {
076                         client.close();
077                         latch.countDown();
078                         } catch (IOException e) {
079                         // ingnore on close
080                         }
081                     }
082                     });
083             }
084             }
085  
086             @Override
087             public void failed(Throwable exc, ByteBuffer attachment) {
088             try {
089                 client.close();
090                 latch.countDown();
091             } catch (IOException e) {
092                 // ingnore on close
093             }
094             }
095         });
096     }
097  
098     @Override
099     public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
100     exc.printStackTrace();
101     try {
102         client.close();
103         latch.countDown();
104     } catch (IOException e) {
105         e.printStackTrace();
106     }
107     }
108 }

由于在AsyncTimeClientHandler中大量使用了内部匿名类,所以代码看起来稍微有些复杂,下面我们就对主要代码进行详细解说。

9-17行是构造方法,首先通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。然后跳到第36行,创建CountDownLatch进行等待,防止异步操作没有执行完成线程就退出。第37行通过connect方法发起异步操作,它有两个参数,分别如下:

1)      A attachment : AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义;

2)      CompletionHandler<Void,? super A> handler:异步操作回调通知接口,由调用者实现。

在本例程中,我们的两个参数都使用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口。

接下来我们看异步连接成功之后的方法回调completed方法,代码第39行,我们创建请求消息体,对其进行编码,然后拷贝到发送缓冲区writeBuffer中,调用AsynchronousSocketChannel的write方法进行异步写,与服务端类似,我们可以实现CompletionHandler<Integer, ByteBuffer>接口用于写操作完成后的回调,代码第45-47行,如果发送缓冲区中仍有尚未发送的字节,我们继续异步发送,如果已经发送完成,则执行异步读取操作。

代码第64-97行是客户端异步读取时间服务器服务端应答消息的处理逻辑,代码第49行我们调用AsynchronousSocketChannel的read方法异步读取服务端的响应消息,由于read操作是异步的,所以我们通过内部匿名类实现CompletionHandler<Integer, ByteBuffer>接口,当读取完成被JDK回调时,我们构造应答消息。第56-63行我们从CompletionHandler的ByteBuffer中读取应答消息,然后打印结果。

第197-96行,当读取发生异常时,我们关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行。

需要指出的是,正如之前的NIO例程,我们并没有完整的处理网络的半包读写,当对例程进行功能测试的时候没有问题,但是,如果对代码稍加改造,进行压力或者性能测试,就会发现输出结果存在问题。

由于半包的读写会作为专门的小节在Netty的应用和源码分析章节进行详细讲解,在NIO的入门章节我们就不详细展开介绍,以便读者能够将注意力集中在NIO的入门知识上来。

下面的小节我们运行AIO版本的时间服务器程序,并通过打印线程堆栈的方式看下JDK回调异步Channel CompletionHandler的调用情况。 

时间: 2024-09-20 19:57:09

《Netty 权威指南》—— AIO创建的TimeClient源码分析的相关文章

《Netty 权威指南》—— AIO 创建的TimeServer源码分析

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现.异步通道提供两种方式获取获取操作结果: 通过java.util.concurrent.Future类来表示异步操作的结果: 在执行异步操作的时候传入一个java.nio.channels. CompletionHandler接口的实现类作为操作完成的回调. NIO2.0的异步套接字通道是真正的异步非阻塞IO

《Netty 权威指南》—— NIO创建的TimeClient源码分析

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 我们首先还是看下如何对TimeClient进行改造: public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port =

《Netty 权威指南》—— NIO创建的TimeServer源码分析

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 我们将在TimeServer例程中给出完整的NIO创建的时间服务器源码: public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args !=

《Netty 权威指南》—— 4种IO的对比

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 2.5.1.概念澄清 为了防止由于对一些技术概念和术语的理解或者叫法不一致引起歧义,本小节特意对本书中的专业术语或者技术用语做下声明,如果它们与其它的一些技术书籍术语不一致,请以本小节的解释为准. 2.5.1.1. 异步非阻塞IO 很多人喜欢将JDK1.4提供的NIO框架称为异步非阻塞IO,但是,如果严格按照Unix网络编程模型和JDK的实现进行区分,实际上它只能被称为非阻塞IO,不能叫

《Netty权威指南》目录

<Netty权威指南>是全球第二本.中国第一本Netty教材,它由华为平台中间件资深架构设计师李林锋撰写,作者有6年多的NIO设计和开发实战经验,多次受邀进行Netty和 NIO编程培训. 本书基于最新的Netty5.0 版本撰写,从Netty开发环境的搭建,到第一个基于Netty的NIO服务端和客户端程序的开发,一步步的让读者从入门到精通,熟练的掌握基于Netty 的NIO开发,理解Netty的架构设计原理,可以对Netty进行深度的定制设计和开发. 本书共分为五部分:第一部分介绍 JAVA

《Netty 权威指南》样章

声明:本文是<Netty 权威指南>的样章目录,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 第 2 章  NIO入门 在本章节,我们分别对JDK的BIO.NIO和JDK1.7最新提供的NIO2.0的使用进行详细说明,通过流程图和代码讲解,让大家体会到随着Java IO类库的不断发展和改进,基于Java的网络编程会变得越来越简单,随着异步IO功能的增强,基于Java NIO开发的网络服务器甚至不逊色于采用C++开发的网络程序. 本章主要内容包括:  传统的同步阻塞式IO编程

《Netty 权威指南》—— 传统的BIO编程

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信. 在基于传统同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口,Socket负责发起连接操

《Netty 权威指南》—— 伪异步IO编程

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 为了解决同步阻塞IO面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化,后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽. 下面,我们结合连接模型图和源码,对伪异步IO进行分析,看它是否能够解决同步阻塞IO面临的问题.

Hadoop2源码分析-准备篇

1.概述 我们已经能够搭建一个高可用的Hadoop平台了,也熟悉并掌握了一个项目在Hadoop平台下的开发流程,基于Hadoop的一些套件我们也能够使用,并且能利用这些套件进行一些任务的开发.在Hadoop的应用级别上,我们接着往后面去研究学习,那就是Hadoop的源码了,作为Hadoop开发人员,我们得去学习和研究Hadoop得实现原理,底层框架的设计,编码的实现过程等等,下面就开始我们今天的Hadoop源码分析之旅. 2.准备 在分析源码之前,我们需要准备好分析源码的环境,以及如何去分析(分