0%

tornado.iostream 模块解析

从 Files/Sockets 到 Stream

tornado.iostream 模块为 Tornado 提供了一系列读写非阻塞 files/sockets 的工具类。该模块主要包括以下 4 个主要的工具类:

  • BaseIOStream: 基础流读写接口,作为特定流的父类;
  • IOStream: 针对非阻塞 sockets 的流实现;
  • SSLIOStream: SSL-aware版本的 IOStream 实现;
  • PipeIOStream: 针对管道(Pipe)的流实现;

BaseIOStream 作为基础流读写接口,实现了大部分的功能封装。后面的源代码分析中,主要就是基于该类并结合其非阻塞 socket 版本的 IOStream 来讨论。

查看该模块的时候,我们会发现两个模块内函数 _double_prefix(deque)_merge_prefix(deque, size)。这两个工具函数的实现都很简单,但是为流的读写提供通用的操作数据块(chunk)的功能:

  • _double_prefix(deque): 该函数提供了将 buffer 的第 1 个 chunk 增大至少 1 倍的功能,该功能现在用在按条件在流的 buffer 中搜索匹配字符串时逐渐扩大搜索的数据块大小。
  • _merge_prefix(deque, size): 该函数提供了将 buffer 的第 1 个 chunk 调整到指定 size 大小。这在读写流时非常有用, _double_prefix(deque) 就是通过该函数来调整 chunk 大小的。在将流的 write_buffer 写入 fd 时,通过该函数适当调整第 1 个 chunk 的大小,我们就可以直接操作 buffer 的第 1 个 chunk 来达到操作整个 buffer 的目的,简化了实现的难度。详细可见 BaseStream._handle_writ 函数实现代码。

IOStream

一些基础知识

在源码的开始部分,作者写了一大段介绍 recv/send 与 read/write 函数的区别,以及各平台的操作非阻塞 I/O 时返回的错误码。recv/send 与 read/write 函数的区别大体上就是说,前者是特化的函数,提供了一些额外的选项来控制 fd 的读写操作,针对具体的 fd 实例你可以设置选项忽略 SIGPIPE 信号或者让 socket 发送带外数据等等, 后者只是提供了通用的 fd 读写操作。对于操作非阻塞 fd 返回的错误码,如下模块的静态变量对应的注释所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 非阻塞操作时,缓冲区满(无法写)时或者缓冲区空(读不到数据)时返回 EAGAIN, BSD 下使用 EWOULDBLOCK, Windows下使用 WSAEWOULDBLOCK
_ERRNO_WOULDBLOCK = (errno.EWOULDBLOCK, errno.EAGAIN)

# For windows
if hasattr(errno, "WSAEWOULDBLOCK"):
_ERRNO_WOULDBLOCK += (errno.WSAEWOULDBLOCK,)

# These errnos indicate that a connection has been abruptly terminated.
# They should be caught and handled less noisily than other errors.
#
# `ECONNRESET`: 该异常一般发生在连接的一端(A)进程较另一端(B)提前终止时。A 进程终止时会向 B 发送 FIN 后进入
# FIN_WAIT1 状态,B 回应 ACK,A 收到 FIN 的 ACK 进入 FIN_WAIT2 状态。B 收到 FIN 时,会向应用程序交付 EOF,
# 进入 CLOSE_WAIT 状态。若此时 B 进程没有正常处理 FIN(例如被阻塞)而再次向处于 FIN_WAIT2 的 A 发送数据,将会
# 收到 RST,引发该错误。
#
# `ECONNABORTED`: 软件引起的连接中止,当服务端和客户端完成三次握手后,服务端正在等待服务进程调用 accept 时候却收到客户端
# 发来一个 RST 分节,引发该错误。POSIX 规定此时的 errno 值必须 ECONNABORTED。源自 Berkeley 的实现完全在内核中处理中
# 止的连接,服务进程将永远不知道该中止的发生。服务器进程一般可以忽略该错误,直接再次调用 accept。
#
# `EPIPE`: 错误被描述为 "broken pipe" ,即 "管道破裂",这种情况一般发生在客户进程不理会(或未及时处理)socket 错误,
# 而继续向 socket 写入更多数据时,内核将向客户进程发送 SIGPIPE 信号,该信号默认会使进程终止(此时该前台进程未进行 core dump)。
#
# `ETIMEDOUT`: 连接超时, 这种错误一般发生在服务器端崩溃而不响应客户端 ACK 时,客户端最终放弃尝试连接时引发该错误。
_ERRNO_CONNRESET = (errno.ECONNRESET, errno.ECONNABORTED, errno.EPIPE,
errno.ETIMEDOUT)

