从 Files/Sockets 到 Stream
tornado.iostream
模块为 Tornado 提供了一系列读写非阻塞 files/sockets 的工具类。该模块主要包括以下 4 个主要的工具类:
BaseIOStream
: 基础流读写接口,作为特定流的父类;IOStream
: 针对非阻塞 sockets 的流实现;SSLIOStream
: SSL-aware版本的IOStream
实现;PipeIOStream
: 针对管道(Pipe)的流实现;
BaseIOStream
作为基础流读写接口,实现了大部分的功能封装。后面的源代码分析中,主要就是基于该类并结合其非阻塞 socket 版本的 IOStream
来讨论。
查看该模块的时候,我们会发现两个模块内函数 _double_prefix(deque)
与 _merge_prefix(deque, size)
。这两个工具函数的实现都很简单,但是为流的读写提供通用的操作数据块(chunk)的功能:
_double_prefix(deque)
: 该函数提供了将 buffer 的第 1 个 chunk 增大至少 1 倍的功能,该功能现在用在按条件在流的 buffer 中搜索匹配字符串时逐渐扩大搜索的数据块大小。_merge_prefix(deque, size)
: 该函数提供了将 buffer 的第 1 个 chunk 调整到指定 size 大小。这在读写流时非常有用,_double_prefix(deque)
就是通过该函数来调整 chunk 大小的。在将流的 write_buffer 写入 fd 时,通过该函数适当调整第 1 个 chunk 的大小,我们就可以直接操作 buffer 的第 1 个 chunk 来达到操作整个 buffer 的目的,简化了实现的难度。详细可见BaseStream._handle_writ
函数实现代码。
IOStream
一些基础知识
在源码的开始部分,作者写了一大段介绍 recv/send 与 read/write 函数的区别,以及各平台的操作非阻塞 I/O 时返回的错误码。recv/send 与 read/write 函数的区别大体上就是说,前者是特化的函数,提供了一些额外的选项来控制 fd 的读写操作,针对具体的 fd 实例你可以设置选项忽略 SIGPIPE 信号或者让 socket 发送带外数据等等, 后者只是提供了通用的 fd 读写操作。对于操作非阻塞 fd 返回的错误码,如下模块的静态变量对应的注释所示:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38# 非阻塞操作时,缓冲区满(无法写)时或者缓冲区空(读不到数据)时返回 EAGAIN, BSD 下使用 EWOULDBLOCK, Windows下使用 WSAEWOULDBLOCK
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)
# For windows
if hasattr(errno, "WSAEWOULDBLOCK"):
_ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)
# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
#
# `ECONNRESET`: 该异常一般发生在连接的一端(A)进程较另一端(B)提前终止时。A 进程终止时会向 B 发送 FIN 后进入
# FIN_WAIT1 状态,B 回应 ACK,A 收到 FIN 的 ACK 进入 FIN_WAIT2 状态。B 收到 FIN 时,会向应用程序交付 EOF,
# 进入 CLOSE_WAIT 状态。若此时 B 进程没有正常处理 FIN(例如被阻塞)而再次向处于 FIN_WAIT2 的 A 发送数据,将会
# 收到 RST,引发该错误。
#
# `ECONNABORTED`: 软件引起的连接中止,当服务端和客户端完成三次握手后,服务端正在等待服务进程调用 accept 时候却收到客户端
# 发来一个 RST 分节,引发该错误。POSIX 规定此时的 errno 值必须 ECONNABORTED。源自 Berkeley 的实现完全在内核中处理中
# 止的连接,服务进程将永远不知道该中止的发生。服务器进程一般可以忽略该错误,直接再次调用 accept。
#
# `EPIPE`: 错误被描述为 "broken pipe" ,即 "管道破裂",这种情况一般发生在客户进程不理会(或未及时处理)socket 错误,
# 而继续向 socket 写入更多数据时,内核将向客户进程发送 SIGPIPE 信号,该信号默认会使进程终止(此时该前台进程未进行 core dump)。
#
# `ETIMEDOUT`: 连接超时, 这种错误一般发生在服务器端崩溃而不响应客户端 ACK 时,客户端最终放弃尝试连接时引发该错误。
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE,
errno.ETIMEDOUT)
# For windows
if hasattr(errno, "WSAECONNRESET"):
_ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT)
# More non-portable errnos:
# 在以非阻塞方式 connect() 时,返回的结果如果是 -1 ,并且错误号为 EINPROGRESS ,那么表示
# 连接还在进行并处理中(IN PROGRESS),而不是真的发生了错误。
_ERRNO_INPROGRESS = (errno.EINPROGRESS,)
# For windows
if hasattr(errno, "WSAEINPROGRESS"):
_ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)
流的读写实现
BaseIOStream
提供了一个对非阻塞 I/O 读写的抽象,它主要提供了 5 个读写方法:
read_until_regex
: 从流中读取数据直到遇上与指定正则表达式(由 regex 参数指定)匹配的字符串。在底层 fd 关闭或者读取指定的最大字节数据(由 max_bytes 参数指定)后还没有匹配上则抛出 UnsatisfiableReadError 异常。read_until
: 从流中读取数据直到遇到指定的分隔符(有 delimiter 参数指定)为止。在底层的 fd 关闭或者读取指定的最大字节数据后还没有遇到分隔符则抛出 UnsatisfiableReadError异常。该方法实际上为读取 Http Header 遇到空行分隔符自动停止提供了方便。read_bytes
: 从流中读取指定大小的字节数据。这实际上为根据 “Content-Length” 读取 Http Body 提供了方便。需要注意的时,如果底层 fd 关闭时也没有读取到指定大小的字节数据(由num_bytes 参数指定),callback 不会被调用。read_until_close
: 从流中读取数据直到底层 fd 关闭为止。与通过无限大的 num_bytes 调用read_bytes
方法类似,但是 callback 始终会被调用。write
: 将指定的数据写入流的 write buffer,并持续监测底层 fd 的写事件知道将 write buffer 全部写入 fd。
上述方法都提供了 callback
参数作为异步操作完成后的回调函数,当该参数不为 None 时读取的数据将会作为 callback
的参数回调;为 None 时,方法将返回一个 .Future
实例,数据将作为 .Future
的 result 返回。这里稍微提一下,read_bytes
与 read_until_close
允许分多次返回读取的数据而不必等待所有数据都到了才返回,所以提供了一个额外的参数 streaming_callback
,当该参数不为 None 时,一旦流的 read buffer 中有数据可用便立即将数据作为 streaming_callback
的参数回调, callback
的回调参数为空(b””)。
对于具体类型 fd 的操作, BaseIOStream
提供了相关的抽象方法由具体的流实现类来实现,这些方法诸如:write_to_fd
、read_from_fd
、close_fd
、get_fd_error
等等。
BaseIOStream
提供的读写方法基本上实现了流读取的逻辑,接下来将就各个方法的实现代码进行分析。
深入 read_until_regex 与 read_until 方法
这两个方法比较相似,都是提供匹配特定字符串的读,read_until
更像是 read_until_regex
的特化版本,所以把二者放在一起分析。先来看看 read_until_regex
方法的实现:
1 | def read_until_regex(self, regex, callback=None, max_bytes=None): |
在 read_until
方法中分隔符是保存在另外一个实例字段 _read_delimiter
中,以便在数据搜索匹配时使用(参见后面 _find_read_pos()
方法),除此之外两方法的实现是相同的。 _set_read_callback
方法负责设置异步读操作完成的回调, _read_callback
与 _read_future
二选一(常见后面 _run_read_callback
方法实现)。_try_inline_read
尝试先在流的 read buffer 中完成读操作以便在下一次 IOLoop 时将结果返回,否则就注册监听 fd 的读事件,代码如下所示:
1 | def _try_inline_read(self): |
_run_streaming_callback 方法
上面代码中的 _run_streaming_callback()
方法顾名思义,是用于执行 self._streaming_callback
回调,代码很简单,在 read buffer 有数据可读时调用 streaming_callback,代码如下所示:
1 | def _run_streaming_callback(self): |
最终负责调用回调函数的方法是 _run_read_callback
,该方法全权负责根据指定的参数调用 read 操作的回调。参数 size
指定要从 read buffer 中读取的数据大小,streaming
指定回调的是 _streaming_callback
还是最终的 _read_callback
。
1 | def _run_read_callback(self, size, streaming): |
NOTE: 正如上面代码注释中指出的,这个版本的 Tornado(4.0.1) 在 _run_read_callback
方法实现有 bug。后续版本已经修复了这个 bug,代码如下所示:
1 | def _run_read_callback(self, size, streaming): |
_run_callback
方法会将 callback
加入到 IOLoop 的回调列表中,以便在下一次 IOLoop 时执行回调。在将 callback
加入 IOLoop 之前, _run_callback
对其进行了上下文清理(注:阻止 callback 中又添加 callback 到 IOLoop 造成上下文无限增长和重入)和包装(异常时主动释放资源,回调完成后调用 _maybe_add_error_listener 方法),代码很少,但是注释很负责。
1 | def _run_callback(self, callback, *args): |
需要注意一下的是 _maybe_add_error_listener()
方法,该方法保证在最后一个 callback
调用结束后尝试注册检测底层 fd Error 事件。该方法和其内部调用的 _add_io_state
方法使用了一个优化技巧,作者在代码注释中写的很清楚。简单来说就是,由于对流的读写实现有优先级:
- 优先直接从流的 buffer 和 fd buffer 中读写(fast path);
- 只有流和 fd 的 buffer 不可用时,才监听 fd 的读写事件,异步读写(slow path)。
注:不管采用的是哪一种读写方式,最终执行 callback
还是通过 _run_callback
方法。_add_io_state
方法将对 fd 读写事件的监听与 Error 事件监听捆绑在一起,所以只有我们需要异步读写时才捕获 fd Error 事件,这样实际上也就推迟了对 fd 关闭的检测,以便流和fd 的 buffer 尽可能被读取。
1 | def _maybe_add_error_listener(self): |
_handle_events
方法会根据 fd 的事件类型进行 _handle_connect
、 _handle_read
、_handle_write
以及 error 处理。
_find_read_pos 方法
_find_read_pos
方法会在当前流的 read buffer 中尝试完成此次读取操作,如果读取操作可以完成,就返回一个 position,后续便可以将这个 position 传递给 _read_from_buffer
方法以读取数据执行回调操作,完成此次读取调用。
1 | def _find_read_pos(self): |
_read_from_buffer 和 _read_to_buffer_loop 方法
_read_from_buffer
方法里面重置了一下读取操作的状态,然后就直接从流的read buffer 中读取数据完成此次读取操作。
_read_to_buffer_loop
方法先从 fd 的 read buffer 中把数据读到流的 read buffer 中,然后再执行 run streaming callback 和数据搜索,返回搜索的结果。看起来就像一个需要自己先准备数据再搜索数据的 _find_read_pos
方法版本。_read_to_buffer_loop
方法调用 _read_to_buffer
方法实现把数据从 fd 的 buffer 中读到流的 buffer,其中从 fd 中读取数据的方法 read_from_fd
是一个抽象方法,在 IOStream
中有对应的 socket
实现,在 PipeIOStream
中有 pipe
的实现。
深入 read_bytes 与 read_until_close 方法
与 read_until_regex
与 read_until
方法一样,read_bytes
与 read_until_close
方法实现的核心也是 _try_inline_read
方法。但是由于这两个方法可以支持分多次将读取的数据返回而不必等到所有数据完整后才返回,所以都多出了一个 streaming_callback
回调参数,该参数不为 None 时,当流的 read buffer 有数据可读时便可会立即将数据作为参数回调 streaming_callback
, callback
仅仅作为异步接口的协议在异步操作结束的时候调用,其回调参数为 b””。
对于这两个方法,有几点需要注意一下:
read_bytes
方法的签名中有一个默认参数partial
, 该参数指定是否允许部分读数据,当partial=True
和streaming_callback=None
时只要能从流的 read buffer 中读取到数据,就立刻完成此次读操作而不管读取的数据大小是否达到了指定的num_bytes
(注:若streaming_callback is not None
, 那么 read buffer 中的数据是轮不到callback
的,实际上也就无从谈起部分读取返回)。这个方法的签名设计让人有点费解,streaming_callback
参数表明的是一种允许分多次读取数据,表达了必须读取足够的数据才算是完成读操作的意图;而partial
又表明的是允许读取部分数据的意图,表达的是可以部分读取数据就算完成读操作的意图。那么当streaming_callback is not None
和partial=True
时, 后者实际是无效的参数设置;read_bytes
方法完成读操作,有一个num_bytes
限制,那么就存在这么一种未完成读取的情况:当底层 fd 关闭时还没有读取到足够的数据就不算完成操作,也就不会调用callback
或者为_read_future
设置 result;read_until_close
方法表达的是底层 fd 关闭时完成读取操作,所以callback
一定会被调用,这也就是为什么read_until_close
代码中需要先检查流的关闭状态,然后再委托_try_inline_read
方法。而read_bytes
不需要这个检查,直接委托_try_inline_read
即可。代码如下所示:
1 | def read_bytes(self, num_bytes, callback=None, streaming_callback=None, |
深入 write 方法
write
方法实现很简单,就是将数据分块(大小为 128 * 1024 字节,以避免在写入 socket 的时候再分块)写入流的 write buffer,然后监听 fd 的写事件,直到将 write buffer 中的数据全部写入 fd 时完成写操作,调用写回调或者设置 _write_future
的 result。同样涉及到写具体 fd 的操作方 write_to_fd
是在具体实现类中实现的。