0%

tornado.httpserver 模块解析

引言

HTTPServer 是 HTTP 协议的 TCPServer 子类实现,HTTPServer 覆写了 handle_stream 方法用于处理 HTTP 协议。与 TCPServer 一样,可以有三种使用模式,源码注释中写的很详细,在 TCPServer 源码解析的部分也已经详细讨论过,这里就不再赘述。

来看看一个简单的 HTTPServer 使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import tornado.httpserver
import tornado.ioloop

def handle_request(request):
message = "You requested %s\n" % request.uri
request.connection.write_headers(
httputil.ResponseStartLine('HTTP/1.1', 200, 'OK'),
{"Content-Length": str(len(message))})
request.connection.write(message)
request.connection.finish()

http_server = tornado.httpserver.HTTPServer(handle_request)
http_server.listen(8888)
tornado.ioloop.IOLoop.instance().start()

示例代码实现的是一个简单的 echo 服务器,HTTPServer 接受的是一个以 HTTPServerRequest 作为参数的函数 handle_request

HTTPServerRequestconnection 字段是一个 HTTPConnection 实例对象,应用代码通过使用 HTTPConnection 的方法响应客户端(write response )。

HTTPServer 支持 HTTP keep-alive 连接和 X-Real-Ip/X-Forwarded-For HTTP Heads。

HTTPServer 实际上还接受 HTTPServerConnectionDelegate 实例作为请求处理的委托对象。如下代码把对请求的处理委托给 tornado.web.Application 对象处理:

1
2
3
4
5
6
application = web.Application([
(r"/", MainPageHandler),
])
http_server = httpserver.HTTPServer(application)
http_server.listen(8080)
ioloop.IOLoop.instance().start()

ApplicationHTTPServer 都是 HTTPServerConnectionDelegate 的子类,HTTPServer 会委托 Application 完成对请求的处理。这是大多数情况下我们使用 Tornado 的方式

HTTPServer

Tornado 中与 HTTP 处理相关的功能被定义在 tornado.httpservertornado.httputiltornado.http1connectiontornado.httpserver 主要定义的是 HTTPServertornado.httputil 定义了通用的 HTTP 处理接口、功能类和函数,tornado.http1connection 是 针对 HTTP/1.x 的实现模块。

HTTPServer 实现紧密相关的接口和实现主要有下面这些:

  1. tornado.httputil.HTTPServerConnectionDelegate 接口定义了 Web 服务器处理请求的接口,负责启动请求处理和处理完成后的清理工作。tornado.httpserver.HTTPServertornado.web.Application 是该接口的具体实现类型。通常情况下我们都是结合二者使用 Tornado, HTTPServer 将请求封装好后委托给 ApplicationApplication 按照请求 path 分发请求到目标 Handler,然后由 Handler 负责响应请求

  2. tornado.httputil.HTTPMessageDelegate 接口定义了具体的 HTTP 数据(作为服务端时的请求数据,或者作为客户端时的响应数据)处理接口,在对 HTTP 数据解析的一定阶段触发相应方法, 对解析过程进行拦截处理。从设计的角度来看,这个接口实际上就是分离了对 HTTP 协议的解析逻辑和其他逻辑(包括业务逻辑),所有与 HTTP 协议解析无关的逻辑都交由该接口处理(注:表达可能不够准确),比如提供 xheaders 支持(tornado.httpserver._ServerRequestAdapter),解压缩 Gzip 数据(tornado.http1connection._GzipMessageDelegate),分发请求给相应的 Handler(tornado.web._RequestDispatcher)。

  3. tornado.httputil.HTTPConnection 接口定义了应用程序响应 HTTP 请求的接口,包括对 HTTP Header 和 Body 的 write。tornado.http1connection.HTTP1Connection 是该接口的实现类,同时也是整个 HTTP/1.x 数据解析的核心实现,封装了对 HTTP 请求(作为服务端)和响应(作为客户端)数据的解析,并在解析的相应阶段触发 HTTPMessageDelegate 对应的方法

HTTPServerConnectionDelegate 与 HTTPServer

TCPServer 负责监听端口,接收来自客户端的连接请求,并将请求关联的 socket 封装成 IOStream 传递给抽象方法 handle_stream 进行处理。HTTPServer 覆写了 handle_stream 方法,将 IOStream 封装成 HTTP 请求相关的对象,并委托 HTTPServerConnectionDelegate 实例对象进行处理。