# For windows
if hasattr(errno, "WSAECONNRESET"):
_ERRNO_CONNRESET += (errno.WSAECONNRESET, errno.WSAECONNABORTED, errno.WSAETIMEDOUT)

# More non-portable errnos:
# 在以非阻塞方式 connect() 时,返回的结果如果是 -1 ,并且错误号为 EINPROGRESS ,那么表示
# 连接还在进行并处理中(IN PROGRESS),而不是真的发生了错误。
_ERRNO_INPROGRESS = (errno.EINPROGRESS,)

# For windows
if hasattr(errno, "WSAEINPROGRESS"):
_ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,)

流的读写实现

BaseIOStream 提供了一个对非阻塞 I/O 读写的抽象,它主要提供了 5 个读写方法:

  1. read_until_regex: 从流中读取数据直到遇上与指定正则表达式(由 regex 参数指定)匹配的字符串。在底层 fd 关闭或者读取指定的最大字节数据(由 max_bytes 参数指定)后还没有匹配上则抛出 UnsatisfiableReadError 异常。

  2. read_until: 从流中读取数据直到遇到指定的分隔符(有 delimiter 参数指定)为止。在底层的 fd 关闭或者读取指定的最大字节数据后还没有遇到分隔符则抛出 UnsatisfiableReadError异常。该方法实际上为读取 Http Header 遇到空行分隔符自动停止提供了方便。

  3. read_bytes: 从流中读取指定大小的字节数据。这实际上为根据 “Content-Length” 读取 Http Body 提供了方便。需要注意的时,如果底层 fd 关闭时也没有读取到指定大小的字节数据(由num_bytes 参数指定),callback 不会被调用。

  4. read_until_close: 从流中读取数据直到底层 fd 关闭为止。与通过无限大的 num_bytes 调用 read_bytes 方法类似,但是 callback 始终会被调用。

  5. write: 将指定的数据写入流的 write buffer,并持续监测底层 fd 的写事件知道将 write buffer 全部写入 fd。

上述方法都提供了 callback 参数作为异步操作完成后的回调函数,当该参数不为 None 时读取的数据将会作为 callback 的参数回调;为 None 时,方法将返回一个 .Future 实例,数据将作为 .Future 的 result 返回。这里稍微提一下,read_bytesread_until_close 允许分多次返回读取的数据而不必等待所有数据都到了才返回,所以提供了一个额外的参数 streaming_callback,当该参数不为 None 时,一旦流的 read buffer 中有数据可用便立即将数据作为 streaming_callback 的参数回调, callback 的回调参数为空(b””)

对于具体类型 fd 的操作, BaseIOStream 提供了相关的抽象方法由具体的流实现类来实现,这些方法诸如:write_to_fdread_from_fdclose_fdget_fd_error 等等。

BaseIOStream 提供的读写方法基本上实现了流读取的逻辑,接下来将就各个方法的实现代码进行分析。

深入 read_until_regex 与 read_until 方法

这两个方法比较相似,都是提供匹配特定字符串的读,read_until 更像是 read_until_regex 的特化版本,所以把二者放在一起分析。先来看看 read_until_regex 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def read_until_regex(self, regex, callback=None, max_bytes=None):
"""Asynchronously read until we have matched the given regex.

The result includes the data that matches the regex and anything
that came before it. If a callback is given, it will be run
with the data as an argument; if not, this method returns a
`.Future`.

If ``max_bytes`` is not None, the connection will be closed
if more than ``max_bytes`` bytes have been read and the regex is
not satisfied.

.. versionchanged:: 4.0
Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted.
"""
future = self._set_read_callback(callback)
# 在 read_until 方法中分隔符是保存在另外一个实例字段中:
# self._read_delimiter = delimiter
self._read_regex = re.compile(regex)
self._read_max_bytes = max_bytes
try:
self._try_inline_read()
except UnsatisfiableReadError as e:
# Handle this the same way as in _handle_events.
gen_log.info("Unsatisfiable read, closing connection: %s" % e)
self.close(exc_info=True)
return future
return future

