0%

tornado.stack_context 模块解析

引言

注:正文中引用的 Tornado 代码除特别说明外,都默认引用自 Tornado 4.0.1。

通过前面 IOLoop、 tornado.gen 模块的分析,我们基本了解了 Tornado 这个异步框架的核心实现,IOLoop 模块负责驱动异步执行, tornado.gen 模块提供 coroutine 的实现,负责支持使用同步方式编写异步代码。到此为止,一切看起来都还不错。接下来我们来看一看 Tornado 中如何处理异步调用上下文状态的。

一起来思考一个在(所有)异步框架中都会遇到的问题,一个 “异步调用” 可以简单理解为:传递一个 “回调函数” 后便立即返回的调用,框架会在异步动作完成后执行 “回调函数”。很显然,这样就导致了一个问题,由于 “回调函数” 实际执行的环境已经脱离了 “异步调用” 时的环境,这便要求 “回调函数” 不能依赖 ”调用时“ 环境。如果真有这个限制的话,那么这个框架使用起来就不是那么顺手了,试想下面一些场景:

  1. (在一个线程中)处理多个异步操作时,可能需要一些共享的资源,通常我们可以把这些资源保存到 “线程局部变量” 或者 “全局变量” 中以达到共享的目的。但当不是所有的异步操作都需要这些资源时,将资源暴露到不需要的操作中,很可能引发不可预知的问题。

  2. “回调函数” 执行时的环境已经不是 “调用时” 环境,如果 “回调函数” 抛出一些异常,那么很显然不能被 “调用时” 的上下文捕捉到。这与同步代码比较起来显得不够直观,我们希望减少这种差异。

针对这个问题,Tornado 提供了 tornado.stack_context 模块来解决。按照我个人的理解,简单来说就是通过该模块, Tornado 提供了一个叫 StackContext 的机制, StackContext 是一个栈式上下文结构,它能够像 threadlocal 一样为当前操作保存一个栈式上下文快照,当异步执行结束回调时便可以借助这个机制恢复调用时的环境。

通过源代码中注释,我们来看看 Facebook 的工程师们给出的介绍(注:我知道我的翻译就是一坨,看不懂这坨的可以直接看源代码的英文原文注释。):

StackContext 允许应用程序在切换到其他上下文执行时也能保持一个像 threadlocal 一样的状态。一些令人振奋的的例子是使用 StackContext 可以避免显式地使用异步调用的封装器,以及为当前调用增加一些额外的上下文用于输出日志。

这个有些不好理解,异常处理器可以视为这么一种想法(idea)的延伸,它就像一种本地栈的状态,栈在暂停和在新的上下文中恢复时需要被保持(注:把异常处理器也抽象处理成一种特化的上下文,能够被转移。)。 StackContext ++把恢复调用栈的工作转到一种控制一个上下文转移的机制上++。

范例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@contextlib.contextmanager
def die_on_error():
try:
yield
except Exception:
logging.error("exception in asynchronous operation",exc_info=True)
sys.exit(1)

with StackContext(die_on_error):
# Any exception thrown here *or in callback and its desendents*
# will cause the process to exit instead of spinning endlessly
# in the ioloop.
http_client.fetch(url, callback)
ioloop.start()

大多数应用程序都不需要和 StackContext 直接打交道。什么时候需要用到,这里有些经验法则可供参考:

  • 如果你在写一个不依赖于 tornado.ioloop 或者 tornado.iostream 这类函数库(这类库提供 stack_context 的默认支持)的异步库(比如一个线程池),那么在任何异步操作之前你需要使用 stack_context.wrap() 函数来取得操作开始时的栈式上下文快照。

  • 如果正在写一个需要使用到一些共享资源(比如连接池)的异步库,那么你需要在 with stack_context.NullContext(): 块中创建那些共享资源。这样可以防止 StackContexts 从一个请求泄漏到另一个请求。

  • 如果你想写一些在可以保持到异步回调时异常处理器,那么创建一个 StackContext 或者 ExceptionStackContext ,并把异步调用放在它们的 with 块中。

在学习分析 tornado.stack_context 模块代码之前,我先大概就我个人的理解对该模块实现做一个简单的概述:tornado.stack_context 模块将恢复函数调用栈的工作处理为一种上下文状态的转移,通过上下文状态的转移重建以达到恢复执行环境的目的。虽然与完全恢复函数栈不同,但已经够应对大多数的需求。该模块提供了几个重要的实现:

  • StackContextExceptionStackContext 实现了上下文状态的转移,可以用来创建对异步有效的上下文状态,ExceptionStackContext 是特化的类型,顾名思义,专门用于处理异常;

  • NullContext 也可视为特化的类型,专门用于清空上下文,适用于处理那些不希望上下文互相污染的情况;

  • wrap(fn) 函数用来给 “回调函数 fn ” 取一个上下文快照,这样 “回调函数 fn ” 在多线程或异步环境执行时便能够恢复自己调用时的上下文(注:比如 tornado.ioloop 中添加异步回调前会默认调用 wrap(fn) 函数,这就是源代码注释所说的 stack_context-aware)。

