Tornado 源码分析(一)

tornado.gen.engine 是 tornado v3.0.0之前用于异步方法的。 具体点说如果一个方法内部使用yield 方法返回一个异步调用的结果, 那么这个方法必须使用tornado.gen.engine装饰。 tornado v3.0.0之后被tornado.gen.coroutines取代。

如果没有engine, 异步也是可行的,只不过需要大量callback, 就像Javascript那样。
一个最经典的异步handler如下:

class SyncDemo(web.RequestHandler):                                                                            

  @asynchronous
  def get(self):
      http_client = httpclient.AsyncHTTPClient()
      http_client.fetch("http://www.baidu.com", self.on_fetch)

  def on_fetch(self, response):
      return self.finish(response)

@asynchronous 用来说明这个get方法是异步的, tornado 不在负责finish request, 由handler自己处理。
tornado.httpclient.AsyncHTTPClient.fetch 是进行异步网路请求的方法。其函数声明为:
def fetch(self, request, callback, **kwargs)
可见他有一个callback参数, 当网络请求完成后, 将调用callback方法, 这一切是有ioloop实现的。

使用engine后,代码会更接近顺序代码,没有回调如下:

class AsyncDemo(web.RequestHandler):
    @asynchronous
    @engine
    def get(self):
        http_client = httpclient.AsyncHTTPClient()
        response = yield Task(httpclient.fetch, 'http://www.baidu.com')
        self.finish(response.body)

但是在AsyncDemo 的例子中, tornado.httpclient.AsyncHTTPClient.fetch 放到了Task中, 当异步方法完成时, response被直接赋值给了response 。 callback还是存在的, 也是执行的,只不过Task和engine 这两个东西,让代码接近顺序代码了。

原理是这样的:
函数内部如果有yield, 那么调用这个函数讲产生types.Generator实例, 这个函数的调用将由这个生成器控制。

words = ['a', 'b', 'c']

def async_func():
    for i in range(len(words)):
        w = yield i
        print "in_func : ", w

runner = async_func()
yield_value = runner.send(None)
print "out_func : ", yield_value
while 1:
    try:
        yield_value = runner.send(words[yield_value])
        print "out_func : ", yield_value
    except StopIteration:
        break

运行结果如下:

out_func :  0
in_func :  a
out_func :  1
in_func :  b
out_func :  2
in_func :  c

tornado.gen.Runner是一个管理生成器和生成器内部异步函数运行结果的类。被engine装饰的函数, 每当调用都会产生一个生成器,同时tornado都会为他生成一个Runner实例,Runner实例负责在合适的时候调用这个生成器的send方法,把函数执行完。

# tornado v2.4.1 tornado/gen.py
def engine(func):
"""Decorator for asynchronous generators.
.....
"""
    @functools.wraps(func)
    def wrapper(args, *kwargs):
        runner = None
        def handle_exception(typ, value, tb):
            if runner is not None:
                return runner.handle_exception(typ, value, tb)
            return False
        with ExceptionStackContext(handle_exception) as deactivate:
            gen = func(args, *kwargs)
            if isinstance(gen, types.GeneratorType):
                runner = Runner(gen, deactivate)
                runner.run()
                return
            assert gen is None, gen
            deactivate()
    return wrapper

Runner在什么时候调用生成器的send方法呢? 这是由回调函数决定的。当回调函数执行时, 我们有了异步函数的执行结果response, 同时有了函数运行体Runner, 由Runner负责send(response)。 这样我们的函数就能向下运行了。 所以需要将Runner和callback关联。 tornao.gen.Task完成了这项任务。

# torando v2.4.1 torando/gen.py
class Task(YieldPoint):
    """Runs a single asynchronous operation.
    ...
    """
    def __init__(self, func, args, *kwargs):
        assert "callback" not in kwargs
        self.args = args
        self.kwargs = kwargs
        self.func = func

    def start(self, runner):
        self.runner = runner
        self.key = object()
        runner.register_callback(self.key)
        self.kwargs["callback"] = runner.result_callback(self.key)
        self.func(self.args, *self.kwargs)

    def is_ready(self):
        return self.runner.is_ready(self.key)

    def get_result(self):
        return self.runner.pop_result(self.key)

注意start方法, 首先向Runner注册了一个key, 在Runner内部, func的回调结果会和这个key关联。
然后构造了一个callback函数, Runner.result_callback源码如下:

  # tornado v2.4.1 tornado.gen.Runner.result_callback
    def result_callback(self, key):
        def inner(args, *kwargs):
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return inner

    # tornado v2.4.1 tornado.gen.Runner.set_result
    def set_result(self, key, result):
        """Sets the result for ``key`` and attempts to resume the generator."""
        self.results[key] = result
        self.run()

可见,当回调函数运行时, Runner记录了异步函数的回调结果,并和key关联,然后触发了Runner继续往后运行。

Runner.run 方法也就很好理解了

# torando v2.4.1 torndo.gen.Runner.run
def run(self):
    """Starts or resumes the generator, running until it reaches a
    yield point that is not ready.
    """
    if self.running or self.finished:
        return
    try:
        self.running = True
        while True:
            if self.exc_info is None:
                try:
                    if not self.yield_point.is_ready():
                        return
                    next = self.yield_point.get_result()
                except Exception:
                    self.exc_info = sys.exc_info()
            try:
                if self.exc_info is not None:
                    self.had_exception = True
                    exc_info = self.exc_info
                    self.exc_info = None
                    yielded = self.gen.throw(*exc_info)
                else:
                    yielded = self.gen.send(next)
            except StopIteration:
                self.finished = True
                if self.pending_callbacks and not self.had_exception:
                    # If we ran cleanly without waiting on all callbacks
                    # raise an error (really more of a warning).  If we
                    # had an exception then some callbacks may have been
                    # orphaned, so skip the check in that case.
                    raise LeakedCallbackError(
                        "finished without waiting for callbacks %r" %
                        self.pending_callbacks)
                self.deactivate_stack_context()
                self.deactivate_stack_context = None
                return
            except Exception:
                self.finished = True
                raise
            if isinstance(yielded, list):
                yielded = Multi(yielded)
            if isinstance(yielded, YieldPoint):
                self.yield_point = yielded
                try:
                    self.yield_point.start(self)
                except Exception:
                    self.exc_info = sys.exc_info()
            else:
                self.exc_info = (BadYieldError("yielded unknown object %r" % yielded),)
    finally:
        self.running = False

