"""Stream-related things."""
# Names exported by ``from <module> import *``.  The two exception classes
# defined in this module are part of the public reading API, so they are
# listed as well.
__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
           'open_connection', 'start_server',
           'IncompleteReadError', 'LimitOverrunError',
           ]

import socket

# The UNIX-domain helpers are only defined when the platform's socket module
# provides AF_UNIX, so they are only advertised in that case.
if hasattr(socket, 'AF_UNIX'):
    __all__.extend(['open_unix_connection', 'start_unix_server'])
from .coroutines import coroutine
class IncompleteReadError(EOFError):
    """Incomplete read error.

    Attributes:

    - partial: read bytes string before the end of stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """

    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        # The constructor takes (partial, expected), not the formatted
        # message stored in ``args``; rebuilding from ``args`` would call
        # __init__ with the wrong arguments.
        return type(self), (self.partial, self.expected)
class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:

    - consumed: total number of to be consumed bytes.
    """

    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        # ``args`` only holds the message, so ``consumed`` has to be passed
        # explicitly for the exception to be rebuilt (e.g. when unpickled).
        return type(self), (self.args[0], self.consumed)
@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    # Only fall back to the current event loop when the caller did not
    # supply one; an explicit loop must be honoured.
    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    # The factory hands back the protocol built above so the reader and the
    # connection share it.
    transport, _ = yield from loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    # Only fall back to the current event loop when none was supplied.
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        # Every accepted connection gets its own reader/protocol pair.
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return (yield from loop.create_server(factory, host, port, **kwds))
if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        # Respect an explicitly supplied loop.
        if loop is None:
            loop = events.get_event_loop()
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = yield from loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        # Respect an explicitly supplied loop.
        if loop is None:
            loop = events.get_event_loop()

        def factory():
            # Every accepted connection gets its own reader/protocol pair.
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return (yield from loop.create_unix_server(factory, path, **kwds))
class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        # Use the caller's loop when given; otherwise the current one.
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        # Future a drain() caller is blocked on while writing is paused.
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        # Release the drain() caller, if there is one still waiting.
        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        # A clean close (exc is None) simply releases the waiter; an error
        # is propagated to it.
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        """Block until writing is resumed or the connection is lost.

        Returns immediately when writing is not paused.  Raises
        ConnectionResetError if the connection has already been lost.
        """
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        # Only one drain() may wait at a time (a cancelled one is stale).
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        yield from waiter
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader to accidentally
    call inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        # Set for real in connection_made(); read by eof_received().
        self._over_ssl = False

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            # Server side: build the writer and hand the pair to the
            # user's callback.
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            # A coroutine callback is scheduled as a task on the loop.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        if self._stream_reader is not None:
            # Clean close -> EOF; error -> propagate it to the reader.
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        super().connection_lost(exc)
        # Drop the references held by this protocol.
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True
class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, 'transport=%r' % self._transport]
        if self._reader is not None:
            info.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(info)

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()
        """
        if self._reader is not None:
            # Surface an error already recorded on the read side.
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport is not None:
            if self._transport.is_closing():
                # Yield to the event loop so connection_lost() may be
                # called.  Without this, _drain_helper() would return
                # immediately, and code that calls
                #     write(...); yield from drain()
                # in a loop would never call connection_lost(), so it
                # would not see an error when the socket is closed.
                yield
        yield from self._protocol._drain_helper()
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
    """Create an empty reader.

    limit is the line length limit (must be > 0, otherwise ValueError is
    raised); loop is the event loop to use, defaulting to the current one.
    """
    # The line length limit is a security feature;
    # it also doubles as half the buffer limit.

    if limit <= 0:
        raise ValueError('Limit cannot be <= 0')

    self._limit = limit
    if loop is None:
        self._loop = events.get_event_loop()
    else:
        self._loop = loop
    self._buffer = bytearray()
    self._eof = False    # Whether we're done.
    self._waiter = None  # A future used by _wait_for_data()
    self._exception = None
    self._transport = None
    self._paused = False
