0%

tornado.gen 模块解析

引言

注:正文中引用的 Tornado 代码除特别说明外,都默认引用自 Tornado 4.0.1。

tornado.gen 模块是一个基于 python generator 实现的异步编程接口。通过该模块提供的 coroutine (注:这里 coroutine 指的是 ”协程” 概念而不是后面具体实现的 decorator:@gen.decorator),大大简化了在 Tornado 中编写异步代码的工作 —— 支持 “同步方式编写异步代码” ,避免编写烦人的回调函数。参考官方文档的例子,通常我们编写的异步代码如下:

1
2
3
4
5
6
7
8
9
10
class AsyncHandler(RequestHandler):
@asynchronous
def get(self):
http_client = AsyncHTTPClient()
http_client.fetch("http://example.com",
callback=self.on_fetch)

def on_fetch(self, response):
do_something_with_response(response)
self.render("template.html")

而使用 tornado.gen 模块提供的 decorator ,在 Tornado 3.1 以前我们可以这样写异步代码:

1
2
3
4
5
6
7
8
class GenAsyncHandler(RequestHandler):
@asynchronous
@gen.engine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

Tornado 3.1 及以上版本,可以直接使用 @gen.coroutine 来代替 @asynchronous:

1
2
3
4
5
6
7
class GenAsyncHandler(RequestHandler):
@gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
response = yield http_client.fetch("http://example.com")
do_something_with_response(response)
self.render("template.html")

注:@asynchronous 在 tornado.web 中定义,对于使用了 @gen.coroutine 装饰的方法不需要再使用 @asynchronous 进行装饰,但同时使用前述 2 个 decorator 进行方法装饰也是合法的,在同时使用的情况下需要注意的是 @asynchronous 必须是第 1 个 decorator 。

很显然,采用同步方式编写的异步代码相比起分散在各处的异步回调函数代码,更利于代码的阅读和逻辑的组织。

该模块的实现非常巧妙也不容易理解,作为阅读 Tonardo 源码的笔记,我将在后面内容中结合源码和自己的理解对其实现进行分析。

@gen.coroutine 与 @gen.engine 的实现原理

tornado.gen 支持以同步方式编写异步代码的核心就是 python generator。其原理简单来说,就是通过 generator.next() 启动 yield 返回的 generator ,通过 IOLoop 与 generator.send(value) 驱动 generator 运行,以达到协调异步执行的目的。

从功能上来看, @gen.coroutine 与 @gen.engine 的功能非常相似,差别就在于二者对被装饰方法参数中的 “callback” 参数处理不一样以及具有不同的返回值。 @gen.coroutine 装饰的方法执行后返回 Future 对象并且会将方法参数中的 “callback” 加入到 Future 完成后的回调列表中;@gen.engine 装饰的方法执行后没有返回值(注:实际上如果被装饰方法有返回值,会抛出 ReturnValueIgnoredError 异常,详见后面的代码分析部分)。

所以,通过 @gen.engine 装饰的方法没有返回值,方法必须自己在异步调用完成后调用 “callback” 来执行回调动作,而通过 @gen.coroutine 装饰的方法则可以直接返回执行结果,然后由 gen 模块负责将结果传递给 “callback” 来执行回调。

注: 从调用者的角度来看 `@gen.coroutine可以视为@tornado.concurrent.return_future@gen.engine` 的组合。

@gen.coroutine 实现原理

@gen.coroutine 中充分利用了 generator 的特性,下面是其实现代码及分析。

1
2
3
def coroutine(func, replace_callback=True):
"""Decorator for asynchronous generators."""
return _make_coroutine_wrapper(func, replace_callback=True)

coroutine 内部直接委托 _make_coroutine_wrapper 完成具体功能(这段代码中 coroutine 的可选参数 “replace_callback” 是没有使用的),返回一个 Future 实例对象。

_make_coroutine_wrapper(func, replace_callback) 函数作为 @gen.coroutine 和 @gen.engine 内部实现,通过 replace_callback 的值来决定是否对 “callback” 方法参数进行处理。coroutine 的实现中通过 replace_callback=True 调用 _make_coroutine_wrapper 函数,会检查方法参数中是否有 “callback” 参数,如果有的话会将其加入到方法返回值 Future 的完成后回调列表中。如下面代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = TracebackFuture()

# 处理 “callback”,忽略或者将其加入到 Future 的完成回调列表中。
if replace_callback and 'callback' in kwargs:
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))

