jetty的NIO线程模型

概述

jetty NIO是典型reactor模型,如下图所示:

即:mainReactor负责监听server

socket,接受新连接,并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,扔给worker线程池来处理。本文主要是讲解jetty中mainReactor、subReactor、线程池的实现。

mainReactor

jetty中的server就相当于一个容器,一个jetty容器包含多个连接器和一个线程池,连接器实现了LifeCycle接口,随容器启动而启动,下图是连接器启动后,监听server socket,建立连接的过程:

可见,jetty利用了线程池来建立连接,每一个连接任务被当成一个job被放到了job队列里面,负责连接的线程会从队列中取出任务来执行,将得到的ServerSocket交给subReactor,下面来看subReactor的实现。

subReactor

这里需要提一下jetty
nio很重要的一个类SelectorManager,它负责channel注册,select,wakeup等操作。在SelectorManager中有SelectSet数组,可以把SelectSet理解为SelectorManager的代理,因为真正做事的是SelectSet,这里面SelectSet设计为一个数组,应该也是分而治之的思想,让一个selector监听更少的selectionkey。

SelectSet中有一个非常重要的成员changes,changes中存放了所有有变化的channel、endpoint、attachement。分别在以下情况触发addChannel方法:当有新的通道加入时,当有新的事件到来时,当有数据到来时。

subReactor的执行流程如下图:

在这里导致addChange除了selectorManager.register之外,还有endpoint.updatekey()以及selectionkey数据有变化时等等。

ThreadPool

jetty的线程池相当简单,其实mainReactor与subReactor共用同一个线程池,线程池的实现类是QueuedThreadPool,当然在jetty.xml中可以设置自己的线程池类。简单看下线程池的run方法

 

[java] view plain copy

 
 print?

  1. private Runnable _runnable = new Runnable()  
  2.   {  
  3.       public void run()  
  4.       {  
  5.           boolean shrink=false;  
  6.           try  
  7.           {  
  8.               Runnable job=_jobs.poll();  
  9.               while (isRunning())  
  10.               {  
  11.                   // Job loop  
  12.                   while (job!=null && isRunning())  
  13.                   {  
  14.                       runJob(job);  
  15.                       job=_jobs.poll();  
  16.                   }  
  17.   
  18.                   // Idle loop  
  19.                   try  
  20.                   {  
  21.                       _threadsIdle.incrementAndGet();  
  22.   
  23.                       while (isRunning() && job==null)  
  24.                       {  
  25.                           if (_maxIdleTimeMs<=0)  
  26.                               job=_jobs.take();  
  27.                           else  
  28.                           {  
  29.                               // maybe we should shrink?  
  30.                               final int size=_threadsStarted.get();  
  31.                               if (size>_minThreads)  
  32.                               {  
  33.                                   long last=_lastShrink.get();  
  34.                                   long now=System.currentTimeMillis();  
  35.                                   if (last==0 || (now-last)>_maxIdleTimeMs)  
  36.                                   {  
  37.                                       shrink=_lastShrink.compareAndSet(last,now) &&  
  38.                                       _threadsStarted.compareAndSet(size,size-1);  
  39.                                       if (shrink)  
  40.                                           return;  
  41.                                   }  
  42.                               }  
  43.                               job=idleJobPoll();  
  44.                           }  
  45.                       }  
  46.                   }  
  47.                   finally  
  48.                   {  
  49.                       _threadsIdle.decrementAndGet();  
  50.                   }  
  51.               }  
  52.           }  
  53.           catch(InterruptedException e)  
  54.           {  
  55.                 ...  
  56.           }  
  57.       }  
  58.   };  

1、线程池有个最小线程数_minThreads=8,当线程池启动时会创建_minThreads个线程,并启动它们。第12行,线程从任务队列中取出一个任务,并执行。这里使用了while循环表示这里会阻塞等待任务执行完,当任务队列中没有任务时,才会退出while循环;

 

2、退出while循环后,这个线程就空闲了,在这里需要有个回收策略,在等待_maxIdleTimeMs时间后,如果当前线程数大于_minThreads时,就会回收这个线程。

那么线程数什么时候会大于_minThreads?来看看dispatch()方法中的核心代码

 

[java] view plain copy

 
 print?

  1. // If we had no idle threads or the jobQ is greater than the idle threads  
  2.                if (idle==0 || jobQ>idle)  
  3.                {  
  4.                    int threads=_threadsStarted.get();  
  5.                    if (threads<_maxThreads)  
  6.                        startThread(threads);  
  7.                }  

如果没有空闲的线程或者空闲线程数太少,在保证线程数没有超过_maxThreads时会新建线程。