def __repr__(self):
    """Return a debug summary listing only the non-default state."""
    info = ['StreamReader']
    if self._buffer:
        info.append('%d bytes' % len(self._buffer))
    if self._eof:
        info.append('eof')
    if self._limit != _DEFAULT_LIMIT:
        info.append('l=%d' % self._limit)
    if self._waiter:
        info.append('w=%r' % self._waiter)
    if self._exception:
        info.append('e=%r' % self._exception)
    if self._transport:
        info.append('t=%r' % self._transport)
    if self._paused:
        info.append('paused')
    return '<%s>' % ' '.join(info)
def set_exception(self, exc):
    """Record exc on the reader and fail the pending waiter, if any."""
    self._exception = exc

    waiter = self._waiter
    if waiter is not None:
        # Detach first so the waiter is only ever completed once.
        self._waiter = None
        if not waiter.cancelled():
            waiter.set_exception(exc)
def _wakeup_waiter(self):
"""Wakeup read*() functions waiting for data or EOF."""
if not waiter.cancelled():
def set_transport(self, transport):
    """Bind transport to this reader; allowed only while none is bound."""
    current = self._transport
    assert current is None, 'Transport already set'
    self._transport = transport
def _maybe_resume_transport(self):
if self._paused and len(self._buffer) <= self._limit:
self._transport.resume_reading()
def at_eof(self):
    """Return True if the buffer is empty and 'feed_eof' was called."""
    return self._eof and not self._buffer
def feed_data(self, data):
    """Append data to the buffer and wake any reader waiting for it.

    Empty data is ignored.  Once the buffer holds more than twice the
    limit, the transport (if any) is asked to pause reading.
    """
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
        return

    self._buffer.extend(data)
    self._wakeup_waiter()

    if (self._transport is not None and
            not self._paused and
            len(self._buffer) > 2 * self._limit):
        try:
            self._transport.pause_reading()
        except NotImplementedError:
            # The transport can't be paused.
            # We'll just have to buffer all data.
            # Forget the transport so we don't keep trying.
            self._transport = None
        else:
            self._paused = True
@coroutine
def _wait_for_data(self, func_name):
    """Wait until feed_data() or feed_eof() is called.

    If stream was paused, automatically resume it.

    func_name is the name of the calling read method; it is only used in
    the RuntimeError raised when another coroutine is already waiting.
    """
    # StreamReader uses a future to link the protocol feed_data() method
    # to a read coroutine. Running two read coroutines at the same time
    # would have an unexpected behaviour. It would not be possible to know
    # which coroutine would get the next data.
    if self._waiter is not None:
        raise RuntimeError('%s() called while another coroutine is '
                           'already waiting for incoming data' % func_name)

    assert not self._eof, '_wait_for_data after EOF'

    # Waiting for data while paused will make deadlock, so prevent it.
    # This is essential for readexactly(n) for case when n > self._limit.
    if self._paused:
        self._paused = False
        self._transport.resume_reading()

    self._waiter = self._loop.create_future()
    try:
        yield from self._waiter
    finally:
        # Always clear the slot (also on cancellation or error) so the next
        # read is not rejected as a concurrent waiter.
        self._waiter = None
"""Read chunk of data from the stream until newline (b'\n') is found.
On success, return chunk that ends with newline. If only partial
line can be read due to EOF, return incomplete line without
terminating newline. When EOF was reached while no bytes read, empty
bytes object is returned.
If limit is reached, ValueError will be raised. In that case, if
newline was found, complete line including newline will be removed
from internal buffer. Else, internal buffer will be cleared. Limit is
compared against part of the line without newline.
If stream was paused, this function will automatically resume it if
line = yield from self.readuntil(sep)
except IncompleteReadError as e:
except LimitOverrunError as e:
if self._buffer.startswith(sep, e.consumed):
del self._buffer[:e.consumed + seplen]
self._maybe_resume_transport()
raise ValueError(e.args[0])