An Analysis of the tornado.tcpserver Module

Introduction

With the tornado.ioloop and tornado.iostream modules in place, implementing an asynchronous web server in tornado becomes straightforward.

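tornado.httpserver is tornado's web server module. It implements HTTPServer, a single-threaded HTTP server built on TCPServer from the tornado.tcpserver module. TCPServer is a non-blocking, single-threaded TCP server: it takes care of the TCP part and leaves a hook (the abstract method handle_stream) for writing a server for a particular application-layer protocol. So before analysing tornado's HTTP server, let us first look at how tornado.tcpserver.TCPServer is implemented.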

TCPServer

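The tornado.tcpserver module defines a single class, TCPServer. Because it does not deal with any specific application-layer protocol (HTTP, for example) and can rely on IOLoop and IOStream, its implementation is fairly simple.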

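TCPServer is a non-blocking, single-threaded TCP server. It exposes the abstract method handle_stream for concrete subclasses to implement, and it can also run in multiple processes. According to the comments in the source, there are three ways to use a TCP server, depending on which methods are called.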
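
To make the role of handle_stream concrete, here is a minimal standard-library sketch of the same division of labour: a generic layer that owns the connection and a subclass that supplies the protocol. SketchTCPServer, UppercaseServer, handle_connection and demo are hypothetical names invented for this illustration; this is not how tornado implements TCPServer, and it uses a plain blocking socket instead of an IOStream.

```python
import socket


class SketchTCPServer:
    """Hypothetical stand-in for the generic TCP layer (not tornado's class)."""

    def handle_connection(self, connection, address):
        # The generic layer owns the connection and delegates the
        # protocol-specific work to the hook below.
        try:
            self.handle_stream(connection, address)
        finally:
            connection.close()

    def handle_stream(self, stream, address):
        # Subclasses implement the application-layer protocol here.
        raise NotImplementedError()


class UppercaseServer(SketchTCPServer):
    """Toy protocol: read one chunk and send it back upper-cased."""

    def handle_stream(self, stream, address):
        data = stream.recv(1024)
        stream.sendall(data.upper())


def demo():
    # socketpair() yields two connected sockets without using the network.
    server_side, client_side = socket.socketpair()
    with client_side:
        client_side.sendall(b"ping")
        UppercaseServer().handle_connection(server_side, ("local", 0))
        return client_side.recv(1024)


if __name__ == "__main__":
    print(demo())
```

The point of the split is that the base class never needs to know which protocol is spoken; an HTTP server would only differ in what its handle_stream does.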

Three usage patterns

Single-process mode with listen

The listen method of TCPServer runs the server instance in a single process. Example:

    from tornado.ioloop import IOLoop
    from tornado.tcpserver import TCPServer

    server = TCPServer()
    server.listen(8888)
    IOLoop.instance().start()

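The listen method starts listening on the given port immediately and adds the corresponding socket to the IOLoop. It may be called more than once to listen on several ports. Because everything is driven by the IOLoop, you still have to make sure the corresponding IOLoop instance is started (the example above creates the server with the default IOLoop instance, so it is started with IOLoop.instance().start()).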

Multi-process mode with bind/start

The bind and start methods of TCPServer run the server instance in multiple processes. Example:

    from tornado.ioloop import IOLoop
    from tornado.tcpserver import TCPServer

    server = TCPServer()
    server.bind(8888)
    server.start(0)  # Forks multiple sub-processes
    IOLoop.instance().start()

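bind binds the server to the given address, and start then launches the child processes, which gives the multi-process mode. The num_processes argument of start selects single-process or multi-process operation. It defaults to 1, which runs a single process. When it is None or <= 0, start tries to use as many child processes as there are CPU cores (as in the example above). When it is > 1, exactly that many child processes are used.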
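
The rule for num_processes can be written down as a small pure function. This is only a sketch of the rule as described above: resolve_num_processes is a hypothetical name, and multiprocessing.cpu_count() stands in for whatever core-count helper tornado uses internally.

```python
import multiprocessing


def resolve_num_processes(num_processes=1):
    """Return how many processes the described rule would use."""
    if num_processes is None or num_processes <= 0:
        # "As many as there are cores" (assumption: cpu_count() is a
        # reasonable stand-in for tornado's own core detection).
        return multiprocessing.cpu_count()
    return num_processes


if __name__ == "__main__":
    for value in (1, 4, 0, None):
        print(value, "->", resolve_num_processes(value))
```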

That said, when the server is meant to run in a single process, listen is the usual choice.

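Advanced multi-process mode

Internally, TCPServer's bind/start methods simply wrap binding the listening port and starting the child processes. We can skip these two methods and instead call the binding function and fork the processes ourselves to run the server instance in multiple processes. Example: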

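    import tornado.process
    from tornado.ioloop import IOLoop
    from tornado.netutil import bind_sockets
    from tornado.tcpserver import TCPServer

    sockets = bind_sockets(8888)
    tornado.process.fork_processes(0)
    server = TCPServer()
    server.add_sockets(sockets)
    IOLoop.instance().start()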

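The bind_sockets function is defined in the tornado.netutil module, and the fork_processes function in the tornado.process module.

Calling bind_sockets creates one or more sockets listening on the given port (more than one because a hostname may map to several IP addresses). Calling fork_processes forks the child processes: in the parent, the call stays inside fork_processes to monitor the children and does not return, while each child goes on to run the code that follows.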
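
The "one hostname, several sockets" behaviour comes from socket.getaddrinfo, which can return several address entries for one name. The sketch below lists the distinct addresses a listener would be bound to; passive_addresses is a hypothetical helper written for this illustration, not part of tornado.

```python
import socket


def passive_addresses(port, address=None, family=socket.AF_UNSPEC):
    """List the distinct (family, sockaddr) pairs a listener would bind to."""
    infos = socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                               0, socket.AI_PASSIVE)
    # getaddrinfo may report the same address more than once, so
    # deduplicate the entries, much as bind_sockets does with set().
    return sorted({(af, sockaddr) for af, _type, _proto, _name, sockaddr in infos})


if __name__ == "__main__":
    # The output depends on the local resolver configuration.
    for af, sockaddr in passive_addresses(8888, "localhost"):
        print(af, sockaddr)
```

On a machine with both IPv4 and IPv6 enabled, 'localhost' typically yields one entry per family, and each entry would get its own listening socket.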

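In fact, TCPServer's bind/start methods call bind_sockets and fork_processes internally as well.

The bind_sockets function

bind_sockets is short and simple, and the author's comments are very detailed. The function supports both IPv6 and IPv4, and the listen backlog (the queue of pending connections) defaults to 128 (_DEFAULT_BACKLOG = 128).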

    def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
                     backlog=_DEFAULT_BACKLOG, flags=None):
        """Creates listening sockets bound to the given port and address.

        Returns a list of socket objects (multiple sockets are returned if
        the given address maps to multiple IP addresses, which is most common
        for mixed IPv4 and IPv6 use).

        Address may be either an IP address or hostname.  If it's a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen() <socket.socket.listen>`.

        ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
        ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.
        """
        sockets = []
        if address == "":
            address = None
        if not socket.has_ipv6 and family == socket.AF_UNSPEC:
            # Python can be compiled with --disable-ipv6, which causes
            # operations on AF_INET6 sockets to fail, but does not
            # automatically exclude those results from getaddrinfo
            # results.
            # http://bugs.python.org/issue16208
            #
            # Annotation: Python can be built with IPv4 support only, so
            # socket.AF_INET is forced here to make sure getaddrinfo
            # returns IPv4 addresses only as well.
            family = socket.AF_INET

        # Annotation: as the docstring says, flags is a bitmask of AI_* values
        # passed to getaddrinfo, for example
        # ``socket.AI_PASSIVE | socket.AI_CANONNAME | socket.AI_NUMERICHOST``.
        # ``socket.AI_PASSIVE``: the returned addresses are meant for bind();
        #     without it they are meant for connect().
        # ``socket.AI_CANONNAME``: ask for the canonical name (not an alias).
        # ``socket.AI_NUMERICHOST``: the host is a numeric address string, so
        #     no name resolution is attempted.
        if flags is None:
            flags = socket.AI_PASSIVE
        bound_port = None

        # socket.getaddrinfo(host, port[, family[, socktype[, proto[, flags]]]])
        # family: address family; common values on Linux are AF_UNIX (1, local
        #     IPC), AF_INET (2, IPv4) and AF_INET6 (10, IPv6).
        # socktype: socket type, e.g. SOCK_STREAM (TCP stream, 1), SOCK_DGRAM
        #     (UDP datagram, 2), SOCK_RAW (raw socket, 3).
        # proto: the protocol used by the socket; pass 0 to leave it
        #     unspecified. Common values are IPPROTO_TCP (6) and IPPROTO_UDP
        #     (17), matching the 8-bit protocol field of the IP header.
        for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
                                          0, flags)):
            af, socktype, proto, canonname, sockaddr = res
            if (platform.system() == 'Darwin' and address == 'localhost' and
                    af == socket.AF_INET6 and sockaddr[3] != 0):
                # Mac OS X includes a link-local address fe80::1%lo0 in the
                # getaddrinfo results for 'localhost'.  However, the firewall
                # doesn't understand that this is a local address and will
                # prompt for access (often repeatedly, due to an apparent
                # bug in its ability to remember granting access to an
                # application). Skip these addresses.
                #
                # Annotation: an IPv6 address is 128 bits, written as groups
                # of 16 bits separated by `:`; `::` stands for one or more
                # consecutive all-zero groups and may appear only once.
                # sockaddr is a tuple describing a socket address, whose format
                # depends on the returned family (a (address, port) 2-tuple for
                # AF_INET, a (address, port, flow info, scope id) 4-tuple for
                # AF_INET6), so sockaddr[3] above is the scope id.
                continue
            try:
                sock = socket.socket(af, socktype, proto)
            except socket.error as e:
                if errno_from_exception(e) == errno.EAFNOSUPPORT:
                    continue
                raise
            # Annotation: set the FD_CLOEXEC flag on the fd.
            set_close_exec(sock.fileno())
            if os.name != 'nt':
                # Annotation: avoids "Address already in use" errors when the
                # server is restarted. socket.SOL_SOCKET means the option is
                # set at the socket level.
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if af == socket.AF_INET6:
                # On linux, ipv6 sockets accept ipv4 too by default,
                # but this makes it impossible to bind to both
                # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
                # separate sockets *must* be used to listen for both ipv4
                # and ipv6.  For consistency, always disable ipv4 on our
                # ipv6 sockets and use a separate ipv4 socket when needed.
                #
                # Python 2.x on windows doesn't have IPPROTO_IPV6.
                if hasattr(socket, "IPPROTO_IPV6"):
                    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

            # automatic port allocation with port=None
            # should bind on the same port on IPv4 and IPv6
            host, requested_port = sockaddr[:2]
            if requested_port == 0 and bound_port is not None:
                sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))

            sock.setblocking(0)
            sock.bind(sockaddr)
            bound_port = sock.getsockname()[1]
            sock.listen(backlog)
            sockets.append(sock)
        return sockets
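
The core of what happens to each socket (SO_REUSEADDR, non-blocking mode, bind, listen, then reading back the port the kernel picked) can be tried in isolation with the standard library. This is a reduced sketch for a single IPv4 address, not a replacement for bind_sockets; open_listener is a hypothetical name.

```python
import os
import socket


def open_listener(host="127.0.0.1", port=0, backlog=128):
    """Create one non-blocking listening socket, roughly as described above."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if os.name != "nt":
        # Lets a restarted server rebind while old connections linger.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(False)
    # Port 0 asks the kernel to choose a free port.
    sock.bind((host, port))
    sock.listen(backlog)
    return sock


if __name__ == "__main__":
    listener = open_listener()
    # getsockname() reveals the port that was actually allocated.
    print("listening on port", listener.getsockname()[1])
    listener.close()
```

Reading the port back with getsockname() is what allows bind_sockets to reuse the same automatically allocated port for the IPv4 and IPv6 sockets.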

The fork_processes function

    def fork_processes(num_processes, max_restarts=100):
        """Starts multiple worker processes.

        If ``num_processes`` is None or <= 0, we detect the number of cores
        available on this machine and fork that number of child
        processes. If ``num_processes`` is given and > 0, we fork that
        specific number of sub-processes.

        Since we use processes and not threads, there is no shared memory
        between any server code.

        Note that multiple processes are not compatible with the autoreload
        module (or the ``autoreload=True`` option to `tornado.web.Application`
        which defaults to True when ``debug=True``).
        When using multiple processes, no IOLoops can be created or
        referenced until after the call to ``fork_processes``.

        In each child process, ``fork_processes`` returns its *task id*, a
        number between 0 and ``num_processes``.  Processes that exit
        abnormally (due to a signal or non-zero exit status) are restarted
        with the same id (up to ``max_restarts`` times).  In the parent
        process, ``fork_processes`` returns None if all child processes
        have exited normally, but will otherwise only exit by throwing an
        exception.
        """
        global _task_id
        assert _task_id is None
        # Annotation: if num_processes is None or <= 0, use the CPU core count.
        if num_processes is None or num_processes <= 0:
            num_processes = cpu_count()

        if ioloop.IOLoop.initialized():
            raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                               "has already been initialized. You cannot call "
                               "IOLoop.instance() before calling start_processes()")
        gen_log.info("Starting %d processes", num_processes)
        children = {}

        def start_child(i):
            pid = os.fork()
            if pid == 0:
                # child process
                _reseed_random()
                global _task_id
                _task_id = i
                return i
            else:
                children[pid] = i
                return None
        for i in range(num_processes):
            id = start_child(i)
            if id is not None:
                return id

        # Annotation: the master process monitors the children. A child that
        # ends abnormally (due to a signal or non-zero exit status) is
        # restarted. `num_restarts` counts the restarts so far; once it
        # exceeds `max_restarts`, a RuntimeError is raised.
        num_restarts = 0
        while children:
            try:
                pid, status = os.wait()
            except OSError as e:
                if errno_from_exception(e) == errno.EINTR:
                    continue
                raise
            if pid not in children:
                continue
            id = children.pop(pid)
            if os.WIFSIGNALED(status):
                gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
                                id, pid, os.WTERMSIG(status))
            elif os.WEXITSTATUS(status) != 0:
                gen_log.warning("child %d (pid %d) exited with status %d, restarting",
                                id, pid, os.WEXITSTATUS(status))
            else:
                gen_log.info("child %d (pid %d) exited normally", id, pid)
                continue
            num_restarts += 1
            if num_restarts > max_restarts:
                raise RuntimeError("Too many child restarts, giving up")
            new_id = start_child(id)
            if new_id is not None:
                return new_id
        # All child processes exited cleanly, so exit the master process
        # instead of just returning to right after the call to
        # fork_processes (which will probably just start up another IOLoop
        # unless the caller checks the return value).
        sys.exit(0)
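
The way the master process classifies a finished child can be reproduced with a few lines of standard-library code. The sketch below forks a single child, waits for it and decodes the wait status with the same os.W* helpers; it is POSIX-only, does no restarting, and run_child_and_describe is a hypothetical name.

```python
import os


def run_child_and_describe(exit_code):
    """Fork a child that exits with exit_code and describe how it ended."""
    pid = os.fork()
    if pid == 0:
        # Child: leave immediately, skipping the parent's cleanup handlers.
        os._exit(exit_code)
    # Parent: block until this particular child has finished.
    _, status = os.waitpid(pid, 0)
    if os.WIFSIGNALED(status):
        return "killed by signal %d" % os.WTERMSIG(status)
    if os.WEXITSTATUS(status) != 0:
        return "exited with status %d" % os.WEXITSTATUS(status)
    return "exited normally"


if __name__ == "__main__":
    print(run_child_and_describe(0))
    print(run_child_and_describe(3))
```

In fork_processes, only the first two outcomes count as abnormal and trigger a restart; a child that exits with status 0 is simply dropped from the table of children.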

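The implementation of TCPServer is fairly simple, and the important design points have mostly been covered above. The SSL/TLS parts require some separate reading; I am not familiar with them myself yet, but that does not get in the way of studying tornado. Two additional notes:

  1. When ssl_options is a dict, the TCPServer constructor checks that it contains 'certfile' and that the file it names exists; a 'keyfile' entry, if present, must also name an existing file. The contents of the files are not checked; that is deferred until a client connection is established. ssl_options is a dictionary; on Python 3.2+ an ssl.SSLContext instance can be used instead.

  2. The SSL wrapping functions are defined in the tornado.netutil module. The valid ssl_options keys are listed in the module variable _SSL_CONTEXT_KEYWORDS: ['ssl_version', 'certfile', 'keyfile', 'cert_reqs', 'ca_certs', 'ciphers'].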
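
An early check of this kind might look like the sketch below. It is written from my reading of the behaviour and assumes that only 'certfile' is mandatory while 'keyfile' is optional; check_ssl_options is a hypothetical helper, not tornado's code, and the error messages are illustrative.

```python
import os


def check_ssl_options(ssl_options):
    """Fail early on obviously unusable ssl_options (file contents unchecked)."""
    if ssl_options is None or not isinstance(ssl_options, dict):
        # Nothing to verify here, e.g. for an ssl.SSLContext instance.
        return
    if "certfile" not in ssl_options:
        raise KeyError('missing key "certfile" in ssl_options')
    for key in ("certfile", "keyfile"):
        path = ssl_options.get(key)
        # Only existence is tested; a malformed certificate would still
        # surface later, when a client connects.
        if path is not None and not os.path.exists(path):
            raise ValueError('%s "%s" does not exist' % (key, path))


if __name__ == "__main__":
    try:
        check_ssl_options({"certfile": "/no/such/file.pem"})
    except ValueError as error:
        print(error)
```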

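The add_accept_handler function

TCPServer.add_sockets uses tornado.netutil.add_accept_handler to register the listening sockets with the IOLoop's event handlers; the callback is invoked whenever a client connection is accepted. The source of add_accept_handler is shown here: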

    def add_accept_handler(sock, callback, io_loop=None):
        if io_loop is None:
            io_loop = IOLoop.current()

        def accept_handler(fd, events):
            # More connections may come in while we're handling callbacks;
            # to prevent starvation of other tasks we must limit the number
            # of connections we accept at a time.  Ideally we would accept
            # up to the number of connections that were waiting when we
            # entered this method, but this information is not available
            # (and rearranging this method to call accept() as many times
            # as possible before running any callbacks would have adverse
            # effects on load balancing in multiprocess configurations).
            # Instead, we use the (default) listen backlog as a rough
            # heuristic for the number of connections we can reasonably
            # accept at once.
            for i in xrange(_DEFAULT_BACKLOG):
                try:
                    connection, address = sock.accept()
                except socket.error as e:
                    # _ERRNO_WOULDBLOCK indicate we have accepted every
                    # connection that is available.
                    if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                        return
                    # ECONNABORTED indicates that there was a connection
                    # but it was closed while still in the accept queue.
                    # (observed on FreeBSD).
                    if errno_from_exception(e) == errno.ECONNABORTED:
                        continue
                    raise
                callback(connection, address)
        io_loop.add_handler(sock, accept_handler, IOLoop.READ)

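As the code comment explains, accept_handler handles at most _DEFAULT_BACKLOG connections per invocation. Capping the number of connections accepted each time avoids starving other asynchronous tasks and also helps load balancing when several processes are running.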
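
The capped accept loop is easy to reproduce outside tornado. The sketch below uses only the standard library and Python 3 exception classes; accept_batch and demo are hypothetical names, select.select stands in for the IOLoop's READ event, and the cap is set to 2 so the effect is visible with three clients.

```python
import select
import socket


def accept_batch(listener, limit):
    """Accept at most `limit` pending connections from a non-blocking listener."""
    accepted = []
    for _ in range(limit):
        try:
            connection, _address = listener.accept()
        except BlockingIOError:
            # The accept queue is drained; nothing more to do for now.
            break
        except ConnectionAbortedError:
            # The peer gave up while still waiting in the accept queue.
            continue
        accepted.append(connection)
    return accepted


def demo():
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(8)
    listener.setblocking(False)
    clients = [socket.create_connection(listener.getsockname()) for _ in range(3)]
    batches = []
    for _ in range(2):
        # Wait until the listener is readable, as a READ event would signal.
        select.select([listener], [], [], 1.0)
        batches.append(accept_batch(listener, limit=2))
    counts = [len(batch) for batch in batches]
    for sock in clients + batches[0] + batches[1] + [listener]:
        sock.close()
    return counts


if __name__ == "__main__":
    print(demo())
```

With three clients waiting and a cap of two, the first pass leaves one connection in the queue for the next readiness event, which is exactly the window in which other callbacks (or another process sharing the socket) get a turn.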