tornado.gen.engine 是 tornado v3.0.0之前用于异步方法的。 具体点说如果一个方法内部使用yield 方法返回一个异步调用的结果, 那么这个方法必须使用tornado.gen.engine装饰。 tornado v3.0.0之后被tornado.gen.coroutines取代。
如果没有engine, 异步也是可行的,只不过需要大量callback, 就像Javascript那样。
一个最经典的异步handler如下:
class SyncDemo(web.RequestHandler):
@asynchronous
def get(self):
http_client = httpclient.AsyncHTTPClient()
http_client.fetch("http://www.baidu.com", self.on_fetch)
def on_fetch(self, response):
return self.finish(response)
@asynchronous
用来说明这个get方法是异步的, tornado 不在负责finish request, 由handler自己处理。tornado.httpclient.AsyncHTTPClient.fetch
是进行异步网路请求的方法。其函数声明为:def fetch(self, request, callback, **kwargs)
可见他有一个callback参数, 当网络请求完成后, 将调用callback方法, 这一切是有ioloop实现的。
使用engine后,代码会更接近顺序代码,没有回调如下:
class AsyncDemo(web.RequestHandler):
@asynchronous
@engine
def get(self):
http_client = httpclient.AsyncHTTPClient()
response = yield Task(httpclient.fetch, 'http://www.baidu.com')
self.finish(response.body)
但是在AsyncDemo
的例子中, tornado.httpclient.AsyncHTTPClient.fetch
放到了Task中, 当异步方法完成时, response被直接赋值给了response 。 callback还是存在的, 也是执行的,只不过Task和engine 这两个东西,让代码接近顺序代码了。
原理是这样的:
函数内部如果有yield, 那么调用这个函数讲产生types.Generator实例, 这个函数的调用将由这个生成器控制。
words = ['a', 'b', 'c']
def async_func():
for i in range(len(words)):
w = yield i
print "in_func : ", w
runner = async_func()
yield_value = runner.send(None)
print "out_func : ", yield_value
while 1:
try:
yield_value = runner.send(words[yield_value])
print "out_func : ", yield_value
except StopIteration:
break
运行结果如下:
out_func : 0
in_func : a
out_func : 1
in_func : b
out_func : 2
in_func : c
tornado.gen.Runner是一个管理生成器和生成器内部异步函数运行结果的类。被engine装饰的函数, 每当调用都会产生一个生成器,同时tornado都会为他生成一个Runner实例,Runner实例负责在合适的时候调用这个生成器的send方法,把函数执行完。
# tornado v2.4.1 tornado/gen.py
def engine(func):
"""Decorator for asynchronous generators.
.....
"""
@functools.wraps(func)
def wrapper(args, *kwargs):
runner = None
def handle_exception(typ, value, tb):
if runner is not None:
return runner.handle_exception(typ, value, tb)
return False
with ExceptionStackContext(handle_exception) as deactivate:
gen = func(args, *kwargs)
if isinstance(gen, types.GeneratorType):
runner = Runner(gen, deactivate)
runner.run()
return
assert gen is None, gen
deactivate()
return wrapper
Runner在什么时候调用生成器的send方法呢? 这是由回调函数决定的。当回调函数执行时, 我们有了异步函数的执行结果response, 同时有了函数运行体Runner, 由Runner负责send(response)。 这样我们的函数就能向下运行了。 所以需要将Runner和callback关联。 tornao.gen.Task完成了这项任务。
# torando v2.4.1 torando/gen.py
class Task(YieldPoint):
"""Runs a single asynchronous operation.
...
"""
def __init__(self, func, args, *kwargs):
assert "callback" not in kwargs
self.args = args
self.kwargs = kwargs
self.func = func
def start(self, runner):
self.runner = runner
self.key = object()
runner.register_callback(self.key)
self.kwargs["callback"] = runner.result_callback(self.key)
self.func(self.args, *self.kwargs)
def is_ready(self):
return self.runner.is_ready(self.key)
def get_result(self):
return self.runner.pop_result(self.key)
注意start方法, 首先向Runner注册了一个key, 在Runner内部, func的回调结果会和这个key关联。
然后构造了一个callback函数, Runner.result_callback源码如下:
# tornado v2.4.1 tornado.gen.Runner.result_callback
def result_callback(self, key):
def inner(args, *kwargs):
if kwargs or len(args) > 1:
result = Arguments(args, kwargs)
elif args:
result = args[0]
else:
result = None
self.set_result(key, result)
return inner
# tornado v2.4.1 tornado.gen.Runner.set_result
def set_result(self, key, result):
"""Sets the result for ``key`` and attempts to resume the generator."""
self.results[key] = result
self.run()
可见,当回调函数运行时, Runner记录了异步函数的回调结果,并和key关联,然后触发了Runner继续往后运行。
Runner.run 方法也就很好理解了
# torando v2.4.1 torndo.gen.Runner.run
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
if self.exc_info is None:
try:
if not self.yield_point.is_ready():
return
next = self.yield_point.get_result()
except Exception:
self.exc_info = sys.exc_info()
try:
if self.exc_info is not None:
self.had_exception = True
exc_info = self.exc_info
self.exc_info = None
yielded = self.gen.throw(*exc_info)
else:
yielded = self.gen.send(next)
except StopIteration:
self.finished = True
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.deactivate_stack_context()
self.deactivate_stack_context = None
return
except Exception:
self.finished = True
raise
if isinstance(yielded, list):
yielded = Multi(yielded)
if isinstance(yielded, YieldPoint):
self.yield_point = yielded
try:
self.yield_point.start(self)
except Exception:
self.exc_info = sys.exc_info()
else:
self.exc_info = (BadYieldError("yielded unknown object %r" % yielded),)
finally:
self.running = False
原文出处: http://shaolianbo.github.io/web/tornado/2015/03/01/tornado1