def _set_read_callback(self, callback):
assert self._read_callback is None, "Already reading"
assert self._read_future is None, "Already reading"
# 按照异步设计,在 callback 指定的情况下,self._read_future = None,
# 否则设置 self._read_future = TracebackFuture() 用于返回异步执
# 行结果。
if callback is not None:
self._read_callback = stack_context.wrap(callback)
else:
self._read_future = TracebackFuture()
return self._read_future

read_until 方法中分隔符是保存在另外一个实例字段 _read_delimiter 中,以便在数据搜索匹配时使用(参见后面 _find_read_pos() 方法),除此之外两方法的实现是相同的。 _set_read_callback 方法负责设置异步读操作完成的回调, _read_callback_read_future 二选一(常见后面 _run_read_callback 方法实现)。_try_inline_read 尝试先在流的 read buffer 中完成读操作以便在下一次 IOLoop 时将结果返回,否则就注册监听 fd 的读事件,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def _try_inline_read(self):
"""Attempt to complete the current read operation from buffered data.

If the read can be completed without blocking, schedules the
read callback on the next IOLoop iteration; otherwise starts
listening for reads on the socket.
"""
# See if we've already got the data from a previous read
# 目前实际上仅在通过 read_bytes/read_until_close 方法调用时有意义,read_until_regex/read_until
# 方法是没有 streaming_callback 参数的,当然也就不存在 run streaming callback。在这里调用
# self._run_streaming_callback() 只是为了一般性而已。
self._run_streaming_callback()
# pos 若不为 None,则表示数据读取完成(读取到足够大小的数据、正则表达式匹配成功或者遇到指定的分隔符)。对于
# read_until_close 方法调用始终返回 None
pos = self._find_read_pos()
if pos is not None:
# 重置读取状态,获取数据,将读操作回调(_read_callback 或者 _read_future )加入 IOLoop,在下一次
# IOLoop 时将数据返回。
self._read_from_buffer(pos)
return
# 检测底层 fd 是否已经关闭,如果关闭则抛出 StreamClosedError 异常
self._check_closed()
try:
# 因为前面尝试从 read buffer 中读取数据完成操作失败了,所以这里再尝试从底层的 fd 中读取(可读)数据来完成此次读操作。
# 其内部实现也就是从 fd 中把数据读到 read buffer 再重复前面的 self._run_streaming_callback() 和
# self._find_read_pos() 操作,然后把 self._find_read_pos() 的结果返回。
pos = self._read_to_buffer_loop()
except Exception:
# If there was an in _read_to_buffer, we called close() already,
# but couldn't run the close callback because of _pending_callbacks.
# Before we escape from this function, run the close callback if
# applicable.
self._maybe_run_close_callback()
raise
if pos is not None:
self._read_from_buffer(pos)
return
# We couldn't satisfy the read inline, so either close the stream
# or listen for new data.
if self.closed():
self._maybe_run_close_callback()
else:
# 如果经过上述尝试都还无法完成此次读操作,则注册监听 fd 的读事件直到读操作完成或者 fd 关闭。
# _handle_events 方法会异步处理 fd 的事件,对应的 READ 事件会调用 _handle_read 方法,
# 而 _handle_read 方法内部会调用 _read_to_buffer_loop 方法,然后执行和上述差不多的操作。
self._add_io_state(ioloop.IOLoop.READ)
_run_streaming_callback 方法

上面代码中的 _run_streaming_callback() 方法顾名思义,是用于执行 self._streaming_callback 回调,代码很简单,在 read buffer 有数据可读时调用 streaming_callback,代码如下所示:

1
2
3
4
5
6
7
8
def _run_streaming_callback(self):
if self._streaming_callback is not None and self._read_buffer_size:
bytes_to_consume = self._read_buffer_size
# 读取不超过 buffer size 的数据
if self._read_bytes is not None:
bytes_to_consume = min(self._read_bytes, bytes_to_consume)
self._read_bytes -= bytes_to_consume
self._run_read_callback(bytes_to_consume, True)

