netty 事件驱动(一)

本篇文章着重于浅析一下Netty的事件处理流程,Netty版本为netty-3.6.6.Final。

Netty定义了非常丰富的事件类型,代表了网络交互的各个阶段。并且当各个阶段发生时,触发相应的事件交给pipeline中定义的handler处理。

举个例子,如下一段简单的代码:

ChannelFactory factory =
            new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(factory);

        bootstrap.setPipelineFactory(new PipelineFactory());

        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress(7080));

Netty中触发事件几乎都是靠Channels类中的几个静态fire函数,因此通过在这些函数中加上Sysout方法,就可以看出这一个简单的bind方法触发了多少事件,如下:

fireChannelOpen(final Channel channel) upstream
bind(final Channel channel, final SocketAddress localAddress) downstream
fireChannelBound(final Channel channel, final SocketAddress localAddress) upstream

由此可见由这几个函数触发了OPEN、BOUND和BIND事件。

 

Netty中的事件大致可以分为upstream事件和downstream事件。简单的说,upstream事件是内获取外资源时触发的事件如messageReceived等等,而downstream事件则是内向外发送资源时触发的事件如write、connect等等。

与之相对应的,处理upstream事件的是upstreamhandler,处理downstream事件的是downstreamhandler,也有可以处理两类事件的channelhandler。我们可以通过继承handler来实现自己的业务逻辑。

Upstream事件的典型是messageReceived,在Netty中抽象为MessageEvent,即接收到了消息。而downstream事件的典型是write,在Netty中也抽象为MessageEvent,即发送消息。一个比较完整的事件表如下:

upstream事件包括:

downstream事件包括

 

Netty通过pipeline来存放upstreamhandler和downstreamhandler,在pipeline中添加handler的源代码如下:

public class PipelineFactory implements ChannelPipelineFactory
{
    public ChannelPipeline getPipeline()
        throws Exception
    {
        ChannelPipeline pipeline = Channels.pipeline();

        //并不具体处理事件,只是输出相关事件的string
        pipeline.addLast("1", new UpStreamHandler1());
        //单纯的丢弃事件
        pipeline.addLast("2", new DiscardServer());
        return pipeline;
    }
}

在Netty中,upstreamhandler的处理顺序是从前向后,而downstreamhandler的顺序是从后往前。根本原因是pipeline中维护了一个双向链表,handler的处理顺序不同是因为upstream是从head->tail遍历,而downstream事件是从tail->head遍历。

以DefaultChannelPipeline为例,以下分别是添加handler至链表的代码和访问upstreamhandler的代码

    public synchronized void addLast(String name, ChannelHandler handler) {
        if (name2ctx.isEmpty()) {
            init(name, handler);
        } else {
            checkDuplicateName(name);
            //一段典型的插入到链表尾部并更新尾指针的代码
            DefaultChannelHandlerContext oldTail = tail;
            DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);

            callBeforeAdd(newTail);

            oldTail.next = newTail;
            tail = newTail;
            name2ctx.put(name, newTail);

            callAfterAdd(newTail);
        }
    }

    public void sendUpstream(ChannelEvent e) {
        //从头部开始遍历,相对的是,downstream则是从尾部开始遍历
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        if (head == null) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "The pipeline contains no upstream handlers; discarding: " + e);
            }

            return;
        }

        sendUpstream(head, e);
    }

 

前文已经说过,Netty中通过Channels中的静态方法来触发事件,这些静态函数列举如下:

 1.fireChannelOpen;2.fireChannelBound;3.fireChannelConnected等等。

直接来看fireChannelOpen的源码,看看Netty到底是怎么做的。

    public static void fireChannelOpen(final Channel channel) {
        // Notify the parent handler.
        if (channel.getParent() != null) {
            fireChildChannelStateChanged(channel.getParent(), channel);
        }
        channel.getPipeline().sendUpstream(
                new UpstreamChannelStateEvent(
                        channel, ChannelState.OPEN, Boolean.TRUE));
    }