try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
# 在 python 2 以及 python 3.3 以前,generator 中不能直接通过
# return 返回值:return 被视为 raise StopIteration(),
# return <something> 被视为raise StopIteration(<something>)。
# 在 gen 模块中,特别定义了 Return 类型用于返回值:raise gen.Return(something>)
result = getattr(e, 'value', None)
except Exception:
# 发生异常,异常被写入 future(将会被设置为完成状态),结束调用,返回 future
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, types.GeneratorType):
# 通过检查 result 是否为 GeneratorType 来选择是否创建 coroutine ,对于
# 同步情况直接 future.set_result(result) 返回,避免创建 coroutine 而
# 造成的性能损失。
# 与 Tornado 4.0 之前的版本比较,这里已经把顶层 ExceptionStackContext
# 的构建以及 Runner.run 的功能进行了重构,都迁移到了 Runner 实现中。
#
# 通过 next 启动 generator ,启动前记录上下文,启动后对上下文进行一致性检查。
# 若 generator 中有从 "with StackContext" 直接 “yield” 的代码逻辑,将抛
# 出 StackContextInconsistentError 异常。
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(getattr(e, 'value', None))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded)
try:
return future
finally:
# Subtle memory optimization: if next() raised an exception,
# the future's exc_info contains a traceback which
# includes this stack frame. This creates a cycle,
# which will be collected at the next full GC but has
# been shown to greatly increase memory usage of
# benchmarks (relative to the refcount-based scheme
# used in the absence of cycles). We can avoid the
# cycle by clearing the local variable after we return it.
#
# 代码注释中说,generator.next() 抛出异常失败后, future 的 exc_info
# 中会包含当前栈帧的引用,栈帧中也有对 future 的引用,这样导致一个环,必须
# 要在下一次 full GC 时才能回收内存。返回 future 后将 future 设置为 None
# 可以优化内存。(注:需要 full GC 是与 python 的垃圾回收实现采用引用计数
# 为主,标记-清除和分代机制为辅相关。python 采用引用计数来立刻释放可以释放
# 的内存,然后用标记-清除的方法来清除循环引用的不可达对象。)
future = None

# 同步情况下,不需要创建 coroutine,直接返回 future。
future.set_result(result)
return future
return wrapper

class Return(Exception):
def __init__(self, value=None):
super(Return, self).__init__()
self.value = value

注: 关于 CPython 的 GC 实现,这里有一篇不错的源码分析文章:Python垃圾回收机制

如下面的代码所示, IOLoop 的 add_future 方法会封装回调方法,在 Future 完成以后会将 “callback” 加入到 IOLoop 的回调列表中以等待 IOLoop 调度执行回调动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
def add_future(self, future, callback):
"""Schedules a callback on the ``IOLoop`` when the given
`.Future` is finished.

The callback is invoked with one argument, the
`.Future`.
"""
assert is_future(future)
callback = stack_context.wrap(callback)
# 在 future 的完成回调列表中增加一个 lambda 表达式,负责在
# 将 “callback” 加入 IOLoop 调度执行。
future.add_done_callback(
lambda future: self.add_callback(callback, future))