最终负责调用回调函数的方法是 _run_read_callback,该方法全权负责根据指定的参数调用 read 操作的回调。参数 size 指定要从 read buffer 中读取的数据大小,streaming 指定回调的是 _streaming_callback 还是最终的 _read_callback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def _run_read_callback(self, size, streaming):
if streaming:
callback = self._streaming_callback
else:
callback = self._read_callback
self._read_callback = self._streaming_callback = None
# 这里其实有一个 bug ,后面这个 if 的内容逻辑上讲应该在前一个else里面(与
# callback = self._read_callback 处于同一作用域)。_read_callback 与 _read_future
# 是读操作完成后的回调,二者是二选一。streaming 指定为 True 的时候是不需要调用 _read_future。
#
# 当指定 streaming_callback(is not None) 而不指定 callback(is None) 调用 read_bytes 或者
# read_until_close 方法时,后面的断言就会失败。如果用 –O 或 –oo 选项运行 Python 而忽略掉断言的
# 话,callback 是不会被调用的,读取的数据将直接作为 _read_future 的结果返回。
if self._read_future is not None:
assert callback is None
future = self._read_future
self._read_future = None
future.set_result(self._consume(size))
if callback is not None:
assert self._read_future is None
self._run_callback(callback, self._consume(size))
else:
# If we scheduled a callback, we will add the error listener
# afterwards. If we didn't, we have to do it now.
self._maybe_add_error_listener()

def _consume(self, loc):
# loc=0时返回的是空而不是None,这就是前面分析到的 streaming_callback 被指定时,
# read 操作完成后调用 _read_callback 或者 _read_future 的数据是空。
if loc == 0:
return b""
_merge_prefix(self._read_buffer, loc)
self._read_buffer_size -= loc
return self._read_buffer.popleft()

NOTE: 正如上面代码注释中指出的,这个版本的 Tornado(4.0.1) 在 _run_read_callback 方法实现有 bug。后续版本已经修复了这个 bug,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def _run_read_callback(self, size, streaming):
if streaming:
callback = self._streaming_callback
else:
callback = self._read_callback
self._read_callback = self._streaming_callback = None
if self._read_future is not None:
assert callback is None
future = self._read_future
self._read_future = None
future.set_result(self._consume(size))
if callback is not None:
assert (self._read_future is None) or streaming
self._run_callback(callback, self._consume(size))
else:
# If we scheduled a callback, we will add the error listener
# afterwards. If we didn't, we have to do it now.
# 通过 self._run_callback 调用 callback 后会自动执行 _maybe_add_error_listener 方法。
# 而如果调用的是 future.set_result 的话,则需要在这里执行下 _maybe_add_error_listener 方法。
self._maybe_add_error_listener()

_run_callback 方法会将 callback 加入到 IOLoop 的回调列表中,以便在下一次 IOLoop 时执行回调。在将 callback 加入 IOLoop 之前, _run_callback 对其进行了上下文清理(注:阻止 callback 中又添加 callback 到 IOLoop 造成上下文无限增长和重入)和包装(异常时主动释放资源,回调完成后调用 _maybe_add_error_listener 方法),代码很少,但是注释很负责。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def _run_callback(self, callback, *args):
def wrapper():
self._pending_callbacks -= 1
try:
return callback(*args)
except Exception:
app_log.error("Uncaught exception, closing connection.",
exc_info=True)
# Close the socket on an uncaught exception from a user callback
# (It would eventually get closed when the socket object is
# gc'd, but we don't want to rely on gc happening before we
# run out of file descriptors)
#
# 主动关闭底层的 fd,并将其引用释放而不用等待 GC 来释放
self.close(exc_info=True)
# Re-raise the exception so that IOLoop.handle_callback_exception
# can see it and log the error
raise
finally:
self._maybe_add_error_listener()

