"""Event loop and event loop policy."""
__all__ = ['AbstractEventLoopPolicy',
'AbstractEventLoop', 'AbstractServer',
'get_event_loop_policy', 'set_event_loop_policy',
'get_event_loop', 'set_event_loop', 'new_event_loop',
'get_child_watcher', 'set_child_watcher',
'_set_running_loop', '_get_running_loop',
def _get_function_source(func):
func = inspect.unwrap(func)
elif hasattr(func, '__wrapped__'):
if inspect.isfunction(func):
return (code.co_filename, code.co_firstlineno)
if isinstance(func, functools.partial):
return _get_function_source(func.func)
if compat.PY34 and isinstance(func, functools.partialmethod):
return _get_function_source(func.func)
def _format_args_and_kwargs(args, kwargs):
"""Format function arguments and keyword arguments.
Special case for a single parameter: ('hello',) is formatted as ('hello').
# use reprlib to limit the length of the output
items.extend(reprlib.repr(arg) for arg in args)
items.extend('{}={}'.format(k, reprlib.repr(v))
for k, v in kwargs.items())
return '(' + ', '.join(items) + ')'
def _format_callback(func, args, kwargs, suffix=''):
if isinstance(func, functools.partial):
suffix = _format_args_and_kwargs(args, kwargs) + suffix
return _format_callback(func.func, func.args, func.keywords, suffix)
if hasattr(func, '__qualname__') and func.__qualname__:
func_repr = func.__qualname__
elif hasattr(func, '__name__') and func.__name__:
func_repr = func.__name__
func_repr += _format_args_and_kwargs(args, kwargs)
def _format_callback_source(func, args):
func_repr = _format_callback(func, args, None)
source = _get_function_source(func)
func_repr += ' at %s:%s' % source
def extract_stack(f=None, limit=None):
"""Replacement for traceback.extract_stack() that only does the
necessary work for asyncio debug mode.
f = sys._getframe().f_back
# Limit the amount of work to a reasonable amount, as extract_stack()
# can be called for each coroutine and future in debug mode.
limit = constants.DEBUG_STACK_DEPTH
stack = traceback.StackSummary.extract(traceback.walk_stack(f),
"""Object returned by callback registration methods."""
__slots__ = ('_callback', '_args', '_cancelled', '_loop',
'_source_traceback', '_repr', '__weakref__')
def __init__(self, callback, args, loop):
self._callback = callback
if self._loop.get_debug():
self._source_traceback = extract_stack(sys._getframe(1))
self._source_traceback = None
info = [self.__class__.__name__]
if self._callback is not None:
info.append(_format_callback_source(self._callback, self._args))
if self._source_traceback:
frame = self._source_traceback[-1]
info.append('created at %s:%s' % (frame[0], frame[1]))
if self._repr is not None:
return '<%s>' % ' '.join(info)
if self._loop.get_debug():
# Keep a representation in debug mode to keep callback and
# parameters. For example, to log the warning
# "Executing <Handle...> took 2.5 second"
self._callback(*self._args)
cb = _format_callback_source(self._callback, self._args)
msg = 'Exception in callback {}'.format(cb)
if self._source_traceback:
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
self = None # Needed to break cycles when an exception occurs.
class TimerHandle(Handle):
"""Object returned by timed callback registration methods."""
__slots__ = ['_scheduled', '_when']
def __init__(self, when, callback, args, loop):
super().__init__(callback, args, loop)
if self._source_traceback:
del self._source_traceback[-1]
info = super()._repr_info()
pos = 2 if self._cancelled else 1
info.insert(pos, 'when=%s' % self._when)
return self._when < other._when
if self._when < other._when:
return self.__eq__(other)
return self._when > other._when
if self._when > other._when:
return self.__eq__(other)
if isinstance(other, TimerHandle):
return (self._when == other._when and
self._callback == other._callback and
self._args == other._args and
self._cancelled == other._cancelled)
equal = self.__eq__(other)
return NotImplemented if equal is NotImplemented else not equal
self._loop._timer_handle_cancelled(self)
"""Abstract server returned by create_server()."""
"""Stop serving. This leaves existing connections open."""
"""Coroutine to wait until service is closed."""
"""Abstract event loop."""
# Running and stopping the event loop.
"""Run the event loop until stop() is called."""
raise NotImplementedError
def run_until_complete(self, future):
"""Run the event loop until a Future is done.
Return the Future's result, or raise its exception.
raise NotImplementedError
"""Stop the event loop as soon as reasonable.
Exactly how soon that is may depend on the implementation, but
no more I/O callbacks should be scheduled.
raise NotImplementedError
"""Return whether the event loop is currently running."""
raise NotImplementedError
"""Returns True if the event loop was closed."""
raise NotImplementedError
The loop should not be running.
This is idempotent and irreversible.
No other methods should be called after this one.
raise NotImplementedError
def shutdown_asyncgens(self):
"""Shutdown all active asynchronous generators."""
# Interface placeholder: this base version takes no action and always
# raises NotImplementedError.
raise NotImplementedError
# Methods scheduling callbacks. All these return Handles.
def _timer_handle_cancelled(self, handle):
"""Notification that a TimerHandle has been cancelled."""
# `handle` is the TimerHandle reporting its own cancellation; elsewhere in
# this file it is invoked as `self._loop._timer_handle_cancelled(self)`.
# This base version only raises NotImplementedError.
raise NotImplementedError
def call_soon(self, callback, *args):
# Convenience wrapper: forwards to call_later() with a delay of 0 and
# returns whatever call_later() returns, passing `callback` and `*args`
# through unchanged. It therefore raises NotImplementedError unless
# call_later() has been overridden.
return self.call_later(0, callback, *args)
def call_later(self, delay, callback, *args):
# Interface placeholder for delayed scheduling of `callback(*args)`;
# always raises NotImplementedError here.
# NOTE(review): `delay` is presumably a number of seconds relative to the
# time of the call -- the unit is not visible in this file; confirm
# against a concrete loop.
raise NotImplementedError
def call_at(self, when, callback, *args):
# Interface placeholder for scheduling `callback(*args)` at the moment
# `when`; always raises NotImplementedError here.
# NOTE(review): `when` is presumably an absolute timestamp on the loop's
# own clock -- not shown in this file; confirm against a concrete loop.
raise NotImplementedError
raise NotImplementedError
raise NotImplementedError
# Method scheduling a coroutine object: create a task.
def create_task(self, coro):
# Interface placeholder: `coro` is the coroutine object to be scheduled
# as a task (see the section comment preceding this method). Always
# raises NotImplementedError here.
raise NotImplementedError
# Methods for interacting with threads.
def call_soon_threadsafe(self, callback, *args):
# Interface placeholder; same parameters as call_soon(). Always raises
# NotImplementedError here.
# NOTE(review): presumably the variant of call_soon() that may be invoked
# from a thread other than the loop's -- inferred from the name and the
# "interacting with threads" section heading; confirm.
raise NotImplementedError
def run_in_executor(self, executor, func, *args):
# Interface placeholder; always raises NotImplementedError here.
# NOTE(review): presumably arranges for `func(*args)` to run in
# `executor` -- inferred from the parameter names only; confirm.
raise NotImplementedError
def set_default_executor(self, executor):
# Interface placeholder; always raises NotImplementedError here.
# NOTE(review): presumably records `executor` as the one used when
# run_in_executor() is not given a specific executor -- confirm.
raise NotImplementedError
# Network I/O methods returning Futures.
def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
# Interface placeholder; always raises NotImplementedError here.
# family/type/proto/flags are keyword-only and default to 0. The
# parameter names match those of socket.getaddrinfo(); presumably they
# carry the same meaning -- confirm against a concrete loop.
# The section comment above states these methods return Futures.
raise NotImplementedError
def getnameinfo(self, sockaddr, flags=0):
# Interface placeholder; always raises NotImplementedError here.
# The parameter names match those of socket.getnameinfo(); presumably
# they carry the same meaning -- confirm against a concrete loop.
# `flags` defaults to 0.
raise NotImplementedError
def create_connection(self, protocol_factory, host=None, port=None, *,
ssl=None, family=0, proto=0, flags=0, sock=None,
local_addr=None, server_hostname=None):
# Interface placeholder; always raises NotImplementedError here.
# Everything after `port` is keyword-only. No semantics for the options
# are defined at this level.
# NOTE(review): `sock` presumably supplies an existing socket as an
# alternative to host/port, as the server-side docstrings in this file
# describe for their own `sock` argument -- confirm.
raise NotImplementedError
def create_server(self, protocol_factory, host=None, port=None, *,
family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
sock=None, backlog=100, ssl=None, reuse_address=None,
"""A coroutine which creates a TCP server bound to host and port.
The return value is a Server object which can be used to stop
If host is an empty string or None all interfaces are assumed
and a list of multiple sockets will be returned (most likely
one for IPv4 and another one for IPv6). The host parameter can also be a
sequence (e.g. list) of hosts to bind to.
family can be set to either AF_INET or AF_INET6 to force the
socket to use IPv4 or IPv6. If not set it will be determined
from host (defaults to AF_UNSPEC).
flags is a bitmask for getaddrinfo().
sock can optionally be specified in order to use a preexisting
backlog is the maximum number of queued connections passed to
listen() (defaults to 100).
ssl can be set to an SSLContext to enable SSL over the
reuse_address tells the kernel to reuse a local socket in
TIME_WAIT state, without waiting for its natural timeout to
expire. If not specified it will automatically be set to True on
reuse_port tells the kernel to allow this endpoint to be bound to
the same port as other existing endpoints are bound to, so long as
they all set this flag when being created. This option is not
raise NotImplementedError
def create_unix_connection(self, protocol_factory, path, *,
raise NotImplementedError
def create_unix_server(self, protocol_factory, path, *,
sock=None, backlog=100, ssl=None):
"""A coroutine which creates a UNIX Domain Socket server.
The return value is a Server object, which can be used to stop
path is a str, representing a file system path to bind the
sock can optionally be specified in order to use a preexisting
backlog is the maximum number of queued connections passed to
listen() (defaults to 100).
ssl can be set to an SSLContext to enable SSL over the
raise NotImplementedError
def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
reuse_address=None, reuse_port=None,
allow_broadcast=None, sock=None):
"""A coroutine which creates a datagram endpoint.
This method will try to establish the endpoint in the background.
When successful, the coroutine returns a (transport, protocol) pair.
protocol_factory must be a callable returning a protocol instance.
socket family AF_INET or socket.AF_INET6 depending on host (or
family if specified), socket type SOCK_DGRAM.
reuse_address tells the kernel to reuse a local socket in
TIME_WAIT state, without waiting for its natural timeout to
expire. If not specified it will automatically be set to True on
reuse_port tells the kernel to allow this endpoint to be bound to
the same port as other existing endpoints are bound to, so long as
they all set this flag when being created. This option is not
supported on Windows and some Unixes. If the
:py:data:`~socket.SO_REUSEPORT` constant is not defined then this
capability is unsupported.
allow_broadcast tells the kernel to allow this endpoint to send
messages to the broadcast address.
sock can optionally be specified in order to use a preexisting
raise NotImplementedError
# Pipes and subprocesses.
def connect_read_pipe(self, protocol_factory, pipe):
"""Register read pipe in event loop. Set the pipe to non-blocking mode.
protocol_factory should instantiate object with Protocol interface.
pipe is a file-like object.
Return pair (transport, protocol), where transport supports the
ReadTransport interface."""
# A file-like object is accepted, rather than a bare file descriptor,
# because the transport needs to own the pipe and close it when the
# transport finishes. Passing f.fileno() instead can lead to confusing
# errors: the pipe transport closes the fd and then f is closed, or
# vice versa.
raise NotImplementedError
def connect_write_pipe(self, protocol_factory, pipe):
"""Register write pipe in event loop.
protocol_factory should instantiate object with BaseProtocol interface.
Pipe is file-like object already switched to nonblocking.
Return pair (transport, protocol), where transport support
WriteTransport interface."""
# A file-like object is accepted, rather than a bare file descriptor,
# because the transport needs to own the pipe and close it when the
# transport finishes. Passing f.fileno() instead can lead to confusing
# errors: the pipe transport closes the fd and then f is closed, or
# vice versa.
raise NotImplementedError
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
raise NotImplementedError
def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
raise NotImplementedError
# Ready-based callback registration methods.
# The add_*() methods return None.
# The remove_*() methods return True if something was removed,
# False if there was nothing to delete.
def add_reader(self, fd, callback, *args):
# Interface placeholder; always raises NotImplementedError here.
# By the convention stated at the top of this section, add_*() methods
# return None.
# NOTE(review): presumably registers `callback(*args)` to run when `fd`
# becomes readable -- inferred from the name; confirm.
raise NotImplementedError
def remove_reader(self, fd):
# Interface placeholder; always raises NotImplementedError here.
# By the convention stated at the top of this section, remove_*()
# methods return True if something was removed and False if there was
# nothing to delete.
raise NotImplementedError
def add_writer(self, fd, callback, *args):
# Interface placeholder; always raises NotImplementedError here.
# By the convention stated at the top of this section, add_*() methods
# return None.
# NOTE(review): presumably registers `callback(*args)` to run when `fd`
# becomes writable -- inferred from the name; confirm.
raise NotImplementedError
def remove_writer(self, fd):
# Interface placeholder; always raises NotImplementedError here.
# By the convention stated at the top of this section, remove_*()
# methods return True if something was removed and False if there was
# nothing to delete.
raise NotImplementedError
# Completion based I/O methods returning Futures.
def sock_recv(self, sock, nbytes):
# Interface placeholder; always raises NotImplementedError here.
# The section comment above states these methods return Futures.
# NOTE(review): presumably receives up to `nbytes` bytes from `sock` --
# inferred from the parameter names; confirm.
raise NotImplementedError
def sock_sendall(self, sock, data):
# Interface placeholder; always raises NotImplementedError here.
# The section comment above states these methods return Futures.
# NOTE(review): presumably sends all of `data` over `sock` -- inferred
# from the name; confirm.
raise NotImplementedError
def sock_connect(self, sock, address):
# Interface placeholder; always raises NotImplementedError here.
# The section comment above states these methods return Futures.
# NOTE(review): presumably connects `sock` to `address` -- inferred from
# the name; the accepted address formats are not shown here.
raise NotImplementedError
def sock_accept(self, sock):
# Interface placeholder; always raises NotImplementedError here.
# The section comment above states these methods return Futures.
# NOTE(review): presumably accepts one connection on the listening
# socket `sock` -- inferred from the name; confirm.
raise NotImplementedError
def add_signal_handler(self, sig, callback, *args):
# Interface placeholder; always raises NotImplementedError here.
# NOTE(review): presumably installs `callback(*args)` as the handler for
# signal number `sig` -- inferred from the name; the return value is not
# specified anywhere in view.
raise NotImplementedError