defbind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=_DEFAULT_BACKLOG, flags=None): """Creates listening sockets bound to the given port and address. Returns a list of socket objects (multiple sockets are returned if the given address maps to multiple IP addresses, which is most common for mixed IPv4 and IPv6 use). Address may be either an IP address or hostname. If it's a hostname, the server will listen on all IP addresses associated with the name. Address may be an empty string or None to listen on all available interfaces. Family may be set to either `socket.AF_INET` or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise both will be used if available. The ``backlog`` argument has the same meaning as for `socket.listen() <socket.socket.listen>`. ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``. """ sockets = [] if address == "": address = None ifnot socket.has_ipv6 and family == socket.AF_UNSPEC: # Python can be compiled with --disable-ipv6, which causes # operations on AF_INET6 sockets to fail, but does not # automatically exclude those results from getaddrinfo # results. # http://bugs.python.org/issue16208 # # 由于可以通过指定编译选项来编译出仅支持 ipv4 的 python 版本,为了保证 # getaddrinfo 也只返回 ipv4 的地址,所以这里指定 socket.AF_INET family = socket.AF_INET
# socket.getaddrinfo(host, port[, family[, socktype[, proto[, flags]]]]) # family: 协议簇,常用的协议包括 AF_UNIX(1,本机通信)/AF_INET(2,IPV4)/AF_INET6(10,IPV6)。 # socktype:socket 的类型,常见的socket类型包括SOCK_STREAM(TCP流)/SOCK_DGRAM(UDP数据报)/SOCK_RAW(原始套接字)。 # 其中,SOCK_STREAM=1,SOCK_DGRAM=2,SOCK_RAW=3。 # proto:协议,套接口所用的协议。如调用者不想指定,可用0。常用的协议有,IPPROTO_TCP(=6) 和 # IPPTOTO_UDP(=17),它们分别对应 TCP 传输协议、UDP 传输协议。与 IP 数据包的 ``8位协议字段`` 对应。 for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags)): af, socktype, proto, canonname, sockaddr = res if (platform.system() == 'Darwin'and address == 'localhost'and af == socket.AF_INET6 and sockaddr[3] != 0): # Mac OS X includes a link-local address fe80::1%lo0 in the # getaddrinfo results for 'localhost'. However, the firewall # doesn't understand that this is a local address and will # prompt for access (often repeatedly, due to an apparent # bug in its ability to remember granting access to an # application). Skip these addresses. # # address = 'localhost' 时 Mac OS X 可能会返回一个 ipv6 地址 fe80::1%lo0, # 而防火墙不能识别出这是一个本地地址而尝试访问会导致 bug ,所以这里忽略这个地址。 # ipv6 二进制 128 位,以 16 位为一组,每组以 `:` 分开,`::` 表示一组0或者多组连续的0, # 但是只能出现 1 次。 # sockaddr is a tuple describing a socket address, whose format # depends on the returned family (a (address, port) 2-tuple for # AF_INET, a (address, port, flow info, scope id) 4-tuple for AF_INET6) continue try: sock = socket.socket(af, socktype, proto) except socket.error as e: if errno_from_exception(e) == errno.EAFNOSUPPORT: continue raise # 为 fd 设置 FD_CLOEXEC 标识 set_close_exec(sock.fileno()) if os.name != 'nt': # 避免在服务器重启的时候发生“该地址以被使用”这种错误。 # socket.SOL_SOCKET 指定在套接字级别设置 `可选项`。 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if af == socket.AF_INET6: # On linux, ipv6 sockets accept ipv4 too by default, # but this makes it impossible to bind to both # 0.0.0.0 in ipv4 and :: in ipv6. On other systems, # separate sockets *must* be used to listen for both ipv4 # and ipv6. For consistency, always disable ipv4 on our # ipv6 sockets and use a separate ipv4 socket when needed. # # Python 2.x on windows doesn't have IPPROTO_IPV6. if hasattr(socket, "IPPROTO_IPV6"): sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
# automatic port allocation with port=None # should bind on the same port on IPv4 and IPv6 host, requested_port = sockaddr[:2] if requested_port == 0and bound_port isnotNone: sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))
deffork_processes(num_processes, max_restarts=100): """Starts multiple worker processes. If ``num_processes`` is None or <= 0, we detect the number of cores available on this machine and fork that number of child processes. If ``num_processes`` is given and > 0, we fork that specific number of sub-processes. Since we use processes and not threads, there is no shared memory between any server code. Note that multiple processes are not compatible with the autoreload module (or the ``autoreload=True`` option to `tornado.web.Application` which defaults to True when ``debug=True``). When using multiple processes, no IOLoops can be created or referenced until after the call to ``fork_processes``. In each child process, ``fork_processes`` returns its *task id*, a number between 0 and ``num_processes``. Processes that exit abnormally (due to a signal or non-zero exit status) are restarted with the same id (up to ``max_restarts`` times). In the parent process, ``fork_processes`` returns None if all child processes have exited normally, but will otherwise only exit by throwing an exception. """ global _task_id assert _task_id isNone # num_processes 为 None 或者 <=0,则使用 cpu 核数。 if num_processes isNoneor num_processes <= 0: num_processes = cpu_count()
if ioloop.IOLoop.initialized(): raise RuntimeError("Cannot run in multiple processes: IOLoop instance " "has already been initialized. You cannot call " "IOLoop.instance() before calling start_processes()") gen_log.info("Starting %d processes", num_processes) children = {}
defstart_child(i): pid = os.fork() if pid == 0: # child process _reseed_random() global _task_id _task_id = i return i else: children[pid] = i returnNone for i in range(num_processes): id = start_child(i) if id isnotNone: return id
# master 进程负责监控子进程,若子进程异常结束(due to a signal or non-zero exit status), # 则负责重启子进程。`num_restarts` 变量记录了累计重启的子进程数量,若该值大于 `max_restarts`, # 则抛出运行时异常。 num_restarts = 0 while children: try: pid, status = os.wait() except OSError as e: if errno_from_exception(e) == errno.EINTR: continue raise if pid notin children: continue id = children.pop(pid) if os.WIFSIGNALED(status): gen_log.warning("child %d (pid %d) killed by signal %d, restarting", id, pid, os.WTERMSIG(status)) elif os.WEXITSTATUS(status) != 0: gen_log.warning("child %d (pid %d) exited with status %d, restarting", id, pid, os.WEXITSTATUS(status)) else: gen_log.info("child %d (pid %d) exited normally", id, pid) continue num_restarts += 1 if num_restarts > max_restarts: raise RuntimeError("Too many child restarts, giving up") new_id = start_child(id) if new_id isnotNone: return new_id # All child processes exited cleanly, so exit the master process # instead of just returning to right after the call to # fork_processes (which will probably just start up another IOLoop # unless the caller checks the return value). sys.exit(0)TLS
defadd_accept_handler(sock, callback, io_loop=None): if io_loop isNone: io_loop = IOLoop.current()
defaccept_handler(fd, events): # More connections may come in while we're handling callbacks; # to prevent starvation of other tasks we must limit the number # of connections we accept at a time. Ideally we would accept # up to the number of connections that were waiting when we # entered this method, but this information is not available # (and rearranging this method to call accept() as many times # as possible before running any callbacks would have adverse # effects on load balancing in multiprocess configurations). # Instead, we use the (default) listen backlog as a rough # heuristic for the number of connections we can reasonably # accept at once. for i in xrange(_DEFAULT_BACKLOG): try: connection, address = sock.accept() except socket.error as e: # _ERRNO_WOULDBLOCK indicate we have accepted every # connection that is available. if errno_from_exception(e) in _ERRNO_WOULDBLOCK: return # ECONNABORTED indicates that there was a connection # but it was closed while still in the accept queue. # (observed on FreeBSD). if errno_from_exception(e) == errno.ECONNABORTED: continue raise callback(connection, address) io_loop.add_handler(sock, accept_handler, IOLoop.READ)