Netty 5用户指南(二)

Time客户端

不像DISCARD和ECHO的服务端,对于TIME协议我们需要一个客户端因为人们不能把一个32位的二进制数据翻译成一个日期或者日历。在这一部分,我们将会讨论如何确保服务端是正常工作的,并且学习怎样用Netty编写一个客户端。

在Netty中,编写服务端和客户端最大的并且唯一不同的使用了不同的BootStrapChannel的实现。请看一下下面的代码:

01 package io.netty.example.time;
02  
03 public class TimeClient {
04     public static void main(String[] args) throws Exception {
05         String host = args[0];
06         int port = Integer.parseInt(args[1]);
07         EventLoopGroup workerGroup = new NioEventLoopGroup();
08  
09         try {
10             Bootstrap b = new Bootstrap(); // (1)
11             b.group(workerGroup); // (2)
12             b.channel(NioSocketChannel.class); // (3)
13             b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
14             b.handler(new ChannelInitializer<SocketChannel>() {
15                 @Override
16                 public void initChannel(SocketChannel ch) throws Exception {
17                     ch.pipeline().addLast(new TimeClientHandler());
18                 }
19             });
20  
21             // Start the client.
22             ChannelFuture f = b.connect(host, port).sync(); // (5)
23  
24             // Wait until the connection is closed.
25             f.channel().closeFuture().sync();
26         finally {
27             workerGroup.shutdownGracefully();
28         }
29     }
30 }
  1. BootStrapServerBootstrap类似,不过他是对非服务端的channel而言,比如客户端或者无连接传输模式的channel。
  2. 如果你只指定了一个EventLoopGroup,那他就会即作为一个‘boss’线程,也会作为一个‘workder’线程,尽管客户端不需要使用到‘boss’线程。
  3. 代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel被创建时使用。
  4. 不像在使用ServerBootstrap时需要用childOption()方法,因为客户端的SocketChannel没有父channel的概念。
  5. 我们用connect()方法代替了bind()方法。

正如你看到的,他和服务端的代码是不一样的。ChannelHandler是如何实现的?他应该从服务端接受一个32位的整数消息,把他翻译成人们能读懂的格式,并打印翻译好的时间,最后关闭连接:

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class TimeClientHandler extends ChannelHandlerAdapter {
06     @Override
07     public void channelRead(ChannelHandlerContext ctx, Object msg) {
08         ByteBuf m = (ByteBuf) msg; // (1)
09         try {
10             long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
11             System.out.println(new Date(currentTimeMillis));
12             ctx.close();
13         finally {
14             m.release();
15         }
16     }
17  
18     @Override
19     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
20         cause.printStackTrace();
21         ctx.close();
22     }
23 }
  1. 在TCP/IP中,NETTY会把读到的数据放到ByteBuf的数据结构中。

这样看起来非常简单,并且和服务端的那个例子的代码也相差不多。然而,处理器有时候会因为抛出IndexOutOfBoundsException而拒绝工作。在下个部分我们会讨论为什么会发生这种情况。

流数据的传输处理

一个小的Socket Buffer问题

在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的TCP/TP协议栈已经接收了3个数据包:

由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段。

因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:

第一个解决方案

现在让我们回到TIME客户端的例子上。这里我们遇到了同样的问题,一个32字节数据是非常小的数据量,他并不见得会被经常拆分到到不同的数据段内。然而,问题是他确实可能会被拆分到不同的数据段内,并且拆分的可能性会随着通信量的增加而增加。

最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。下面的代码修改了TimeClientHandler的实现类修复了这个问题

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class TimeClientHandler extends ChannelHandlerAdapter {
06     private ByteBuf buf;
07  
08     @Override
09     public void handlerAdded(ChannelHandlerContext ctx) {
10         buf = ctx.alloc().buffer(4); // (1)
11     }
12  
13     @Override
14     public void handlerRemoved(ChannelHandlerContext ctx) {
15         buf.release(); // (1)
16         buf = null;
17     }
18  
19     @Override
20     public void channelRead(ChannelHandlerContext ctx, Object msg) {
21         ByteBuf m = (ByteBuf) msg;
22         buf.writeBytes(m); // (2)
23         m.release();
24  
25         if (buf.readableBytes() >= 4) { // (3)
26             long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L;
27             System.out.println(new Date(currentTimeMillis));
28             ctx.close();
29         }
30     }
31  
32     @Override
33     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
34         cause.printStackTrace();
35         ctx.close();
36     }
37 }
  1. ChannelHandler有2个生命周期的监听方法:handlerAdded()和handlerRemoved()。你可以完成任意初始化任务只要他不会被阻塞很长的时间。
  2. 首先,所有接收的数据都应该被累积在buf变量里。
  3. 然后,处理器必须检查buf变量是否有足够的数据,在这个例子中是4个字节,然后处理实际的业务逻辑。否则,Netty会重复调用channelRead()当有更多数据到达直到4个字节的数据被积累。

