引言
注:正文中引用的 Tornado 代码除特别说明外,都默认引用自 Tornado 4.0.1。
通过前面 IOLoop、 tornado.gen 模块的分析,我们基本了解了 Tornado 这个异步框架的核心实现,IOLoop 模块负责驱动异步执行, tornado.gen 模块提供 coroutine 的实现,负责支持使用同步方式编写异步代码。到此为止,一切看起来都还不错。接下来我们来看一看 Tornado 中如何处理异步调用上下文状态的。
一起来思考一个在(所有)异步框架中都会遇到的问题,一个 “异步调用” 可以简单理解为:传递一个 “回调函数” 后便立即返回的调用,框架会在异步动作完成后执行 “回调函数”。很显然,这样就导致了一个问题,由于 “回调函数” 实际执行的环境已经脱离了 “异步调用” 时的环境,这便要求 “回调函数” 不能依赖 ”调用时“ 环境。如果真有这个限制的话,那么这个框架使用起来就不是那么顺手了,试想下面一些场景:
(在一个线程中)处理多个异步操作时,可能需要一些共享的资源,通常我们可以把这些资源保存到 “线程局部变量” 或者 “全局变量” 中以达到共享的目的。但当不是所有的异步操作都需要这些资源时,将资源暴露到不需要的操作中,很可能引发不可预知的问题。
“回调函数” 执行时的环境已经不是 “调用时” 环境,如果 “回调函数” 抛出一些异常,那么很显然不能被 “调用时” 的上下文捕捉到。这与同步代码比较起来显得不够直观,我们希望减少这种差异。
针对这个问题,Tornado 提供了 tornado.stack_context 模块来解决。按照我个人的理解,简单来说就是通过该模块, Tornado 提供了一个叫 StackContext 的机制, StackContext 是一个栈式上下文结构,它能够像 threadlocal 一样为当前操作保存一个栈式上下文快照,当异步执行结束回调时便可以借助这个机制恢复调用时的环境。
通过源代码中注释,我们来看看 Facebook 的工程师们给出的介绍(注:我知道我的翻译就是一坨,看不懂这坨的可以直接看源代码的英文原文注释。):
StackContext
允许应用程序在切换到其他上下文执行时也能保持一个像 threadlocal 一样的状态。一些令人振奋的的例子是使用 StackContext
可以避免显式地使用异步调用的封装器,以及为当前调用增加一些额外的上下文用于输出日志。
这个有些不好理解,异常处理器可以视为这么一种想法(idea)的延伸,它就像一种本地栈的状态,栈在暂停和在新的上下文中恢复时需要被保持(注:把异常处理器也抽象处理成一种特化的上下文,能够被转移。)。 StackContext
++把恢复调用栈的工作转到一种控制一个上下文转移的机制上++。
范例:
1 |
|
大多数应用程序都不需要和 StackContext
直接打交道。什么时候需要用到,这里有些经验法则可供参考:
如果你在写一个不依赖于
tornado.ioloop
或者tornado.iostream
这类函数库(这类库提供 stack_context 的默认支持)的异步库(比如一个线程池),那么在任何异步操作之前你需要使用stack_context.wrap()
函数来取得操作开始时的栈式上下文快照。如果正在写一个需要使用到一些共享资源(比如连接池)的异步库,那么你需要在
with stack_context.NullContext():
块中创建那些共享资源。这样可以防止 StackContexts 从一个请求泄漏到另一个请求。如果你想写一些在可以保持到异步回调时异常处理器,那么创建一个
StackContext
或者ExceptionStackContext
,并把异步调用放在它们的 with 块中。
在学习分析 tornado.stack_context 模块代码之前,我先大概就我个人的理解对该模块实现做一个简单的概述:tornado.stack_context 模块将恢复函数调用栈的工作处理为一种上下文状态的转移,通过上下文状态的转移重建以达到恢复执行环境的目的。虽然与完全恢复函数栈不同,但已经够应对大多数的需求。该模块提供了几个重要的实现:
StackContext
和ExceptionStackContext
实现了上下文状态的转移,可以用来创建对异步有效的上下文状态,ExceptionStackContext
是特化的类型,顾名思义,专门用于处理异常;NullContext
也可视为特化的类型,专门用于清空上下文,适用于处理那些不希望上下文互相污染的情况;wrap(fn)
函数用来给 “回调函数 fn ” 取一个上下文快照,这样 “回调函数 fn ” 在多线程或异步环境执行时便能够恢复自己调用时的上下文(注:比如tornado.ioloop
中添加异步回调前会默认调用wrap(fn)
函数,这就是源代码注释所说的 stack_context-aware)。
实现原理
tornado.stack_context 模块中上下文是以 threadlocal 变量存储的,异步调用前通过 wrap(fn)
函数将当前上下文(链)复制到 “自由变量” 中形成一个闭包返回,异步调用时执行闭包来恢复上下文,如下代码所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43class _State(threading.local):
def __init__(self):
# self.contexts[0],tuple 中包含的是普通的 StackContext 上下文,用于异步调用时恢复上下文状态,
# 具有完整的 StackContext protocol 实现,即 enter, exit ;
# self.contexts[1]为 Head Context ,用于处理异步调用抛出的异常。
self.contexts = (tuple(), None)
# 当前线程的上下文状态
_state = _State()
def wrap(fn):
"""wrap(fn) 函数是上下文调度的“核心”,它通过闭包将当前上下文保存在自由变量
cap_contexts 中,并返回一个可调用的(函数)对象(wrapped 或者 null_wrapper)作为
回调函数 fn 的 wrapper,在之后被调用时(在其他线程或者在相同线程异步调用)会从
cap_contexts 中恢复保存的上下文,然后执行 fn 函数。
"""
# Check if function is already wrapped
if fn is None or hasattr(fn, '_wrapped'):
return fn
# Capture current stack head
# TODO: Any other better way to store contexts and update them in wrapped function?
cap_contexts = [_state.contexts]
if not cap_contexts[0][0] and not cap_contexts[0][1]:
# Fast path when there are no active contexts.
def null_wrapper(*args, **kwargs):
try:
current_state = _state.contexts
_state.contexts = cap_contexts[0]
return fn(*args, **kwargs)
finally:
_state.contexts = current_state
null_wrapper._wrapped = True
return null_wrapper
def wrapped(*args, **kwargs):
"""从 cap_contexts 中恢复上下文,并负责处理 fn 抛出的异常这里略掉具体代码,详情常见后面内容
"""
pass
wrapped._wrapped = True
return wrapped
StackContext 的实现
StackContext
是一个 context 的 wrapper,它接受一个 context manager(一个可调用对象,调用该对象可以返回一个 context) 作为参数。通过 with StackContext(my_context):
将 StackContext 对象加入到当前线程的上下文中(_state.contexts
)。
(注:StackContext
封装 context manager 才能在未来执行回调函数时重新建立新的上下文环境(new context object)。使用 “同一个上下文环境” 和使用 “相同的上下文环境” 是有区别的,StackContext
是为了重建 “相同的上下文环境” 而不是使用 “同一个上下文环境”。)
with StackContext() as cb:
返回的是一个 deactivation 回调,执行这个回调后会将该 StackContext 设置为非活动的(active=False)。非活动的 StackContext 不会被传递,也就是说该 StackContext 封装的上下文不会在后续执行 “回调函数” 时作为 回调函数” 的上下文环境而重建(注:函数 _remove_deactivated 会忽略非活动的 StackContext)。但是这个高级特性在大多数的应用中都不需要。
StackContext
的实现源码如下所示:
1 | class StackContext(object): |
ExceptionStackContext 的实现
ExceptionStackContext
是特化的 StackContext,在语义上类似 try/finally 语句块。其设计初衷是为处理上下文中未被处理的异常,关闭套接字以及完成一些清理工作。与普通的 StackContext
相比 ExceptionStackContext
没有 enter() 方法,只有 exit()
方法,所以它不加入上下文 tuple 中进行传播重建,而仅仅提供 exit()
对上下文中未处理的异常进行后处理。
ExceptionStackContext
由一个可调用的对象 exception_handler 初始化,exception_handler 的调用参数为 exc_info
元组 (type, value, traceback)
。其返回值为 boolean 值,True 表示异常被处理, False 表示异常未被处理需要 propagated 给其他 exception handlers(指上下文链中的其他 StackContext/ExceptionStackContext)。
ExceptionStackContext
的实现源码如下所示:
1 | class ExceptionStackContext(object): |
NullContext 的实现
NullContext
的实现很简单就是清空 _state.contexts
,其源码如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13class NullContext(object):
"""Resets the `StackContext`.
Useful when creating a shared resource on demand (e.g. an
`.AsyncHTTPClient`) where the stack that caused the creating is
not relevant to future operations.
"""
def __enter__(self):
self.old_contexts = _state.contexts
_state.contexts = (tuple(), None)
def __exit__(self, type, value, traceback):
_state.contexts = self.old_contexts
wrap(fn) 函数
wrap(fn)
函数是上下文调度的“核心”,异步调用前用它获取当前上下文快照,再将快照保存在自由变量 cap_contexts
中,并返回一个闭包(wrapped 或者 null_wrapper)作为回调函数 fn 的 wrapper ,在之后回调时(在其他线程或者在相同线程异步调用)再从 cap_contexts 中恢复保存的上下文状态,然后执行 fn 函数。完整实现代码如下所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110def wrap(fn):
"""Returns a callable object that will restore the current `StackContext`
when executed.
Use this whenever saving a callback to be executed later in a
different execution context (either in a different thread or
asynchronously in the same thread).
"""
# Check if function is already wrapped
if fn is None or hasattr(fn, '_wrapped'):
return fn
# Capture current stack head
# TODO: Any other better way to store contexts and update them in wrapped function?
cap_contexts = [_state.contexts]
if not cap_contexts[0][0] and not cap_contexts[0][1]:
# Fast path when there are no active contexts.
def null_wrapper(*args, **kwargs):
try:
current_state = _state.contexts
_state.contexts = cap_contexts[0]
return fn(*args, **kwargs)
finally:
_state.contexts = current_state
null_wrapper._wrapped = True
return null_wrapper
def wrapped(*args, **kwargs):
ret = None
try:
# Capture old state
current_state = _state.contexts
# Remove deactivated items
cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0])
# Force new state
_state.contexts = contexts
# Current exception
exc = (None, None, None)
top = None
# Apply stack contexts
last_ctx = 0
stack = contexts[0]
# Apply state
for n in stack:
try:
n.enter()
last_ctx += 1
except:
# Exception happened. Record exception info and store top-most handler
exc = sys.exc_info()
top = n.old_contexts[1]
# Execute callback if no exception happened while restoring state
if top is None:
try:
ret = fn(*args, **kwargs)
except:
exc = sys.exc_info()
top = contexts[1]
# If there was exception, try to handle it by going through the exception chain
if top is not None:
exc = _handle_exception(top, exc)
else:
# Otherwise take shorter path and run stack contexts in reverse order
while last_ctx > 0:
last_ctx -= 1
c = stack[last_ctx]
try:
c.exit(*exc)
except:
exc = sys.exc_info()
top = c.old_contexts[1]
break
else:
top = None
# If if exception happened while unrolling, take longer exception handler path
if top is not None:
exc = _handle_exception(top, exc)
# If exception was not handled, raise it
if exc != (None, None, None):
raise_exc_info(exc)
finally:
_state.contexts = current_state
return ret
wrapped._wrapped = True
return wrapped
# 从给定的上下文开始遍历上下文链,直到异常被处理,否则返回异常
def _handle_exception(tail, exc):
while tail is not None:
try:
if tail.exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
tail = tail.old_contexts[1]
return exc
wrap(fn)
的代码注释已经很清楚,这里就不重复做解释了。(注释中作者 TODO 寻找更好的方式保存上下文快照,我没有想到更好的方式,觉得这样使用自由变量保存就挺好,回头看看在新版本的实现中是否对这个进行了改进。)
上述代码中我们可以看到内部函数 wrapped(*args, **kwargs)
中是调用函数 _remove_deactivated(contexts)
来清理上下文快照的。下面我们来看看该函数的实现代码:
1 | def _remove_deactivated(contexts): |
_remove_deactivated
函数会在每次执行回调时调用(详见内部函数 wrapped ),对于没有非活动的上下文链,实际上进行了毫无意义的重复遍历处理上下文链,这个逻辑应该可以被优化。比如在上下文的 _deactivate() 方法中调用 _remove_deactivated 而不是在每次执行回调时,这样就可以按需移除非活动的上下文。但是考虑到实际使用时上下文链不长的情况,这个重复遍历处理也尚可接受。
结束语
针对异步调用环境变化的问题,Tornado 提供了一种将恢复函数栈转换为一种控制上下文转移的机制上来,并且对异常处理也采用了同样的抽象方式进行处理——不单独将异常处理独立出来,而是将其作为上下文的一部分而存在,这样整个上下文处理就可以在一个 “栈” 中完成。虽然最初接触时有些不太好理解,但仔细阅读源码后也能理解这种处理方式。(注:在一些异步框架中,对于异常的处理是通过注册事件函数捕获特定异常类型的方式来设计的。)
最后说一句, tornado.stack_context
模块的实现真的很巧妙。