# We schedule callbacks to be run on the next IOLoop iteration
# rather than running them directly for several reasons:
# * Prevents unbounded stack growth when a callback calls an
# IOLoop operation that immediately runs another callback
# * Provides a predictable execution context for e.g.
# non-reentrant mutexes
# * Ensures that the try/except in wrapper() is run outside
# of the application's StackContexts
with stack_context.NullContext():
# stack_context was already captured in callback, we don't need to
# capture it again for IOStream's wrapper. This is especially
# important if the callback was pre-wrapped before entry to
# IOStream (as in HTTPConnection._header_callback), as we could
# capture and leak the wrong context here.
#
# 对于已经被 stack_context 捕捉了上下文快照的 callback , 不需要再重复捕捉(capture)
# ,否则会造成上下文无限增长和重入(注:当 IOLoop 执行一个 callback 时,该 callback 又往
# IOLoop 添加一个 new callback 时会发生这种情况),所以这里清空上下文避免再次 capture。
# (注:异步回调一旦加入到 IOLoop 就会被指定调用 stack_context.wrap 方法捕捉上下文快照,
# 实现细节请参考 stack_context 模块。)
self._pending_callbacks += 1
self.io_loop.add_callback(wrapper)

需要注意一下的是 _maybe_add_error_listener() 方法,该方法保证在最后一个 callback 调用结束后尝试注册检测底层 fd Error 事件。该方法和其内部调用的 _add_io_state 方法使用了一个优化技巧,作者在代码注释中写的很清楚。简单来说就是,由于对流的读写实现有优先级:

  1. 优先直接从流的 buffer 和 fd buffer 中读写(fast path);
  2. 只有流和 fd 的 buffer 不可用时,才监听 fd 的读写事件,异步读写(slow path)。

注:不管采用的是哪一种读写方式,最终执行 callback 还是通过 _run_callback 方法。
_add_io_state 方法将对 fd 读写事件的监听与 Error 事件监听捆绑在一起,所以只有我们需要异步读写时才捕获 fd Error 事件,这样实际上也就推迟了对 fd 关闭的检测,以便流和fd 的 buffer 尽可能被读取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def _maybe_add_error_listener(self):
# This method is part of an optimization: to detect a connection that
# is closed when we're not actively reading or writing, we must listen
# for read events. However, it is inefficient to do this when the
# connection is first established because we are going to read or write
# immediately anyway. Instead, we insert checks at various times to
# see if the connection is idle and add the read listener then.
if self._pending_callbacks != 0:
return
if self._state is None or self._state == ioloop.IOLoop.ERROR:
if self.closed():
self._maybe_run_close_callback()
# 只要 buffer 中有数据或者 _close_callback=None 就不会主动去监测 fd 的读事件。
# 这样一来,对 fd 关闭的检测就依赖于明确的对 fd 的异步读写监测,即主动调用 _add_io_state
# 方法。
elif (self._read_buffer_size == 0 and
self._close_callback is not None):
self._add_io_state(ioloop.IOLoop.READ)

def _add_io_state(self, state):
"""Adds `state` (IOLoop.{READ,WRITE} flags) to our event handler.

Implementation notes: Reads and writes have a fast path and a
slow path. The fast path reads synchronously from socket
buffers, while the slow path uses `_add_io_state` to schedule
an IOLoop callback. Note that in both cases, the callback is
run asynchronously with `_run_callback`.

To detect closed connections, we must have called
`_add_io_state` at some point, but we want to delay this as
much as possible so we don't have to set an `IOLoop.ERROR`
listener that will be overwritten by the next slow-path
operation. As long as there are callbacks scheduled for
fast-path ops, those callbacks may do more reads.
If a sequence of fast-path ops do not end in a slow-path op,
(e.g. for an @asynchronous long-poll request), we must add
the error handler. This is done in `_run_callback` and `write`
(since the write callback is optional so we can have a
fast-path write with no `_run_callback`)
"""
if self.closed():
# connection has been closed, so there can be no future events
return
if self._state is None:
self._state = ioloop.IOLoop.ERROR | state
with stack_context.NullContext():
self.io_loop.add_handler(
self.fileno(), self._handle_events, self._state)
elif not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.fileno(), self._state)

_handle_events 方法会根据 fd 的事件类型进行 _handle_connect_handle_read_handle_write 以及 error 处理。

_find_read_pos 方法