实现原理

tornado.stack_context 模块中上下文是以 threadlocal 变量存储的,异步调用前通过 wrap(fn) 函数将当前上下文(链)复制到 “自由变量” 中形成一个闭包返回,异步调用时执行闭包来恢复上下文,如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class _State(threading.local):
def __init__(self):
# self.contexts[0],tuple 中包含的是普通的 StackContext 上下文,用于异步调用时恢复上下文状态,
# 具有完整的 StackContext protocol 实现,即 enter, exit ;
# self.contexts[1]为 Head Context ,用于处理异步调用抛出的异常。
self.contexts = (tuple(), None)

# 当前线程的上下文状态
_state = _State()

def wrap(fn):
"""wrap(fn) 函数是上下文调度的“核心”,它通过闭包将当前上下文保存在自由变量
cap_contexts 中,并返回一个可调用的(函数)对象(wrapped 或者 null_wrapper)作为
回调函数 fn 的 wrapper,在之后被调用时(在其他线程或者在相同线程异步调用)会从
cap_contexts 中恢复保存的上下文,然后执行 fn 函数。
"""
# Check if function is already wrapped
if fn is None or hasattr(fn, '_wrapped'):
return fn

# Capture current stack head
# TODO: Any other better way to store contexts and update them in wrapped function?
cap_contexts = [_state.contexts]

if not cap_contexts[0][0] and not cap_contexts[0][1]:
# Fast path when there are no active contexts.
def null_wrapper(*args, **kwargs):
try:
current_state = _state.contexts
_state.contexts = cap_contexts[0]
return fn(*args, **kwargs)
finally:
_state.contexts = current_state
null_wrapper._wrapped = True
return null_wrapper

def wrapped(*args, **kwargs):
"""从 cap_contexts 中恢复上下文,并负责处理 fn 抛出的异常这里略掉具体代码,详情常见后面内容
"""
pass

wrapped._wrapped = True
return wrapped

StackContext 的实现

StackContext 是一个 context 的 wrapper,它接受一个 context manager(一个可调用对象,调用该对象可以返回一个 context) 作为参数。通过 with StackContext(my_context): 将 StackContext 对象加入到当前线程的上下文中(_state.contexts)。

(注:StackContext 封装 context manager 才能在未来执行回调函数时重新建立新的上下文环境(new context object)。使用 “同一个上下文环境” 和使用 “相同的上下文环境” 是有区别的,StackContext 是为了重建 “相同的上下文环境” 而不是使用 “同一个上下文环境”。)

with StackContext() as cb: 返回的是一个 deactivation 回调,执行这个回调后会将该 StackContext 设置为非活动的(active=False)。非活动的 StackContext 不会被传递,也就是说该 StackContext 封装的上下文不会在后续执行 “回调函数” 时作为 回调函数” 的上下文环境而重建(注:函数 _remove_deactivated 会忽略非活动的 StackContext)。但是这个高级特性在大多数的应用中都不需要。

StackContext 的实现源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class StackContext(object):
"""Establishes the given context as a StackContext that will be transferred.

Note that the parameter is a callable that returns a context
manager, not the context itself. That is, where for a
non-transferable context manager you would say::

with my_context():

StackContext takes the function itself rather than its result::

with StackContext(my_context):

The result of ``with StackContext() as cb:`` is a deactivation
callback. Run this callback when the StackContext is no longer
needed to ensure that it is not propagated any further (note that
deactivating a context does not affect any instances of that
context that are currently pending). This is an advanced feature
and not necessary in most applications.
"""
def __init__(self, context_factory):
self.context_factory = context_factory
self.contexts = []
self.active = True

def _deactivate(self):
self.active = False

# StackContext protocol
def enter(self):
context = self.context_factory()
self.contexts.append(context)
context.__enter__()

def exit(self, type, value, traceback):
context = self.contexts.pop()
context.__exit__(type, value, traceback)

# Note that some of this code is duplicated in ExceptionStackContext
# below. ExceptionStackContext is more common and doesn't need
# the full generality of this class.
#
# ExceptionStackContext 是特化的 StackContext,后面会看到 ExceptionStackContext
# 是没有 `enter()` 方法的。
def __enter__(self):
self.old_contexts = _state.contexts
# 将当前 StackContext “加入”上下文栈进行传播,并将其设置为 Head StackContext 。
self.new_contexts = (self.old_contexts[0] + (self,), self)
_state.contexts = self.new_contexts