原文出处: http://shaolianbo.github.io/web/tornado/2015/03/01/tornado1

时间: 2024-10-26 10:21:39

Tornado 源码分析(一)的相关文章

WebWork2源码分析

web Author: zhuam   昨晚一口气看完了夏昕写的<<Webwork2_Guide>>,虽然文档资料很简洁,但仍不失为一本好的WebWork2书籍,看的出作者的经验和能力都是非常的老道,在此向作者的开源精神致敬,并在此引用夏昕的那句话So many open source projects, Why not Open your Documents?   今天下载了最新的WebWork2版本, 开始了源码分析,这份文档只能算是我的个人笔记,也没时间细细校对,且个人能力有

JUnir源码分析(一)

一.引子 JUnit源码是我仔细阅读过的第一个开源项目源码.阅读高手写的代码能学到一些好的编程风格和实现思路,这是提高自己编程水平行之有效的方法,因此早就想看看这些赫赫有名的框架是怎么回事了.今天就拿最简单的JUnit下手,也算开始自己的源码分析之路.   JUnit作为最著名的单元测试框架,由两位业界有名人士协力完成,已经经历了多次版本升级(了解JUnit基础.JUnit实践).JUnit总体来说短小而精悍,有不少值得我们借鉴的经验在里面:但是也有一些不足存在,当然这对于任何程序来说都是难免的

java io学习(三) 管道的简介,源码分析和示例

管道(PipedOutputStream和PipedInputStream)的简介,源码分析和示例 本章,我们对java 管道进行学习. java 管道介绍 在java中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流. 它们的作用是让多线程可以通过管道进行线程间的通讯.在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用. 使用管道通信时,大致的流程是:我们在线程A中向PipedOutputStr

java io学习(二)ByteArrayOutputStream的简介,源码分析和示例

ByteArrayOutputStream的简介,源码分析和示例(包括OutputStream) 前面学习ByteArrayInputStream,了解了"输入流".接下来,我们学习与ByteArrayInputStream相对应的输出流,即ByteArrayOutputStream. 本章,我们会先对ByteArrayOutputStream进行介绍,在了解了它的源码之后,再通过示例来掌握如何使用它. ByteArrayOutputStream 介绍 ByteArrayOutputS

java io学习(一)ByteArrayInputStream的简介,源码分析和示例

ByteArrayInputStream的简介,源码分析和示例(包括InputStream) 我们以ByteArrayInputStream,拉开对字节类型的"输入流"的学习序幕. 本章,我们会先对ByteArrayInputStream进行介绍,然后深入了解一下它的源码,最后通过示例来掌握它的用法. ByteArrayInputStream 介绍 ByteArrayInputStream 是字节数组输入流.它继承于InputStream. 它包含一个内部缓冲区,该缓冲区包含从流中读取

mahout源码分析之DistributedLanczosSolver(七) 总结篇

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 看svd算法官网上面使用的是亚马逊的云平台计算的,不过给出了svd算法的调用方式,当算出了eigenVectors后,应该怎么做呢?比如原始数据是600*60(600行,60列)的数据,计算得到的eigenVectors是24*60(其中的24是不大于rank的一个值),那么最后得到的结果应该是original_data乘以eigenVectors的转置这样就会得到一个600*24的矩阵,这样就达到了

mahout源码分析之DistributedLanczosSolver(五)

Mahout版本:0.7,hadoop版本:1.0.4,jdk:1.7.0_25 64bit. 1. Job 篇 接上篇,分析到EigenVerificationJob的run方法: public int run(Path corpusInput, Path eigenInput, Path output, Path tempOut, double maxError, double minEigenValue, boolean inMemory, Configuration conf) thro

Jquery源码分析---概述

jQuery是一个非常优秀的JS库,与Prototype,YUI,Mootools等众多的Js类库 相比,它剑走偏锋,从web开发实用的角度出发,抛除了其它Lib中一些不实用的 东西,为开发者提供了短小精悍的类库.其短小精悍,使用简单方便,性能高效 ,能极大地提高开发效率,是开发web应用的最佳的辅助工具之一.因此大部分 开发者在抛弃Prototype而选择Jquery来进行web开发. 一些开发人员在使用jquery时,由于仅仅只知道Jquery文档中的使用方法, 不明白Jquery的运行原理

Jquery源码分析---导言

jQuery是一个非常优秀的JS库,与Prototype,YUI,Mootools等众多的Js类库 相比,它剑走偏锋,从web开发的实用角度出发,抛除了其它Lib中一些中看但不 实用的东西,为开发者提供了优美短小而精悍的类库.其使用简单,文档丰富, 而且性能高效,能极大地提高web系统的开发效率.因此可以说是web应用开发中 最佳的Js辅助类库之一.大部分开发者正在抛弃Prototype,而选择Jquery做为 他们进行web开发的JS库. 如是开发人员仅仅只知道文档中的简单的使用 方法,却不明