第二个解决方案

尽管第一个解决方案已经解决了Time客户端的问题了,但是修改后的处理器看起来不那么的简洁,想象一下如果由多个字段比如可变长度的字段组成的更为复杂的协议时,你的ChannelHandler的实现将很快地变得难以维护。

正如你所知的,你可以增加多个ChannelHandlerChannelPipeline ,因此你可以把一整个ChannelHandler拆分成多个模块以减少应用的复杂程度,比如你可以把TimeClientHandler拆分成2个处理器:

  • TimeDecoder处理数据拆分的问题
  • TimeClientHandler原始版本的实现

幸运地是,Netty提供了一个可扩展的类,帮你完成TimeDecoder的开发。

01 package io.netty.example.time;
02  
03 public class TimeDecoder extends ByteToMessageDecoder { // (1)
04     @Override
05     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
06         if (in.readableBytes() < 4) {
07             return// (3)
08         }
09  
10         out.add(in.readBytes(4)); // (4)
11     }
12 }
  1. ByteToMessageDecoderChannelHandler的一个实现类,他可以在处理数据拆分的问题上变得很简单。
  2. 每当有新数据接收的时候,ByteToMessageDecoder都会调用decode()方法来处理内部的那个累积缓冲。
  3. Decode()方法可以决定当累积缓冲里没有足够数据时可以往out对象里放任意数据。当有更多的数据被接收了ByteToMessageDecoder会再一次调用decode()方法。
  4. 如果在decode()方法里增加了一个对象到out对象里,这意味着解码器解码消息成功。ByteToMessageDecoder将会丢弃在累积缓冲里已经被读过的数据。请记得你不需要对多条消息调用decode(),ByteToMessageDecoder会持续调用decode()直到不放任何数据到out里。

现在我们有另外一个处理器插入到ChannelPipeline里,我们应该在TimeClient里修改ChannelInitializer 的实现:

1 b.handler(new ChannelInitializer<SocketChannel>() {
2     @Override
3     public void initChannel(SocketChannel ch) throws Exception {
4         ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
5     }
6 });

如果你是一个大胆的人,你可能会尝试使用更简单的解码类ReplayingDecoder。不过你还是需要参考一下API文档来获取更多的信息。

1 public class TimeDecoder extends ReplayingDecoder {
2 @Override
3 protected void decode(
4 ChannelHandlerContext ctx, ByteBuf in, List<object width="300" height="150">out) {out.add(in.readBytes(4));}}

此外,Netty还提供了更多可以直接拿来用的解码器使你可以更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现。请参考下面的包以获取更多更详细的例子:

用POJO代替ByteBuf

我们已经讨论了所有的例子,到目前为止一个消息的消息都是使用ByteBuf作为一个基本的数据结构。在这一部分,我们会改进TIME协议的客户端和服务端的例子,用POJO替代ByteBuf。在你的ChannelHandlerS中使用POJO优势是比较明显的。通过从ChannelHandler中提取出ByteBuf的代码,将会使ChannelHandler的实现变得更加可维护和可重用。在TIME客户端和服务端的例子中,我们读取的仅仅是一个32位的整形数据,直接使用ByteBuf不会是一个主要的问题。然后,你会发现当你需要实现一个真实的协议,分离代码变得非常的必要。首先,让我们定义一个新的类型叫做UnixTime。

