def add_future(self, future, callback):
    """Schedules a callback on the ``IOLoop`` when the given
    `.Future` is finished.

    The callback is invoked with one argument, the
    `.Future`.

    :arg future: the object to wait on; it must satisfy ``is_future``.
    :arg callback: callable run via ``self.add_callback`` once ``future``
        has completed.
    """
    assert is_future(future)
    callback = stack_context.wrap(callback)
    # Append a lambda to the future's done-callback list; when the future
    # completes, the lambda hands ``callback`` to the IOLoop for scheduling
    # rather than invoking it directly.
    future.add_done_callback(
        lambda done_future: self.add_callback(callback, done_future))
def run(self):
    """Starts or resumes the generator, running until it reaches a
    yield point that is not ready.

    Returns immediately if the runner is already running or has finished.
    When the generator completes, its result (or exception info) is stored
    on ``self.result_future``.
    """
    if self.running or self.finished:
        return
    try:
        self.running = True
        while True:
            future = self.future
            # NOTE(review): the code from here down to the
            # ``except (StopIteration, Return)`` header, plus the trailing
            # ``handle_yield`` check and the ``finally`` clause, was missing
            # from the garbled source and has been restored from upstream
            # Tornado 4.x ``gen.Runner.run`` -- confirm against the Tornado
            # version this file tracks.
            if not future.done():
                return
            self.future = None
            try:
                orig_stack_contexts = stack_context._state.contexts
                try:
                    value = future.result()
                except Exception:
                    self.had_exception = True
                    yielded = self.gen.throw(*sys.exc_info())
                else:
                    yielded = self.gen.send(value)
                if stack_context._state.contexts is not orig_stack_contexts:
                    self.gen.throw(
                        stack_context.StackContextInconsistentError(
                            'stack_context inconsistency (probably caused '
                            'by yield within a "with StackContext" block)'))
            except (StopIteration, Return) as e:
                self.finished = True
                self.future = _null_future
                # Before Tornado 4.0 coroutines were driven by YieldPoint.
                # When a Callback cooperated with Wait/WaitAll, the
                # Callback's result had to be relayed through the Runner:
                # Runner.register_callback(key) registered the Callback,
                # result_callback(key) handed back a "setter" callback,
                # outside code used that setter to store the result in the
                # Runner.results dict, and Wait/WaitAll fetched it with
                # get_result(key).  YieldFuture was implemented the same
                # way.  Since Tornado 4.0 Future replaces YieldPoint and
                # all of this is obsolete; the YieldPoint-related code is
                # kept only for backward compatibility.
                if self.pending_callbacks and not self.had_exception:
                    # If we ran cleanly without waiting on all callbacks
                    # raise an error (really more of a warning).  If we
                    # had an exception then some callbacks may have been
                    # orphaned, so skip the check in that case.
                    raise LeakedCallbackError(
                        "finished without waiting for callbacks %r" %
                        self.pending_callbacks)
                # ``Return`` carries the result in ``value``; a plain
                # StopIteration may not have one, hence the default of None.
                self.result_future.set_result(getattr(e, 'value', None))
                self.result_future = None
                self._deactivate_stack_context()
                return
            except Exception:
                self.finished = True
                self.future = _null_future
                self.result_future.set_exc_info(sys.exc_info())
                self.result_future = None
                self._deactivate_stack_context()
                return
            if not self.handle_yield(yielded):
                return
    finally:
        # Always clear the flag so a later call can resume the generator.
        self.running = False
def __init__(self, gen, result_future, first_yielded):
    """Set up the runner state and process the first yielded value.

    :arg gen: the generator object this runner drives.
    :arg result_future: stored on ``self.result_future``.
    :arg first_yielded: passed to ``self.handle_yield``; if that call
        returns a true value, ``self.run()`` is invoked immediately.
    """
    self.gen = gen
    self.result_future = result_future
    self.future = _null_future
    self.yield_point = None
    self.pending_callbacks = None
    self.results = None
    self.running = False
    self.finished = False
    self.had_exception = False
    self.io_loop = IOLoop.current()
    # For efficiency, we do not create a stack context until we
    # reach a YieldPoint (stack contexts are required for the historical
    # semantics of YieldPoints, but not for Futures).  When we have
    # done so, this field will be set and must be called at the end
    # of the coroutine.
    self.stack_context_deactivate = None
    if self.handle_yield(first_yielded):
        self.run()
def_argument_adapter(callback): """Returns a function that when invoked runs ``callback`` with one arg. If the function returned by this function is called with exactly one argument, that argument is passed to ``callback``. Otherwise the args tuple and kwargs dict are wrapped in an `Arguments` object. """ defwrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper