0%

ProcessPoolExecutor In Tornado

背景

编程时遇到的阻塞任务一般有两类:

  1. 等待 I/O 就绪(I/O 密集型);
  2. 耗时的计算工作(CPU 密集型)。

遇到这类任务,通常首选考虑是否可以优化操作(主要是针对第 2 种情况),将阻塞限制在可以接受的范围内,若不行则考虑使用多线程或多进程,将阻塞操作交由其他线程(例如 .NET APM,.NET 异步编程模型使用线程池来异步执行任务)或进程(fork/exec、任务队列,或者异步委托第三方服务 API)去异步处理,然后在操作结束后取回结果。对于第 1 种情况,使用操作系统支持的非阻塞 I/O 来提供异步支持是最理想的方式, 这也是 Tornado 的核心工作原理。

由于 Tornado 工作模型的原因,通过异步库(指由系统级非阻塞 I/O 来提供支持的库)来处理 I/O 密集型操作才是 Tornado 的正确工作方式,否则一个任务出现阻塞(或者执行时间过长)就会导致其他请求不能被及时处理。遇上没有异步库支持的 I/O 操作(比如磁盘 I/O 操作,Linux 不能很好地提供异步支持)以及 CPU 密集型操作,在 Tornado 中一般我们可以简单地用下面两种方式进行处理:

  1. 将阻塞操作委托给 futures 模块的 ThreadPoolExecutor/ProcessPoolExecutor 去执行;
  2. 使用 tornado + celery(RabbitMQ 或 Redis 做 Broker,totoro我个人弄的一个 tornado+celery 适配库,支持 RabbitMQ 和 Redis),将阻塞操作委托给 celery 执行。

NOTE:python2 需要单独安装 futures 模块(pip install futures),python3 自带不需要单独安装。

这篇笔记主要是记录在使用 ProcessPoolExecutor 去执行时遇到的一些问题和最终解决方法。

ThreadPoolExecutor/ProcessPoolExecutor

如何选择 ThreadPoolExecutor 和 ProcessPoolExecutor

由于 Python GIL 的原因,利用多线程(“单进程,多线程”) 去处理 CPU 密集型任务并不能有效地利用多核,提高性能。在处理 I/O 密集型任务时,由于遇到 I/O 阻塞时线程会主动释放 GIL,多线程才能明显提高性能。

基于上述原因,在 Tornado 中区分一个任务是 CPU 密集型还是 I/O 密集型很重要,前者选择 ProcessPoolExecutor,后者选择 ThreadPoolExecutor 理论上是正确的。

使用 ThreadPoolExecutor

下面代码使用 ThreadPoolExecutor 来执行一个阻塞操作 sleep()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, with_statement

import time
import tornado.web
import tornado.ioloop

from concurrent.futures import ThreadPoolExecutor

class MockSyncOptHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
io_loop = tornado.ioloop.IOLoop.current()

@tornado.gen.coroutine
def get(self):
sec = yield self.sleep()
self.write("sleep %s s" % sec)

@run_on_executor
def sleep(self):
# mock blocking operation
sec = 10
time.sleep(sec)
return sec

方法 sleep() 模拟一个阻塞 10 秒的操作,通过 @run_on_executor 装饰后便将方法将委托给 executor 执行,在执行完成后通过 io_loop 回调获取结果。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def run_on_executor(fn):
"""Decorator to run a synchronous method asynchronously on an executor.

The decorated method may be called with a ``callback`` keyword
argument and returns a future.

This decorator should be used only on methods of objects with attributes
``executor`` and ``io_loop``.
"""
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
callback = kwargs.pop("callback", None)
future = self.executor.submit(fn, self, *args, **kwargs)
if callback:
self.io_loop.add_future(future,
lambda future: callback(future.result()))
return future
return wrapper

@run_on_executor 提供了一种兼容 coroutine 和 callback 的模式,从这个角度来讲,我个人不是很推崇。在使用 coroutine 编程的情况,直接使用 executor 更直观,约束也更少(要求对象 executorio_loop 属性)。如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function, with_statement

import time
import tornado.web

from concurrent.futures import ThreadPoolExecutor

class MockSyncOptHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)

@tornado.gen.coroutine
def get(self):
sec = yield executor.executor.submit(self.sleep)
self.write("sleep %s s" % sec)

def sleep(self):
# mock blocking operation
sec = 10
time.sleep(sec)
return sec

使用 ProcessPoolExecutor