01 package io.netty.example.time;
02  
03 import java.util.Date;
04  
05 public class UnixTime {
06  
07     private final int value;
08  
09     public UnixTime() {
10         this((int) (System.currentTimeMillis() / 1000L + 2208988800L));
11     }
12  
13     public UnixTime(int value) {
14         this.value = value;
15     }
16  
17     public int value() {
18         return value;
19     }
20  
21     @Override
22     public String toString() {
23         return new Date((value() - 2208988800L) * 1000L).toString();
24     }
25 }

现在我们可以修改下TimeDecoder类,返回一个UnixTime,以替代ByteBuf

1 @Override
2 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
3     if (in.readableBytes() < 4) {
4         return;
5     }
6  
7     out.add(new UnixTime(in.readInt()));
8 }

下面是修改后的解码器,TimeClientHandler不再有任何的ByteBuf代码了。

1 @Override
2 public void channelRead(ChannelHandlerContext ctx, Object msg) {
3     UnixTime m = (UnixTime) msg;
4     System.out.println(m);
5     ctx.close();
6 }

是不是变得更加简单和优雅了?相同的技术可以被运用到服务端。让我们修改一下TimeServerHandler的代码。

1 @Override
2 public void channelActive(ChannelHandlerContext ctx) {
3     ChannelFuture f = ctx.writeAndFlush(new UnixTime());
4     f.addListener(ChannelFutureListener.CLOSE);
5 }

现在,仅仅需要修改的是ChannelHandler的实现,这里需要把UnixTime对象重新转化为一个ByteBuf。不过这已经是非常简单了,因为当你对一个消息编码的时候,你不需要再处理拆包和组装的过程。

01 package io.netty.example.time;
02  
03 public class TimeEncoder extends ChannelHandlerAdapter {
04     @Override
05     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
06         UnixTime m = (UnixTime) msg;
07         ByteBuf encoded = ctx.alloc().buffer(4);
08         encoded.writeInt(m.value());
09         ctx.write(encoded, promise); // (1)
10     }
11 }
  1. 在这几行代码里还有几个重要的事情。第一, 通过ChannelPromise,当编码后的数据被写到了通道上Netty可以通过这个对象标记是成功还是失败。第二, 我们不需要调用cxt.flush()。因为处理器已经单独分离出了一个方法void flush(ChannelHandlerContext cxt),如果像自己实现flush方法内容可以自行覆盖这个方法。

进一步简化操作,你可以使用MessageToByteEncode:

1 public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
2     @Override
3     protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
4         out.writeInt(msg.value());
5     }
6 }

最后的任务就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但这是不那么重要的工作。

关闭你的应用

关闭一个Netty应用往往只需要简单地通过shutdownGracefully()方法来关闭你构建的所有的NioEventLoopGroupS.当EventLoopGroup被完全地终止,并且对应的所有channels都已经被关闭时,Netty会返回一个Future对象。

概述

在这一章节中,我们会快速地回顾下如果在熟练掌握Netty的情况下编写出一个健壮能运行的网络应用程序。在Netty接下去的章节中还会有更多更相信的信息。我们也鼓励你去重新复习下在io.netty.example包下的例子。请注意社区一直在等待你的问题和想法以帮助Netty的持续改进,Netty的文档也是基于你们的快速反馈上。

时间: 2024-09-11 10:00:17

Netty 5用户指南(二)的相关文章

Netty 5用户指南

原文地址:http://netty.io/wiki/user-guide-for-5.x.html    译者:光辉勇士      校对:郭蕾 前言 问题 现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问,比如我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用. 然而,有时候一个通用的协议和他的实现并没有覆盖一些场景.比如我们无法使用一个通用的HTTP服务器来处理大文件.电子邮件.近实时消息比如财务信息和多人游戏数据.我们需要

Netty 5用户指南(一)

前言 问题 现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问,比如我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用. 然而,有时候一个通用的协议和他的实现并没有覆盖一些场景.比如我们无法使用一个通用的HTTP服务器来处理大文件.电子邮件.近实时消息比如财务信息和多人游戏数据.我们需要一个合适的协议来处理一些特殊的场景.例如你可以实现一个优化的Ajax的聊天应用.媒体流传输或者是大文件传输的HTTP服务器,你甚至可以自己设计和