原文链接:[http://wely.iteye.com/blog/2360486]

时间: 2024-10-31 22:31:04

jetty的NIO线程模型的相关文章

tomcat的NIO线程模型源码分析

1 tomcat8的并发参数控制 这种问题其实到官方文档上查看一番就可以知道,tomcat很早的版本还是使用的BIO,之后就支持NIO了,具体版本我也不记得了,有兴趣的自己可以去查下.本篇的tomcat版本是tomcat8.5.可以到这里看下tomcat8.5的配置参数 我们先来简单回顾下目前一般的NIO服务器端的大致实现,借鉴infoq上的一篇文章Netty系列之Netty线程模型中的一张图 一个或多个Acceptor线程,每个线程都有自己的Selector,Acceptor只负责accept

Netty线程模型详解

1. 背景 1.1. Java线程模型的演进 1.1.1. 单线程 时间回到十几年前,那时主流的CPU都还是单核(除了商用高性能的小机),CPU的核心频率是机器最重要的指标之一. 在Java领域当时比较流行的是单线程编程,对于CPU密集型的应用程序而言,频繁的通过多线程进行协作和抢占时间片反而会降低性能. 1.1.2. 多线程 随着硬件性能的提升,CPU的核数越来越越多,很多服务器标配已经达到32或64核.通过多线程并发编程,可以充分利用多核CPU的处理能力,提升系统的处理效率和并发性能. 从2

Netty的线程模型

1. 背景 1.1. Java线程模型的演进 1.1.1. 单线程 时间回到十几年前,那时主流的CPU都还是单核(除了商用高性能的小机),CPU的核心频率是机器最重要的指标之一. 在Java领域当时比较流行的是单线程编程,对于CPU密集型的应用程序而言,频繁的通过多线程进行协作和抢占时间片反而会降低性能. 1.1.2. 多线程 随着硬件性能的提升,CPU的核数越来越越多,很多服务器标配已经达到32或64核.通过多线程并发编程,可以充分利用多核CPU的处理能力,提升系统的处理效率和并发性能.  

dubbo的线程模型

      事件处理线程说明 1.如果事件处理的逻辑能迅速完成,并且不会发起新的IO请求,比如只是在内存中记个标识,则直接在IO线程上处理更快,因为减少了线程池调度. 2.但如果事件处理逻辑较慢,或者需要发起新的IO请求,比如需要查询数据库,则必须派发到线程池,否则IO线程阻塞,将导致不能接收其它请求. 3.如果用IO线程处理事件,又在事件处理过程中发起新的IO请求,比如在连接事件中发起登录请求,会报"可能引发死锁"异常,但不会真死锁. Dispatcher all 所有消息都派发到线

Java线程模型缺陷研究

Java 编程语言的线程模型可能是此语言中最薄弱的部分.它完全不适合实际复杂程序的要求,而且也完全不是面向对象的.本文建议对 Java 语言进行重大修改和补充,以解决这些问题. Java 语言的线程模型是此语言的一个最难另人满意的部分.尽管 Java 语言本身就支持线程编程是件好事,但是它对线程的语法和类包的支持太少,只能适用于极小型的应用环境. 关于 Java 线程编程的大多数书籍都长篇累牍地指出了 Java 线程模型的缺陷,并提供了解决这些问题的急救包(Band-Aid/邦迪创可贴)类库.我

socket编程与线程模型一

这里线程模型是指winsock相关的线程模型设计. 在本软件的设计的过程中有些问题是涉及到winsock的问题,为了能够很好的 设计线程模型,必须理解清楚socket的内部工作机制.为此,首先从外面开始分 析. 一.为什么使用多线程 1.使用多线程是为了避免应用程序主界面在I/O操作中没有反应,出现假死 机现象. Socket是一种特殊的I/O,所以很可能会出现这种现象.例如发送数据,或者 连接服务器的时候. 2.为了提高cpu利用率(在多cpu环境)和改善应用程序的并发性能. 在多cpu环境,

AIX6.1 线程模型说明

引文:线程模型(Threading Model)默认从进程域 (M:N 模型 ) 改为系统全局域 (1:1 模型 )   在 AIX 5L 中,pthread 线程的默认模型是 m:n 方式,而从 AIX 6.1 开始,默认改为了 1:1 方式.这两种方式在系统中通过 AIXTHREAD_SCOPE 环境变量来进行控制.如果设置 AIXTHREAD_SCOPE=P,则线程模型为进程域(M:N 模型),设置 AIXTHREAD_SCOPE=S 则为系统域(1:1 模型). 1:1 模型下,每个用户

理解RxJava线程模型

RxJava作为目前一款超火的框架,它便捷的线程切换一直被人们津津乐道,本文从源码的角度,来对RxJava的线程模型做一次深入理解.(注:本文的多处代码都并非原本的RxJava的源码,而是用来说明逻辑的伪代码) 入手体验 RxJava 中切换线程非常简单,例如最常见的异步线程处理,主线程回调的模型,可以很优雅的用如下代码来做处理: Observable.just("magic")          .map(str -> doExpensiveWork(str))        

《C++多线程编程实战》——.7 线程模型的实现

2.7 线程模型的实现 我们可以把进程看作是一个对象,它的任务就是把相关资源分组.每个进程都有一个地址空间,如图2.10所示. 图2.10 进程的地址空间 这个所谓的进程图像必须在初始化CreateProcess时加载至物理内存中.所有的资源(如文件句柄.子进程的信息.信号处理器等)都被储存起来.把它们以进程的形式分组在一起,更容易管理. 除进程外,还有一个重要的概念是线程.线程是CPU可执行调度的最小单位.也就是说,进程本身不能获得CPU时间,只有它的线程才可以.线程通过它的工作变量和栈来储存