从上面的代码分析中可以看到 _make_coroutine_wrapper 函数已经完成了 coroutine 的创建,其代码逻辑比较简单,而整个 coroutine 启动、运行的核心功能被实现在 Runner 类中。 Runner 有一个 run() 方法,该方法负责启动 coroutine,并与 IOLoop 配合驱动 YieldPoint(注:在 generator 中通过 yield 返回的实例类型,Tornado 4.0 及以后推荐使用 Futures 类型, YieldPoint 类型被放弃) 执行直到 result_future 完成。 run() 方法的详细代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready.
"""
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future

# 当前 future 没有完成时直接返回,等待 IOLoop 在 future 完成后回调再执行
if not future.done():
return

# 当前 future 完成后对 coroutine 接下来运行没作用,立即释放
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
try:
value = future.result()
except Exception:
self.had_exception = True
yielded = self.gen.throw(*sys.exc_info())
else:
# 将 future 的结果赋值给当前 yield 表达式,驱动 generator 继续
# 执行, (如果generator未结束的话)返回下一个 yield 表达式结果
yielded = self.gen.send(value)
if stack_context._state.contexts is not orig_stack_contexts:
self.gen.throw(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
# generator 执行完成,将 执行结果赋值给 result_future,返回
self.finished = True
self.future = _null_future

# Tornado 4.0 之前使用 YieldPoint 驱动,Callback 与 Wait/WaitAll
# 协调时,Callback 的回调结果需要 Runner 作为中转站,通过
# Runner.register_callback(key) 登记 Callback ,再通过
# YieldPoint.result_callback(key) 取回“设置(回调)方法”,
# 外部通过“设置(回调)方法”把结果保存到 Runner.results 字典中。
# Wait/WaitAll 通过 get_result(key) 取回 结果。
# YieldFuture 的实现也采用了相同的实现方式。
# Tornado 4.0 之后使用 Future 代替 YieldPoint,这些已经过时。
# 与 Yield 相关的代码都是为了向后兼容。
if self.pending_callbacks and not self.had_exception:
# If we ran cleanly without waiting on all callbacks
# raise an error (really more of a warning). If we
# had an exception then some callbacks may have been
# orphaned, so skip the check in that case.
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
self.result_future.set_result(getattr(e, 'value', None))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
self.finished = True
self.future = _null_future
self.result_future.set_exc_info(sys.exc_info())
self.result_future = None
self._deactivate_stack_context()
return

# 继续处理 yield 表达式结果
if not self.handle_yield(yielded):
return
finally:
self.running = False

def handle_yield(self, yielded):
# 为了保持向后兼容,需要对多个 YieldPonit 和 Future 的混合集合做处理。
# 对于全是 Future 的集合类型使用新的 multi_future 函数进行封装处理;
# 不全是的使用 Multi 类进行封装,对于 Future 提供了 YieldFuture 适配器类。
# 详细的实现细节见 YieldFuture、Multi的实现代码。
# 若需要 run() 循环立即处理该 YieldPoint(被启动)/Future(已经完成) 则返
# 回 True,否则返回 False。
if isinstance(yielded, list):
if all(is_future(f) for f in yielded):
yielded = multi_future(yielded)
else:
yielded = Multi(yielded)
elif isinstance(yielded, dict):
if all(is_future(f) for f in yielded.values()):
yielded = multi_future(yielded)
else:
yielded = Multi(yielded)

# 针对第一个 YieldPoint 使用一个 ExceptionStackContext 上下文来处理
# StackContexts 中没有处理的异常,将未处理的异常记录到 result_future 中。
# 对于 Future 对象则没有必要, Future 提供了方法来记录异常和异常堆栈信息,
# 在 Future 完成后通过其 result() 方法获取结果(在 run 方法的调用)时会
# 再次抛出异常,这时可捕获记录到 result_future 中。
if isinstance(yielded, YieldPoint):
self.future = TracebackFuture()
def start_yield_point():
try:
yielded.start(self)
# 如果 yielded 已经完成,则将其结果赋值给 self.future,等待 run 循环处理;
# 若未就绪,则需要通过 Runner.set_result(key, value) 来进行赋值操作。
if yielded.is_ready():
self.future.set_result(
yielded.get_result())
else:
self.yield_point = yielded
except Exception:
self.future = TracebackFuture()
self.future.set_exc_info(sys.exc_info())
if self.stack_context_deactivate is None:
# Start a stack context if this is the first
# YieldPoint we've seen.
with stack_context.ExceptionStackContext(
self.handle_exception) as deactivate:
self.stack_context_deactivate = deactivate
def cb():
start_yield_point()
self.run()
# 第 1 个 yielded 交由 IOLoop来启动
self.io_loop.add_callback(cb)
return False
else:
# 启动 YieldPoint,需要返回 True,在 run 循环中继续处理
start_yield_point()
elif is_future(yielded):
self.future = yielded
# self.future 完成后继续 self.run()
# moment = Future() 是一个特殊的对象,主要用在需要长时间执行的 coroutine 中,
# 通过 “yield gen.moment” 中断当前 coroutine ,将控制权交给 IOLoop 去轮询。
# 等效于当前 coroutine 临时放弃时间片,给了其他 callback 机会运行。
if not self.future.done() or self.future is moment:
self.io_loop.add_future(
self.future, lambda f: self.run())
return False
else:
self.future = TracebackFuture()
self.future.set_exception(BadYieldError(
"yielded unknown object %r" % (yielded,)))
return True

如上面代码所示, Runner 的核心就是 run/handle_yield 方法,该方法的调用目前已经被内联到 Runner 的初始化方法中。如下面代码所示,根据 handle_yield() 方法返回的结果立即运行 run() 方法或者等待 IOLoop 调度运行 run()方法(Falsehandle_yield 返回 False 时,run() 方法被注册到 IOLoop 回调中执行。)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def __init__(self, gen, result_future, first_yielded):
self.gen = genreturn_futurereturn_future
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
# For efficiency, we do not create a stack context until we
# reach a YieldPoint (stack contexts are required for the historical
# semantics of YieldPoints, but not for Futures). When we have
# done so, this field will be set and must be called at the end
# of the coroutine.
self.stack_context_deactivate = None
if self.handle_yield(first_yielded):
self.run()

Runner 的其他方法主要是用于支持 YieldPoint 的执行,诸如为 YieldPoint 提供 ExceptionStackContext 支持(注:coroutine 结束时会调用 stack_context_deactivate 将该上下文设置为无效,在 stack_context._remove_deactivated 方法中会清理无效的上下文,并不会污染到上下文链,详细的细节请参考 stack_context 的设计实现)、结果中转(注:这是个人说法,主要是指为协调 Callback 与 Wait/WaitAll 提供的 register_callback、 is_ready、 set_result、 pop_result、 result_callback 几个方法)。

@gen.engine 实现原理

前面内容已经说过 `@gen.engine@gen.coroutine是非常相似的,对使用者而言@gen.coroutine就是@concurrent.return_future@gen.engine的组合(详见 concurrent.return_future 的实现)。下面是@gen.engine` 的实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def engine(func):
# 不对被装饰方法的 "callback" 参数做替换处理,也就是说即使被装饰方法有 “callback” 参数,
# 在 coroutine 执行完成得到结果以后不会“自动调用”该 “callback”。细节将
# _make_coroutine_wrapper 实现代码。
func = _make_coroutine_wrapper(func, replace_callback=False)
@functools.wraps(func)
def wrapper(*args, **kwargs):
# 获取 coroutine 的执行结果,由于 coroutine 执行完成后不会自动调用 "callback" ,所
# 以要求被装饰方法不能有返回值(非 None)而必须自己调用 "callback",否则抛出
# ReturnValueIgnoredError 异常。
future = func(*args, **kwargs)
def final_callback(future):
if future.result() is not None:
raise ReturnValueIgnoredError(
"@gen.engine functions cannot return values: %r" %
(future.result(),))
future.add_done_callback(final_callback)
return wrapper

把普通的异步方法适配到 coroutine

通过上面的分析可以看到,Tornado 实现的 coroutine 支持编写 “同步方式的异步代码”,但是要求异步调用的返回值是 Future 或者 YieldPoint 实例,对于这样的异步方法,我们只需要简单的使用 yield 表达式便可以轻松将其转换为 coroutine。而对于返回值为 None,仅支持通过 “callback” 参数回调(异步结果执行的结果会作为 “callback” 调用的实参)的普通异步方法,便不能直接被 Tornado 的 coroutine 支持,需要我们自己做一些额外的封装工作。

tornado.gen 模块提供了一个标准的封装函数 Task(注:Tornado 4.0 以前 Task 是作为 YieldPoint 的子类来实现的,之后改为返回 Future 实例的函数,为了向后兼容,所以是一个拥有“类名”的函数。)。Task 的实现原理很简单,因为这种普通异步方法没有返回值而是通过把异步结果作为回调函数的实参来达到传递的目的,所以 Task 就将这种方法包装成返回值为 Future 的方法然后通过方法的回调函数来把异步结果传递给返回的 Future 实例。下面是 Task 的实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def Task(func, *args, **kwargs):
future = Future()
def handle_exception(typ, value, tb):
if future.done():
return False
future.set_exc_info((typ, value, tb))
return True

# 提供给 func 的回调函数,将 func 的异步结果传递给 future。
# 注意: 这个回调函数仅支持一个 “参数”。
def set_result(result):
if future.done():
return
future.set_result(result)

# 由于 func 的 “callback” 的形参个数是不确定的(或者说 func 回调 “callback”
# 的形参个数不确定),要适配 set_result 就需要将形参包装成一个对象传递给
# set_result。 _argument_adapter 函数就是负责完成这个封装功能的,对于 0 个或 1
# 个实参调用的情况,_argument_adapter 不做任何处理直接将该参数传递给 set_result,
# 对于多个实参的情况,这将参数包装成 Arguments(namedtuple) 传递给 set_result。
# 参数的 “callback”,
with stack_context.ExceptionStackContext(handle_exception):
func(*args, callback=_argument_adapter(set_result), **kwargs)
return future

Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])

def _argument_adapter(callback):
"""Returns a function that when invoked runs ``callback`` with one arg.

If the function returned by this function is called with exactly
one argument, that argument is passed to ``callback``. Otherwise
the args tuple and kwargs dict are wrapped in an `Arguments` object.
"""
def wrapper(*args, **kwargs):
if kwargs or len(args) > 1:
callback(Arguments(args, kwargs))
elif args:
callback(args[0])
else:
callback(None)
return wrapper

注:Task 内部已经实现了隐式传递 “callback” 参数,所以使用 Task 时不能显示传递 “callback”。

结束语

python generator 是个功能强大的利器(PEP 342 加入了新的特性,能让生成器在单一语句中实现,生成一个值或者接受一个值,或同时生成一个值并接受一个值。),是 tornado.gen 模块实现 coroutine 的基石。虽然 tornado.gen 的核心就是 generator,但是其整个设计和实现都非常巧妙,并且随着 Tornado 版本的演变该模块也在不断重构和优化,对比其不同版本的实现演进,对于我们理解学习都非常有价值,值得反复研读。