_find_read_pos 方法会在当前流的 read buffer 中尝试完成此次读取操作,如果读取操作可以完成,就返回一个 position,后续便可以将这个 position 传递给 _read_from_buffer 方法以读取数据执行回调操作,完成此次读取调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def _find_read_pos(self):
"""Attempts to find a position in the read buffer that satisfies
the currently-pending read.

Returns a position in the buffer if the current read can be satisfied,
or None if it cannot.
"""
# self._read_bytes 保存的是 read_bytes() 方法中的 num_bytes 。
# 也就是说通过 read_bytes() 读取数据时,当 buffer 中有数据时,若 >= num_bytes 或者
# _read_partial=True 则完成读取,返回可用的数据长度。
if (self._read_bytes is not None and
(self._read_buffer_size >= self._read_bytes or
(self._read_partial and self._read_buffer_size > 0))):
num_bytes = min(self._read_bytes, self._read_buffer_size)
return num_bytes
elif self._read_delimiter is not None:
# Multi-byte delimiters (e.g. '\r\n') may straddle two
# chunks in the read buffer, so we can't easily find them
# without collapsing the buffer. However, since protocols
# using delimited reads (as opposed to reads of a known
# length) tend to be "line" oriented, the delimiter is likely
# to be in the first few chunks. Merge the buffer gradually
# since large merges are relatively expensive and get undone in
# _consume().
if self._read_buffer:
while True:
loc = self._read_buffer[0].find(self._read_delimiter)
if loc != -1:
delimiter_len = len(self._read_delimiter)
# 检查找到分隔符的位置,如果该位置超过了指定的 read_max_bytes
# 则抛出 UnsatisfiableReadError
self._check_max_bytes(self._read_delimiter,
loc + delimiter_len)
return loc + delimiter_len
# 如果搜索完了 buffer 中所有数据都还没找到分隔符,就跳出循环。
# 注:这里基于一个假设,就是数据是符合协议要求的,那么在很少的几个 chunk 中就会找到分隔符,
# 所以没有在每次循环后调用 _check_max_bytes。那么恶意发送大量数据而不发送分隔符的话,
# 除了造成内存的浪费,合并 chunk 的操作也会很昂贵。
if len(self._read_buffer) == 1:
break
# 在第一个 chunk 中没有找到分隔符的话,需要调整一下 chunk,扩大搜索的数据范围
_double_prefix(self._read_buffer)
self._check_max_bytes(self._read_delimiter,
len(self._read_buffer[0]))
elif self._read_regex is not None:
if self._read_buffer:
while True:
m = self._read_regex.search(self._read_buffer[0])
if m is not None:
self._check_max_bytes(self._read_regex, m.end())
return m.end()
if len(self._read_buffer) == 1:
break
_double_prefix(self._read_buffer)
self._check_max_bytes(self._read_regex,
len(self._read_buffer[0]))
return None

def _check_max_bytes(self, delimiter, size):
if (self._read_max_bytes is not None and
size > self._read_max_bytes):
raise UnsatisfiableReadError(
"delimiter %r not found within %d bytes" % (
delimiter, self._read_max_bytes))
_read_from_buffer 和 _read_to_buffer_loop 方法

_read_from_buffer 方法里面重置了一下读取操作的状态,然后就直接从流的read buffer 中读取数据完成此次读取操作。

_read_to_buffer_loop 方法先从 fd 的 read buffer 中把数据读到流的 read buffer 中,然后再执行 run streaming callback 和数据搜索,返回搜索的结果。看起来就像一个需要自己先准备数据再搜索数据的 _find_read_pos 方法版本。
_read_to_buffer_loop 方法调用 _read_to_buffer 方法实现把数据从 fd 的 buffer 中读到流的 buffer,其中从 fd 中读取数据的方法 read_from_fd 是一个抽象方法,在 IOStream 中有对应的 socket 实现,在 PipeIOStream 中有 pipe 的实现。

深入 read_bytes 与 read_until_close 方法

read_until_regexread_until 方法一样,read_bytesread_until_close 方法实现的核心也是 _try_inline_read 方法。但是由于这两个方法可以支持分多次将读取的数据返回而不必等到所有数据完整后才返回,所以都多出了一个 streaming_callback 回调参数,该参数不为 None 时,当流的 read buffer 有数据可读时便可会立即将数据作为参数回调 streaming_callbackcallback 仅仅作为异步接口的协议在异步操作结束的时候调用,其回调参数为 b””