因为之前都是使用 ThreadPoolExecutor ,在项目中没有使用过 ProcessPoolExecutor ,想当然的就以为前面代码将 ThreadPoolExecutor 改为 ProcessPoolExecutor 就行。结果实践时便发生了一些问题,还由于混杂着其他问题,导致定位问题花费了我不少时间。过程就略过不提,直接说问题:

  1. 发生 PicklingError 异常;
  2. ProcessPoolExecutor 启动的子进程在 Tornado 重启时成了 “孤儿进程”。

先说第一个问题,进程之间通信时,对象(数据)的传输需要序列化,在 Python 中对象序列化常用的方式是 Pickle。ProcessPoolExecutor 内部使用 multiprocessing 模块来提供多进程支持,与 ThreadPoolExecutor 相比便要求执行的函数及其参数能够 Pickable。参考 Pickle 模块的文档,在 python 中能够可 Pickable 的类型为:

  1. None, True, and False
  2. integers, long integers, floating point numbers, complex numbers
  3. normal and Unicode strings
  4. tuples, lists, sets, and dictionaries containing only picklable objects
  5. functions defined at the top level of a module
  6. built-in functions defined at the top level of a module
  7. classes that are defined at the top level of a module
  8. instances of such classes whose dict or the result of calling getstate() is picklable (see section The pickle protocol for details)

简单来说就是基本类型和模块级的函数及类,很显然不支持 bound 和 unbound method(http://bugs.python.org/issue9276)。这个问题在 python3 中已经得到解决:PEP-3154。

在 stackoverflow 有一些讨论可供参考:
http://stackoverflow.com/questions/1816958/cant-pickle-type-instancemethod-when-using-pythons-multiprocessing-pool-ma
http://stackoverflow.com/questions/7016567/picklingerror-when-using-multiprocessing

有了上面的解释便很容易理解第一个问题发生的原因:self.sleep() 是一个 bound method ,是不能被 Pickle 的,所以不能使用 ProcessPoolExecutor 去执行。若要使用 ProcessPoolExecutor ,可以将其改为模块级函数。

对于第二个问题,产生 “孤儿进程” 有几种情况:

  1. 在 debug=True 模式下,Tornado 默认会启用 autoreload 机制,该机制会在应用程序的 Python 代码发生变化时自动通过 exec 重新加载代码。此时,由于 ProcessPoolExecutor 的子进程是 fork 出来的(可简单理解为父进程的拷贝),主进程通过 exec 重新加载代码时会遇到端口被(子进程)占用而无法重启并异常退出,导致这些子进程被 init 接管成为 “孤儿进程”。

  2. 另一种情况下,通过 supervisor 来管理 Tornado 进程,没有设置 stopasgroup=true/killasgroup=true,导致通过 supervisor 来停止应用时只有主进程能收到停止信号而退出,子进程便成了 “孤儿进程”。

  3. 还有一种情况是主进程注册了停止信号处理函数,fork 的子进程会默认继承父进程的信号处理函数。若停止信号处理函数没有考虑到这个问题,导致主进程能正常退出,子进程却忽略停止信号。比如我个人项目中为了尽可能不影响请求处理过程会为 Tornado 主进程注册停止信号处理函数,该函数首先停止接受新的请求,然后在一段时间内循环检测 IOLoop 中是否还有未处理的回调(包括定时回调),没有就主动退出,否则等待该段时间后强制退出。这个检测过程本身又是依赖 IOLoop 的定时回调。这样的逻辑在主进程中没有问题,但是子进程中由于 COW 的 IOLoop 是个拷贝,本身没有运行,故导致子进程无法像主进程一样循环检测并退出。单进程运行 Tornado 时没有问题,一旦使用了子进程便出现 “孤儿进程” 的问题。

NOTE: 一些关于进程退出的知识点:

  1. 在 Linux 中守护进程(无关联控制终端,既是会话首进程又是进程组组长)被 kill 时会向该组每个进程发送 SIGHUP ,导致组中进程被中止。

  2. 普通会话首进程被 kill 时向前台进程组所有进程发送 SIGHUP 信号,导致前台进程组进程终止,但后台进程组不受影响;在终端中 ctrl+c/delete 会前台进程组所有进程发送中断信号。

  3. 一般进程(包括进程组组长)被 kill 时不影响所属进程组的其他进程,若是进程组组长进程被 kill,则子进程由 init 接管。

  4. 更多关于进程的退出的信息,请通过 Linux 进程组、会话、终端等相关知识来了解。