HTTPServerConnectionDelegate 定义了 start_requeston_close 两个方法作为 HTTP 请求的入口和出口,分别在开始处理 HTTP 请求和连接关闭时调用。接口定义和方法签名如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class HTTPServerConnectionDelegate(object):
"""Implement this interface to handle requests from `.HTTPServer`.

.. versionadded:: 4.0
"""
def start_request(self, server_conn, request_conn):
"""This method is called by the server when a new request has started.

:arg server_conn: is an opaque object representing the long-lived
(e.g. tcp-level) connection.
:arg request_conn: is a `.HTTPConnection` object for a single
request/response exchange.

This method should return a `.HTTPMessageDelegate`.
"""
raise NotImplementedError()

def on_close(self, server_conn):
"""This method is called when a connection has been closed.

:arg server_conn: is a server connection that has previously been
passed to ``start_request``.
"""
pass

start_request(self, server_conn, request_conn) 方法签名接收两个参数,server_conn 参数是一个不透明(注:我理解为在该接口定义时不确定该参数的类型,参数类型在具体实现时决定。有点类似标记接口 MarkInterface 的意思。)的持久化连接对象,request_conn 参数是一个用于一次 HTTP 请求/响应的 HTTPConnection 对象。该方法返回一个 HTTPMessageDelegate 接口实例对象,以便在后续供 HTTP1Connection 使用。

on_close(self, server_conn) 方法的参数 server_conn 应与调用 start_request 方法时相同。

HTTPServer 的代码很简单,基本上只是一些控制逻辑,具体的实现代码都委托给其他对象来处理了,下面是其完整的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class HTTPServer(TCPServer, httputil.HTTPServerConnectionDelegate):
def __init__(self, request_callback, no_keep_alive=False, io_loop=None,
xheaders=False, ssl_options=None, protocol=None,
decompress_request=False,
chunk_size=None, max_header_size=None,
idle_connection_timeout=None, body_timeout=None,
max_body_size=None, max_buffer_size=None):
self.request_callback = request_callback
self.no_keep_alive = no_keep_alive
self.xheaders = xheaders
self.protocol = protocol
self.conn_params = HTTP1ConnectionParameters(
decompress=decompress_request,
chunk_size=chunk_size,
max_header_size=max_header_size,
header_timeout=idle_connection_timeout or 3600,
max_body_size=max_body_size,
body_timeout=body_timeout)
TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
max_buffer_size=max_buffer_size,
read_chunk_size=chunk_size)
self._connections = set()

@gen.coroutine
def close_all_connections(self):
while self._connections:
# Peek at an arbitrary element of the set
conn = next(iter(self._connections))
yield conn.close()

def handle_stream(self, stream, address):
# 请求的上下文,以属性字段的形式封装请求关联的数据
context = _HTTPRequestContext(stream, address,
self.protocol)
# 支持 HTTP/1.x 的服务端持久化的连接对象, 作为 HTTPServerConnectionDelegate.start_request
# 方法的 server_conn 实参。
conn = HTTP1ServerConnection(
stream, self.conn_params, context)
self._connections.add(conn)
conn.start_serving(self)

def start_request(self, server_conn, request_conn):
# request_callback 可能是以 `HTTPServerRequest` 对象作为参数的函数,也
# 可能是一个 `HTTPServerConnectionDelegate` 实例。`_ServerRequestAdapter` 需
# 要正确处理这两种类型的对象,以适配 `HTTPServerConnectionDelegate`。
return _ServerRequestAdapter(self, request_conn)

def on_close(self, server_conn):
self._connections.remove(server_conn)

如上面 handle_stream 方法的所示,具体的 HTTP 请求处理交由 _HTTPRequestContextHTTP1ServerConnection 等来完成。但是 HTTPServer 实现了 HTTPServerConnectionDelegate 接口,同时也接受 request_callback 作为构造函数参数,所以其自身可以作为 HTTPServerConnectionDelegate 传递给 HTTP1ServerConnection 来处理请求。而委托给具体 request_callback 的逻辑则由 _ServerRequestAdapter 来完成。

HTTPMessageDelegate 与 _ServerRequestAdapter/_GzipMessageDelegate/_RequestDispatcher

HTTPMessageDelegate 通过定义请求处理流程各个阶段可调用的方法而分离了 HTTP 协议的解析逻辑和其他逻辑,这些方法包括:headers_receiveddata_receivedfinishon_connection_close。从方法名称大体就能推测出调用阶段,具体的定义,这里就不介绍了,直接参考源码即可。