try:
self.enter()
except:
_state.contexts = self.old_contexts
raise

return self._deactivate

def __exit__(self, type, value, traceback):
try:
self.exit(type, value, traceback)
finally:
final_contexts = _state.contexts
_state.contexts = self.old_contexts

# Generator coroutines and with-statements with non-local
# effects interact badly. Check here for signs of
# the stack getting out of sync.
# Note that this check comes after restoring _state.context
# so that if it fails things are left in a (relatively)
# consistent state.
#
# 回调函数上下文环境的正确重建和销毁依赖于 StackContext 的 enter 与 exit,
# 在一个 ``with StackContext`` 语句块中使用 yield 语句交回代码控制权
# 是不安全的,有可能导致上下文不能按照正确顺序进入和退出。如果必须在这种情况下使
# 用 yield 语句,请使用 run_with_stack_context(context, func) 函数。
if final_contexts is not self.new_contexts:
raise StackContextInconsistentError(
'stack_context inconsistency (may be caused by yield '
'within a "with StackContext" block)')

# Break up a reference to itself to allow for faster GC on CPython.
self.new_contexts = None
ExceptionStackContext 的实现

ExceptionStackContext 是特化的 StackContext,在语义上类似 try/finally 语句块。其设计初衷是为处理上下文中未被处理的异常,关闭套接字以及完成一些清理工作。与普通的 StackContext 相比 ExceptionStackContext 没有 enter() 方法,只有 exit() 方法,所以它不加入上下文 tuple 中进行传播重建,而仅仅提供 exit() 对上下文中未处理的异常进行后处理。

ExceptionStackContext 由一个可调用的对象 exception_handler 初始化,exception_handler 的调用参数为 exc_info 元组 (type, value, traceback) 。其返回值为 boolean 值,True 表示异常被处理, False 表示异常未被处理需要 propagated 给其他 exception handlers(指上下文链中的其他 StackContext/ExceptionStackContext)。

ExceptionStackContext 的实现源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class ExceptionStackContext(object):
"""Specialization of StackContext for exception handling.

The supplied ``exception_handler`` function will be called in the
event of an uncaught exception in this context. The semantics are
similar to a try/finally clause, and intended use cases are to log
an error, close a socket, or similar cleanup actions. The
``exc_info`` triple ``(type, value, traceback)`` will be passed to the
exception_handler function.

If the exception handler returns true, the exception will be
consumed and will not be propagated to other exception handlers.
"""
def __init__(self, exception_handler):
self.exception_handler = exception_handler
self.active = True

def _deactivate(self):
self.active = False

def exit(self, type, value, traceback):
if type is not None:
return self.exception_handler(type, value, traceback)

def __enter__(self):
self.old_contexts = _state.contexts
# 这里需要注意一下, ExceptionStackContext 自身不会作为上下文的一部分(tuple)
# 进行传播重建,仅作为 Head StackContext ,在异步回调时负责处理
# StackContexts 未处理的异常。
self.new_contexts = (self.old_contexts[0], self)
_state.contexts = self.new_contexts

return self._deactivate

def __exit__(self, type, value, traceback):
try:
if type is not None:
return self.exception_handler(type, value, traceback)
finally:
final_contexts = _state.contexts
_state.contexts = self.old_contexts

if final_contexts is not self.new_contexts:
raise StackContextInconsistentError(
'stack_context inconsistency (may be caused by yield '
'within a "with StackContext" block)')

# Break up a reference to itself to allow for faster GC on CPython.
self.new_contexts = None
NullContext 的实现

NullContext 的实现很简单就是清空 _state.contexts ,其源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
class NullContext(object):
"""Resets the `StackContext`.

Useful when creating a shared resource on demand (e.g. an
`.AsyncHTTPClient`) where the stack that caused the creating is
not relevant to future operations.
"""
def __enter__(self):
self.old_contexts = _state.contexts
_state.contexts = (tuple(), None)

def __exit__(self, type, value, traceback):
_state.contexts = self.old_contexts

wrap(fn) 函数

wrap(fn) 函数是上下文调度的“核心”,异步调用前用它获取当前上下文快照,再将快照保存在自由变量 cap_contexts 中,并返回一个闭包(wrapped 或者 null_wrapper)作为回调函数 fn 的 wrapper ,在之后回调时(在其他线程或者在相同线程异步调用)再从 cap_contexts 中恢复保存的上下文状态,然后执行 fn 函数。完整实现代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
def wrap(fn):
"""Returns a callable object that will restore the current `StackContext`
when executed.

Use this whenever saving a callback to be executed later in a
different execution context (either in a different thread or
asynchronously in the same thread).
"""
# Check if function is already wrapped
if fn is None or hasattr(fn, '_wrapped'):
return fn