这个sendUpstream到底是干嘛的了?

    void sendUpstream(final DefaultChannelHandlerContext ctx, final ChannelEvent e) {
        try {
            //从链表头部开始,取出每个节点中的handler直接对channelevent进行处理
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

然后具体到handler又是怎么处理各个事件的了?以SimpleChannelUpstreamHandler为例,如下:

    public void handleUpstream(
            final ChannelHandlerContext ctx, final ChannelEvent e) throws Exception {
        //根据事件类型进行不同的处理
        if (e instanceof MessageEvent) {
            messageReceived(ctx, (MessageEvent) e);
        } else if (e instanceof WriteCompletionEvent) {
            WriteCompletionEvent evt = (WriteCompletionEvent) e;
            writeComplete(ctx, evt);
        } else if (e instanceof ChildChannelStateEvent) {
            ......
        }
    }

    public void messageReceived(
            final ChannelHandlerContext ctx, final MessageEvent e) throws Exception {
        //直接将事件传至下一个handler进行处理
        ctx.sendUpstream(e);
    }

源码看到现在已经很明显了,在Netty里,pipeline中维护了一个handler的链表。每当事件触发时,就会从双向链表的头部(对于downstream事件则是尾部)开始遍历,这样每个handler都会对事件进行处理。在handler里,可以根据事件类型做相应的处理后传至下一个handler继续处理(甚至可以截断处理链)。

需要注意的是,单次流程是在一个线程中实现的,是串行的。因此如果其中一个handler是阻塞的,就会影响整体的效果。

时间: 2024-10-12 05:06:11

netty 事件驱动(一)的相关文章

netty 事件驱动(二)

上一篇文件浅析了Netty中的事件驱动过程,这篇主要写一下异步相关的东东. 首先,什么是异步了? 异步的概念和同步相对.当一个异步过程调用发出后,调用者不能立刻得到结果.实际处理这个调用的部件在完成后,通过状态.通知和回调来通知调用者. 异步的好处是不会造成阻塞,在高并发情形下会更稳定和更高的吞吐量.   说到Netty中的异步,就不得不提ChannelFuture.Netty中的IO操作是异步的,包括bind.write.connect等操作会简单的返回一个ChannelFuture,调用者并

《Netty实战》Netty In Action中文版 第1章——Netty——异步和事件驱动(一)

<Netty实战>样章由人民邮电出版社授权并发编程网发布,本书的中文版已经由人民邮电出版社引进并出版. 京东预售链接(优先发货):<Netty实战>([美]诺曼·毛瑞尔(Norman Maurer),马文·艾伦·沃尔夫泰尔(Marvin Allen Wolfthal)) 第一部分 Netty的概念及体系结构 Netty是一款用于创建高性能网络应用程序的高级框架.在第一部分,我们将深入地探究它的能力,并且在3个主要的方面进行示例: 使用Netty构建应用程序,你不必是一名网络编程专家

Netty4详解三:Netty架构设计

     读完这一章,我们基本上可以了解到Netty所有重要的组件,对Netty有一个全面的认识,这对下一步深入学习Netty是十分重要的,而学完这一章,我们其实已经可以用Netty解决一些常规的问题了. 一.先纵览一下Netty,看看Netty都有哪些组件?      为了更好的理解和进一步深入Netty,我们先总体认识一下Netty用到的组件及它们在整个Netty架构中是怎么协调工作的.Netty应用中必不可少的组件: Bootstrap or ServerBootstrap EventLo

Netty源码解读(一)概述

感谢网友[黄亿华]投递本稿. Netty和Mina是Java世界非常知名的通讯框架.它们都出自同一个作者,Mina诞生略早,属于Apache基金会,而Netty开始在Jboss名下,后来出来自立门户netty.io.关于Mina已有@FrankHui的Mina系列文章,我正好最近也要做一些网络方面的开发,就研究一下Netty的源码,顺便分享出来了. Netty目前有两个分支:4.x和3.x.4.0分支重写了很多东西,并对项目进行了分包,规模比较庞大,入手会困难一些,而3.x版本则已经被广泛使用.

Netty------对于Netty的十一个疑问

1.Netty 是什么? Netty 是一个基于 JAVA NIO 类库的异步通信框架,它的架构特点是:异步非阻塞.基于事件驱动.高性能.高可靠性和高可定制性. 2.使用 Netty 能够做什么? 开发异步.非阻塞的 TCP 网络应用程序: 开发异步.非阻塞的 UDP 网络应用程序: 开发异步文件传输应用程序: 开发异步 HTTP 服务端和客户端应用程序: 提供对多种编解码框架的集成,包括谷歌的 Protobuf.Jbossmarshalling.Java 序列化.压缩编解码.XML 解码.字符

对于Netty的十一个疑问(转)

阻塞"与"非阻塞"与"同步"与"异步"不能简单的从字面理解,提供一个从分布式系统角度的回答. 1.同步与异步(描述服务器反馈给客户端的策略) 同步和异步关注的是 消息通信机制 (synchronous communication/ asynchronous communication).所谓同步,就是在发出一个*调用*时,在没有得到结果之前,该*调用*就不返回.但是一旦调用返回,就得到返回值了.换句话说,就是由*调用者*主动等待这个*调

Netty 5用户指南

原文地址:http://netty.io/wiki/user-guide-for-5.x.html    译者:光辉勇士      校对:郭蕾 前言 问题 现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问,比如我们经常使用一个HTTP客户端来从web服务器上获取信息,或者通过web service来执行一个远程的调用. 然而,有时候一个通用的协议和他的实现并没有覆盖一些场景.比如我们无法使用一个通用的HTTP服务器来处理大文件.电子邮件.近实时消息比如财务信息和多人游戏数据.我们需要

Netty线程模型详解

1. 背景 1.1. Java线程模型的演进 1.1.1. 单线程 时间回到十几年前,那时主流的CPU都还是单核(除了商用高性能的小机),CPU的核心频率是机器最重要的指标之一. 在Java领域当时比较流行的是单线程编程,对于CPU密集型的应用程序而言,频繁的通过多线程进行协作和抢占时间片反而会降低性能. 1.1.2. 多线程 随着硬件性能的提升,CPU的核数越来越越多,很多服务器标配已经达到32或64核.通过多线程并发编程,可以充分利用多核CPU的处理能力,提升系统的处理效率和并发性能. 从2

《Netty 权威指南》—— NIO创建的TimeClient源码分析

声明:本文是<Netty 权威指南>的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文. 我们首先还是看下如何对TimeClient进行改造: public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port =