_ServerRequestAdapter

由于 HTTPServer.request_callback 可能是以 HTTPServerRequest 对象作为参数的函数,也可能是一个 HTTPServerConnectionDelegate 实例。所以 _ServerRequestAdapter 需要正确处理这两种类型的对象以适配 HTTPMessageDelegate

  1. HTTPServer.request_callbackHTTPServerConnectionDelegate 实例则将请求处理委托给该实例(调用 start_request 方法)返回的 HTTPMessageDelegate 对象;

  2. HTTPServer.request_callback 是以HTTPServerRequest 对象为参数的函数,则 _ServerRequestAdapter 要负责处理请求数据,最终将请求数据封装成 HTTPServerRequest 对象交由 HTTPServer.request_callback 处理。

顺便说一下, _ServerRequestAdapter 也负责在每次请求时提供对 xheaders 的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class _ServerRequestAdapter(httputil.HTTPMessageDelegate):
"""Adapts the `HTTPMessageDelegate` interface to the interface expected
by our clients.
"""
def __init__(self, server, connection):
self.server = server
self.connection = connection
self.request = None
# 如果 server.request_callback 是一个 HTTPServerConnectionDelegate 实例,
# 比如 tornado.web.Application 实例,那就委托给它处理请求,否则就自己处理。
if isinstance(server.request_callback,
httputil.HTTPServerConnectionDelegate):
self.delegate = server.request_callback.start_request(connection)
self._chunks = None
else:
self.delegate = None
self._chunks = []

def headers_received(self, start_line, headers):
# 对这次请求应用 xheaders 支持
if self.server.xheaders:
self.connection.context._apply_xheaders(headers)
if self.delegate is None:
# 初始化一个 HTTPServerRequest 实例,HTTP 数据解析完成后将为该对象的 body 字段赋值。
self.request = httputil.HTTPServerRequest(
connection=self.connection, start_line=start_line,
headers=headers)
else:
return self.delegate.headers_received(start_line, headers)

def data_received(self, chunk):
if self.delegate is None:
self._chunks.append(chunk)
else:
return self.delegate.data_received(chunk)

def finish(self):
if self.delegate is None:
# 为 body 字段赋值,并解析成 key/value 的形式,对上传的文件也采用这种方式(value 的类型
# 为 HTTPFile,由于有 max_body_size 和 max_buffer_szie 的限制,这里倒是不用担心文件
# 过大的问题)。
self.request.body = b''.join(self._chunks)
self.request._parse_body()
self.server.request_callback(self.request)
else:
self.delegate.finish()
self._cleanup()

def on_connection_close(self):
if self.delegate is None:
self._chunks = None
else:
self.delegate.on_connection_close()
self._cleanup()

def _cleanup(self):
# 请求处理正常结束或者连接关闭时进行一些清理工作,如有必要取消对 xheaders 的支持。
if self.server.xheaders:
self.connection.context._unapply_xheaders()
_GzipMessageDelegate

_GzipMessageDelegate 是一个典型的 Decorator Pattern 应用,专门用于支持 Content-Encoding: gzip。它装饰一个 HTTPMessageDelegate 实例对象,为其提供 gzip 的解压缩支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
"""Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``.
"""
def __init__(self, delegate, chunk_size):
self._delegate = delegate
self._chunk_size = chunk_size
self._decompressor = None

def headers_received(self, start_line, headers):
# 若是 gzip 数据,则提供 GzipDecompressor 实例用于 body 的解压缩。
# 并删除 Content-Encoding 报文头,用 X-Consumed-Content-Encoding 来代替。
if headers.get("Content-Encoding") == "gzip":
self._decompressor = GzipDecompressor()
# Downstream delegates will only see uncompressed data,
# so rename the content-encoding header.
# (but note that curl_httpclient doesn't do this).
headers.add("X-Consumed-Content-Encoding",
headers["Content-Encoding"])
del headers["Content-Encoding"]
return self._delegate.headers_received(start_line, headers)

@gen.coroutine
def data_received(self, chunk):
if self._decompressor:
compressed_data = chunk
while compressed_data:
decompressed = self._decompressor.decompress(
compressed_data, self._chunk_size)
if decompressed:
yield gen.maybe_future(
self._delegate.data_received(decompressed))
compressed_data = self._decompressor.unconsumed_tail
else:
yield gen.maybe_future(self._delegate.data_received(chunk))