PHP用户指南-cookies部分

PHP用户指南-cookies部分 在这课教程我们将学习怎样利用 PHP 处理cookies,我将试着使事情尽可能简单地去解释cookies的一些实际应用. 什么是cookies及作用?  cookies是由web服务器产生的并且存在客户端的一些信息.它嵌在html信息中,由服务器端指定,在客户端及服务器端间传递信息 .它通常用来:用户网页个性化,计数器,储存被浏览站点的信息等. cookies和php 在 PHP中用cookies是相当容易的.可以使用setcookie函数设置一个cookie

AUR用户指南

==用途简介== AUR的全称是"ArchLinux User-community Repository",即ArchLinux社区用户的软件仓库.它是依靠社区贡献软件包.这个文档告诉普通用户们如何使用AUR. ==AUR与用户== AUR中的软件是社区用户提供的,它们没有技术支持,也不保证其安全性(除非它们被TU标记上安全标志).AUR软件包不一定会被一直维护也不一定会被及时更新.AUR完全依赖用户贡献,你可以以许多形式为AUR做贡献. ===在UNSUPPORTED中共享你的PKG

《CUDA高性能并行计算》----0.8 用户指南

本 节 书 摘 来 自 华 章 出 版 社 <CUDA高性能并行计算> 一 书 中 的 第0章,第0.8节, 作 者 CUDA for Engineers: An Introduction to High-Performance Parallel Computing[美] 杜安·斯托尔蒂(Duane Storti)梅特·尤尔托卢(Mete Yurtoglu) 著,苏统华 项文成 李松泽 姚宇鹏 孙博文 译 , 更 多 章 节 内 容 可 以 访 问 云 栖 社 区 "华 章 计 算

《Apache Velocity用户指南》官方文档

Quick Start 本项目是 Apache Velocity官方文档的中文翻译版,Velocity类似与JSP,是一种基于Java的模板引擎.它可以在web页面中引用Java代码中定义的数据和对象,而Velocity的作用就是把Web视图和java代码进行组装在一起.本次翻译主要针对对Velocity感兴趣和工作中使用到Velocity的开发人员提供有价值的中文资料,希望能够对大家的工作和学习有所帮助. 由于我也是第一次接触Velocity,还不是很深入,翻译的时候也查看了一些博客以及其他网

《Jersey用户指南》翻译邀请

7月并发网组织大家翻译<Jersey用户指南>.有兴趣的同学可以通过评论领取,翻译完成之后再并发网直接提交审核. 一次领取一篇,一篇建议领取一个章节,翻译完成之后可以继续领取,领取文章后最好在一个星期内翻译完成,如果不能完成翻译请通过评论告知,以便于其他人可以继续翻译. Table of Contents Preface 1. Getting Started 1.1. Creating a New Project from Maven Archetype 1.2. Exploring the

《UML用户指南(第2版.修订版)》—第2章2.2节UML的概念模型

2.2 UML的概念模型 UML用户指南(第2版.修订版) 为了理解UML,需要形成该语言的概念模型,这要求学习建模的3个要素:UML的基本构造块.支配这些构造块如何放在一起的规则和一些运用于整个UML的公共机制.如果掌握了这些思想,就能够读懂UML模型,并能建立一些基本模型.当有了较丰富的应用UML的经验时,就能够在这些概念模型之上使用更高深的语言特征进行构造. 2.2.1 UML的构造块 UML的词汇表包含下面3种构造块: (1)事物: (2)关系: (3)图. 事物是对模型中首要成分的抽象

《Raspberry Pi用户指南》——第1章 初识树莓派

第1章 初识树莓派 Raspberry Pi用户指南 树莓派(Raspberry Pi)主板可以说是个"微型"的奇迹,它和一张信用卡的大小差不多,却拥有非常强的计算能力.在首次利用树莓派开发出令人惊奇的应用前,读者还需要了解一些事情. 小提示 如果你想马上使用树莓派,可以略过前面几页,直接学习如何将显示器.键盘和鼠标连接到树莓派上.