# Capture current stack head
# TODO: Any other better way to store contexts and update them in wrapped function?
cap_contexts = [_state.contexts]

if not cap_contexts[0][0] and not cap_contexts[0][1]:
# Fast path when there are no active contexts.
def null_wrapper(*args, **kwargs):
try:
current_state = _state.contexts
_state.contexts = cap_contexts[0]
return fn(*args, **kwargs)
finally:
_state.contexts = current_state
null_wrapper._wrapped = True
return null_wrapper

def wrapped(*args, **kwargs):
ret = None
try:
# Capture old state
current_state = _state.contexts

# Remove deactivated items
cap_contexts[0] = contexts = _remove_deactivated(cap_contexts[0])

# Force new state
_state.contexts = contexts

# Current exception
exc = (None, None, None)
top = None

# Apply stack contexts
last_ctx = 0
stack = contexts[0]

# Apply state
for n in stack:
try:
n.enter()
last_ctx += 1
except:
# Exception happened. Record exception info and store top-most handler
exc = sys.exc_info()
top = n.old_contexts[1]

# Execute callback if no exception happened while restoring state
if top is None:
try:
ret = fn(*args, **kwargs)
except:
exc = sys.exc_info()
top = contexts[1]

# If there was exception, try to handle it by going through the exception chain
if top is not None:
exc = _handle_exception(top, exc)
else:
# Otherwise take shorter path and run stack contexts in reverse order
while last_ctx > 0:
last_ctx -= 1
c = stack[last_ctx]

try:
c.exit(*exc)
except:
exc = sys.exc_info()
top = c.old_contexts[1]
break
else:
top = None

# If if exception happened while unrolling, take longer exception handler path
if top is not None:
exc = _handle_exception(top, exc)

# If exception was not handled, raise it
if exc != (None, None, None):
raise_exc_info(exc)
finally:
_state.contexts = current_state
return ret

wrapped._wrapped = True
return wrapped

# 从给定的上下文开始遍历上下文链,直到异常被处理,否则返回异常
def _handle_exception(tail, exc):
while tail is not None:
try:
if tail.exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()

tail = tail.old_contexts[1]

return exc

wrap(fn) 的代码注释已经很清楚,这里就不重复做解释了。(注释中作者 TODO 寻找更好的方式保存上下文快照,我没有想到更好的方式,觉得这样使用自由变量保存就挺好,回头看看在新版本的实现中是否对这个进行了改进。)

上述代码中我们可以看到内部函数 wrapped(*args, **kwargs) 中是调用函数 _remove_deactivated(contexts) 来清理上下文快照的。下面我们来看看该函数的实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def _remove_deactivated(contexts):
"""Remove deactivated handlers from the chain"""

# 从上下文栈(tuple)中移除掉非活动(不需要传播)的 StackContext 上下文。
stack_contexts = tuple([h for h in contexts[0] if h.active])

# 沿着上下文链找到活动的 Head Context,作为 new Head。
head = contexts[1]
while head is not None and not head.active:
head = head.old_contexts[1]

# 处理上下文链,对于不需要传播而被设置为非活动的 StackContext 上下文节点,在 With
# 调用结束后并没有从上下文链中移除,这段代码负责清理上下文链。
ctx = head
while ctx is not None:
parent = ctx.old_contexts[1]

while parent is not None:
if parent.active:
break
# 移除非活动的 parent 上下文节点
ctx.old_contexts = parent.old_contexts
parent = parent.old_contexts[1]

ctx = parent

return (stack_contexts, head)

_remove_deactivated 函数会在每次执行回调时调用(详见内部函数 wrapped ),对于没有非活动的上下文链,实际上进行了毫无意义的重复遍历处理上下文链,这个逻辑应该可以被优化。比如在上下文的 _deactivate() 方法中调用 _remove_deactivated 而不是在每次执行回调时,这样就可以按需移除非活动的上下文。但是考虑到实际使用时上下文链不长的情况,这个重复遍历处理也尚可接受。

结束语

针对异步调用环境变化的问题,Tornado 提供了一种将恢复函数栈转换为一种控制上下文转移的机制上来,并且对异常处理也采用了同样的抽象方式进行处理——不单独将异常处理独立出来,而是将其作为上下文的一部分而存在,这样整个上下文处理就可以在一个 “栈” 中完成。虽然最初接触时有些不太好理解,但仔细阅读源码后也能理解这种处理方式。(注:在一些异步框架中,对于异常的处理是通过注册事件函数捕获特定异常类型的方式来设计的。

最后说一句, tornado.stack_context 模块的实现真的很巧妙。