def finish(self):
if self._decompressor is not None:
tail = self._decompressor.flush()
if tail:
# I believe the tail will always be empty (i.e.
# decompress will return all it can). The purpose
# of the flush call is to detect errors such
# as truncated input. But in case it ever returns
# anything, treat it as an extra chunk
self._delegate.data_received(tail)
return self._delegate.finish()

def on_connection_close(self):
return self._delegate.on_connection_close()

_GzipMessageDelegate 代码不复杂,解压缩的工作是由 tonrado.util.GzipDecompressor 委托 zlib.decompressobj 完成。

_RequestDispatcher

_RequestDispatcher 定义在 tornado.web 模块中,为 Applioation 适配 HTTPServerConnectionDelegate 接口,以完成根据请求的 path 分发请求到 handler。其实现并不复杂,后续会在分析 Application 时介绍详细的实现。

HTTPConnection 与 HTTP1Connection/HTTP1ServerConnection[*]

HTTPConnection 是 HTTP 响应的接口,应用通过该接口 write headers/body 信息。HTTP1Connection 是 HTTP/1.x 协议的 HTTPConnection 实现,提供了 HTTP 请求的解析和响应功能,可单独被客户端(比如 tornado.simple_httpclient.SimpleAsyncHTTPClient)使用,也可通过 HTTP1ServerConnection 被服务器(比如 HTTPServer)使用。

由于 HTTP1Connection 是对 HTTP/1.x 协议的实现,其代码涉及大量的协议数据格式解析,对深入理解 HTTP 非常有价值,这里主要是分析 Tornado 处理 HTTP 请求的流程,对协议的关注将放到相关的文章中详细介绍。

HTTP1ServerConnection 支持持久连接,在同一个连接上处理多个 HTTP 请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class HTTP1ServerConnection(object):
"""An HTTP/1.x server."""
def __init__(self, stream, params=None, context=None):
"""
:arg stream: an `.IOStream`
:arg params: a `.HTTP1ConnectionParameters` or None
:arg context: an opaque application-defined object that is accessible
as ``connection.context``
"""
self.stream = stream
if params is None:
params = HTTP1ConnectionParameters()
self.params = params
self.context = context
self._serving_future = None

@gen.coroutine
def close(self):
"""Closes the connection.

Returns a `.Future` that resolves after the serving loop has exited.
"""
self.stream.close()
# Block until the serving loop is done, but ignore any exceptions
# (start_serving is already responsible for logging them).
try:
yield self._serving_future
except Exception:
pass

def start_serving(self, delegate):
"""Starts serving requests on this connection.

:arg delegate: a `.HTTPServerConnectionDelegate`
"""
assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
self._serving_future = self._server_request_loop(delegate)
# Register the future on the IOLoop so its errors get logged.
self.stream.io_loop.add_future(self._serving_future,
lambda f: f.result())

@gen.coroutine
def _server_request_loop(self, delegate):
try:
while True:
conn = HTTP1Connection(self.stream, False,
self.params, self.context)
request_delegate = delegate.start_request(self, conn)
try:
# 解析 HTTP 数据,解析的结果交由 request_delegate 处理。通常 request_delegate
# 会将解析结果保存到 HTTPServerRequest 中,参见 _ServerRequestAdapter 实现。
ret = yield conn.read_response(request_delegate)
except (iostream.StreamClosedError,
iostream.UnsatisfiableReadError):
# 这两种异常由底层 IOStream 引发,发生时 IOStream 会自动关闭(包括关联的 fd) 并 logged。
# 若是其他异常,则需要调用 conn.close() 以关闭 IOStream。
return
except _QuietException:
# This exception was already logged.
#
# HTTP1Connection 中发生异常时由 _ExceptionLoggingContext 捕获并 logged(详细异常),
# 然后 _ExceptionLoggingContext 抛出 _QuietException 异常。
conn.close()
return
except Exception:
gen_log.error("Uncaught exception", exc_info=True)
conn.close()
return
# read_response 方法返回 True 表示此次请求正常处理完成,继续处理下一个请求:
# 1. keep-alive 时,等待下一次 read_response 完成;
# 2. 非 keep-alive 时,会抛出 StreamClosedError 终止循环;
# 返回 False 则表示处理请求时发生了异常无法再继续处理下一个请求,立即终止循环是最优的选择而不是通过下
# 一次抛出异常来终止。
if not ret:
return
yield gen.moment
finally:
# HTTPServer.on_close 方法仅将当前 HTTP1ServerConnection 从其内部连接列表中移除。
delegate.on_close(self)