对于这两个方法,有几点需要注意一下:

  1. read_bytes 方法的签名中有一个默认参数 partial , 该参数指定是否允许部分读数据,当 partial=Truestreaming_callback=None 时只要能从流的 read buffer 中读取到数据,就立刻完成此次读操作而不管读取的数据大小是否达到了指定的 num_bytes注:若 streaming_callback is not None , 那么 read buffer 中的数据是轮不到 callback 的,实际上也就无从谈起部分读取返回)。这个方法的签名设计让人有点费解,streaming_callback 参数表明的是一种允许分多次读取数据,表达了必须读取足够的数据才算是完成读操作的意图;而 partial 又表明的是允许读取部分数据的意图,表达的是可以部分读取数据就算完成读操作的意图。那么当 streaming_callback is not Nonepartial=True 时, 后者实际是无效的参数设置;

  2. read_bytes 方法完成读操作,有一个 num_bytes 限制,那么就存在这么一种未完成读取的情况:当底层 fd 关闭时还没有读取到足够的数据就不算完成操作,也就不会调用 callback 或者为 _read_future 设置 result;

  3. read_until_close 方法表达的是底层 fd 关闭时完成读取操作,所以 callback 一定会被调用,这也就是为什么 read_until_close 代码中需要先检查流的关闭状态,然后再委托 _try_inline_read 方法。而 read_bytes 不需要这个检查,直接委托 _try_inline_read 即可。代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def read_bytes(self, num_bytes, callback=None, streaming_callback=None,
partial=False):
"""
从 stream 中读取指定数量的字节。这实际上为读取 HTTP 消息的 body 部分提供了方便。
这里要注意一下 ``streaming_callback`` 和 ``callback`` 参数,在提供前者的情况
下,一旦有数据可读取便会调用 ``streaming_callback``,后者 ``callback`` 将在读
取到 num_bytes 大小字节数据后调用。如果直到 socket 关闭依然没有读取到 num_bytes
大小字节的数据, ``callback`` 是不会被调用的,_maybe_run_close_callback()方法
中会负责将其重置为 None。

``partial`` 参数指定是否只要 read buffer 中能读取到数据,即使数据大小小于 num_bytes
指定的字节数也完成此次异步读操作(streaming_callback=None时有效)。
"""
future = self._set_read_callback(callback)
assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._read_partial = partial
self._streaming_callback = stack_context.wrap(streaming_callback)
self._try_inline_read()
return future

def read_until_close(self, callback=None, streaming_callback=None):
"""
大约与读取数据字节大小无限且不允许部分读取的 read_bytes() 方法等同,差别在 ``callback``
必须在 fd 关闭后调用,而 read_bytes() 方法不调用。
"""
future = self._set_read_callback(callback)
self._streaming_callback = stack_context.wrap(streaming_callback)

# 如果 socket 已经关闭那么就直接读取 buffer 中的数据并返回。
# 与 read_bytes() 不同, callback 的执行条件是 fd 关闭,所以必须要调用。
# self._try_inline_read() 现在的实现不会在检测到 fd 关闭时调用 callback,
# 所以这里必须要检查一下 fd 的状态,以确保 callback 被调用。
if self.closed():
if self._streaming_callback is not None:
self._run_read_callback(self._read_buffer_size, True)
self._run_read_callback(self._read_buffer_size, False)
return future

# _read_until_close 字段将在 close() 方法中使用并重置为 False,该字段决定 close 时
# 是否执行 self._read_callback 回调方法。通过调用 self._run_read_callback(self._read_buffer_size, False)
# 实现,具体参见 close() 方法。
self._read_until_close = True
self._try_inline_read()
return future
深入 write 方法

write 方法实现很简单,就是将数据分块(大小为 128 * 1024 字节,以避免在写入 socket 的时候再分块)写入流的 write buffer,然后监听 fd 的写事件,直到将 write buffer 中的数据全部写入 fd 时完成写操作,调用写回调或者设置 _write_future 的 result。同样涉及到写具体 fd 的操作方 write_to_fd 是在具体实现类中实现的。