File: streams.py
"""Stream-related things."""
[0] Fix | Delete
[1] Fix | Delete
__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
[2] Fix | Delete
'open_connection', 'start_server',
[3] Fix | Delete
'IncompleteReadError',
[4] Fix | Delete
'LimitOverrunError',
[5] Fix | Delete
]
[6] Fix | Delete
[7] Fix | Delete
import socket
[8] Fix | Delete
[9] Fix | Delete
if hasattr(socket, 'AF_UNIX'):
[10] Fix | Delete
__all__.extend(['open_unix_connection', 'start_unix_server'])
[11] Fix | Delete
[12] Fix | Delete
from . import coroutines
[13] Fix | Delete
from . import compat
[14] Fix | Delete
from . import events
[15] Fix | Delete
from . import protocols
[16] Fix | Delete
from .coroutines import coroutine
[17] Fix | Delete
from .log import logger
[18] Fix | Delete
[19] Fix | Delete
[20] Fix | Delete
_DEFAULT_LIMIT = 2 ** 16
[21] Fix | Delete
[22] Fix | Delete
[23] Fix | Delete

class IncompleteReadError(EOFError):
    """
    Incomplete read error. Attributes:

    - partial: bytes read before the end of the stream was reached
    - expected: total number of expected bytes (or None if unknown)
    """
    def __init__(self, partial, expected):
        super().__init__("%d bytes read on a total of %r expected bytes"
                         % (len(partial), expected))
        self.partial = partial
        self.expected = expected

    def __reduce__(self):
        return type(self), (self.partial, self.expected)


class LimitOverrunError(Exception):
    """Reached the buffer limit while looking for a separator.

    Attributes:
    - consumed: total number of to-be-consumed bytes.
    """
    def __init__(self, message, consumed):
        super().__init__(message)
        self.consumed = consumed

    def __reduce__(self):
        return type(self), (self.args[0], self.consumed)
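
# Illustrative sketch (not part of the original module): how these exceptions
# typically surface to callers of StreamReader.  readexactly() raises
# IncompleteReadError when EOF arrives early; its .partial attribute holds
# whatever bytes were received.  The 4-byte header length below is an assumed
# value for the example only.
#
#     @coroutine
#     def read_frame_header(reader):
#         try:
#             header = yield from reader.readexactly(4)
#         except IncompleteReadError as exc:
#             # Fewer than 4 bytes arrived before EOF; exc.partial holds them.
#             return exc.partial
#         return header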

@coroutine
def open_connection(host=None, port=None, *,
                    loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    if loop is None:
        loop = events.get_event_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = yield from loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer
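
# Illustrative sketch (not part of the original module): minimal client usage
# of open_connection().  The host, port and payload are assumed example
# values; application code calls this as asyncio.open_connection().
#
#     import asyncio
#
#     @asyncio.coroutine
#     def fetch_line(loop):
#         reader, writer = yield from asyncio.open_connection(
#             '127.0.0.1', 8888, loop=loop)
#         writer.write(b'ping\n')
#         yield from writer.drain()
#         line = yield from reader.readline()
#         writer.close()
#         return line
#
#     loop = asyncio.get_event_loop()
#     print(loop.run_until_complete(fetch_line(loop)))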

@coroutine
def start_server(client_connected_cb, host=None, port=None, *,
                 loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """Start a socket server, call back for each client connected.

    The first parameter, `client_connected_cb`, takes two parameters:
    client_reader, client_writer.  client_reader is a StreamReader
    object, while client_writer is a StreamWriter object.  This
    parameter can either be a plain callback function or a coroutine;
    if it is a coroutine, it will be automatically converted into a
    Task.

    The rest of the arguments are all the usual arguments to
    loop.create_server() except protocol_factory; most common are
    positional host and port, with various optional keyword arguments
    following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    The return value is the same as loop.create_server(), i.e. a
    Server object which can be used to stop the service.
    """
    if loop is None:
        loop = events.get_event_loop()

    def factory():
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, client_connected_cb,
                                        loop=loop)
        return protocol

    return (yield from loop.create_server(factory, host, port, **kwds))
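
# Illustrative sketch (not part of the original module): a minimal echo server
# built on start_server().  The callback may be a coroutine, in which case it
# is wrapped in a Task automatically.  Host and port are assumed example
# values; application code calls this as asyncio.start_server().
#
#     import asyncio
#
#     @asyncio.coroutine
#     def handle_client(reader, writer):
#         data = yield from reader.readline()
#         writer.write(data)              # echo the line back
#         yield from writer.drain()
#         writer.close()
#
#     loop = asyncio.get_event_loop()
#     server = loop.run_until_complete(
#         asyncio.start_server(handle_client, '127.0.0.1', 8888, loop=loop))
#     try:
#         loop.run_forever()
#     finally:
#         server.close()
#         loop.run_until_complete(server.wait_closed())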

if hasattr(socket, 'AF_UNIX'):
    # UNIX Domain Sockets are supported on this platform

    @coroutine
    def open_unix_connection(path=None, *,
                             loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `open_connection` but works with UNIX Domain Sockets."""
        if loop is None:
            loop = events.get_event_loop()
        reader = StreamReader(limit=limit, loop=loop)
        protocol = StreamReaderProtocol(reader, loop=loop)
        transport, _ = yield from loop.create_unix_connection(
            lambda: protocol, path, **kwds)
        writer = StreamWriter(transport, protocol, reader, loop)
        return reader, writer

    @coroutine
    def start_unix_server(client_connected_cb, path=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
        """Similar to `start_server` but works with UNIX Domain Sockets."""
        if loop is None:
            loop = events.get_event_loop()

        def factory():
            reader = StreamReader(limit=limit, loop=loop)
            protocol = StreamReaderProtocol(reader, client_connected_cb,
                                            loop=loop)
            return protocol

        return (yield from loop.create_unix_server(factory, path, **kwds))
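
# Illustrative sketch (not part of the original module): the UNIX-socket
# variants mirror the TCP helpers, taking a filesystem path instead of a
# host/port pair.  The socket path below is an assumed example value.
#
#     reader, writer = yield from asyncio.open_unix_connection('/tmp/app.sock')
#     server = yield from asyncio.start_unix_server(handle_client, '/tmp/app.sock')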

class FlowControlMixin(protocols.Protocol):
    """Reusable flow control logic for StreamWriter.drain().

    This implements the protocol methods pause_writing(),
    resume_writing() and connection_lost().  If the subclass overrides
    these it must call the super methods.

    StreamWriter.drain() must wait for the _drain_helper() coroutine.
    """

    def __init__(self, loop=None):
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._paused = False
        self._drain_waiter = None
        self._connection_lost = False

    def pause_writing(self):
        assert not self._paused
        self._paused = True
        if self._loop.get_debug():
            logger.debug("%r pauses writing", self)

    def resume_writing(self):
        assert self._paused
        self._paused = False
        if self._loop.get_debug():
            logger.debug("%r resumes writing", self)

        waiter = self._drain_waiter
        if waiter is not None:
            self._drain_waiter = None
            if not waiter.done():
                waiter.set_result(None)

    def connection_lost(self, exc):
        self._connection_lost = True
        # Wake up the writer if currently paused.
        if not self._paused:
            return
        waiter = self._drain_waiter
        if waiter is None:
            return
        self._drain_waiter = None
        if waiter.done():
            return
        if exc is None:
            waiter.set_result(None)
        else:
            waiter.set_exception(exc)

    @coroutine
    def _drain_helper(self):
        if self._connection_lost:
            raise ConnectionResetError('Connection lost')
        if not self._paused:
            return
        waiter = self._drain_waiter
        assert waiter is None or waiter.cancelled()
        waiter = self._loop.create_future()
        self._drain_waiter = waiter
        yield from waiter

class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
    """Helper class to adapt between Protocol and StreamReader.

    (This is a helper class instead of making StreamReader itself a
    Protocol subclass, because the StreamReader has other potential
    uses, and to prevent the user of the StreamReader from accidentally
    calling inappropriate methods of the protocol.)
    """

    def __init__(self, stream_reader, client_connected_cb=None, loop=None):
        super().__init__(loop=loop)
        self._stream_reader = stream_reader
        self._stream_writer = None
        self._client_connected_cb = client_connected_cb
        self._over_ssl = False

    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        self._over_ssl = transport.get_extra_info('sslcontext') is not None
        if self._client_connected_cb is not None:
            self._stream_writer = StreamWriter(transport, self,
                                               self._stream_reader,
                                               self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

    def connection_lost(self, exc):
        if self._stream_reader is not None:
            if exc is None:
                self._stream_reader.feed_eof()
            else:
                self._stream_reader.set_exception(exc)
        super().connection_lost(exc)
        self._stream_reader = None
        self._stream_writer = None

    def data_received(self, data):
        self._stream_reader.feed_data(data)

    def eof_received(self):
        self._stream_reader.feed_eof()
        if self._over_ssl:
            # Prevent a warning in SSLProtocol.eof_received:
            # "returning true from eof_received()
            # has no effect when using ssl"
            return False
        return True

class StreamWriter:
    """Wraps a Transport.

    This exposes write(), writelines(), [can_]write_eof(),
    get_extra_info() and close().  It adds drain() which returns an
    optional Future on which you can wait for flow control.  It also
    adds a transport property which references the Transport
    directly.
    """

    def __init__(self, transport, protocol, reader, loop):
        self._transport = transport
        self._protocol = protocol
        # drain() expects that the reader has an exception() method
        assert reader is None or isinstance(reader, StreamReader)
        self._reader = reader
        self._loop = loop

    def __repr__(self):
        info = [self.__class__.__name__, 'transport=%r' % self._transport]
        if self._reader is not None:
            info.append('reader=%r' % self._reader)
        return '<%s>' % ' '.join(info)

    @property
    def transport(self):
        return self._transport

    def write(self, data):
        self._transport.write(data)

    def writelines(self, data):
        self._transport.writelines(data)

    def write_eof(self):
        return self._transport.write_eof()

    def can_write_eof(self):
        return self._transport.can_write_eof()

    def close(self):
        return self._transport.close()

    def get_extra_info(self, name, default=None):
        return self._transport.get_extra_info(name, default)

    @coroutine
    def drain(self):
        """Flush the write buffer.

        The intended use is to write

          w.write(data)
          yield from w.drain()
        """
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport is not None:
            if self._transport.is_closing():
                # Yield to the event loop so connection_lost() may be
                # called.  Without this, _drain_helper() would return
                # immediately, and code that calls
                #     write(...); yield from drain()
                # in a loop would never call connection_lost(), so it
                # would not see an error when the socket is closed.
                yield
        yield from self._protocol._drain_helper()
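
# Illustrative sketch (not part of the original module): the write()/drain()
# pattern above provides back-pressure.  drain() only blocks while the
# transport's write buffer is above its high-water mark, so a producer loop
# naturally slows down to match the peer.  The chunk iterable is an assumed
# placeholder for the example.
#
#     @coroutine
#     def send_all(writer, chunks):
#         for chunk in chunks:
#             writer.write(chunk)
#             yield from writer.drain()   # wait here if the transport is full
#         if writer.can_write_eof():
#             writer.write_eof()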

class StreamReader:

    def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
        # The line length limit is a security feature;
        # it also doubles as half the buffer limit.

        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we're done.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    def __repr__(self):
        info = ['StreamReader']
        if self._buffer:
            info.append('%d bytes' % len(self._buffer))
        if self._eof:
            info.append('eof')
        if self._limit != _DEFAULT_LIMIT:
            info.append('l=%d' % self._limit)
        if self._waiter:
            info.append('w=%r' % self._waiter)
        if self._exception:
            info.append('e=%r' % self._exception)
        if self._transport:
            info.append('t=%r' % self._transport)
        if self._paused:
            info.append('paused')
        return '<%s>' % ' '.join(info)

    def exception(self):
        return self._exception

    def set_exception(self, exc):
        self._exception = exc

        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_exception(exc)

    def _wakeup_waiter(self):
        """Wake up read*() functions waiting for data or EOF."""
        waiter = self._waiter
        if waiter is not None:
            self._waiter = None
            if not waiter.cancelled():
                waiter.set_result(None)

    def set_transport(self, transport):
        assert self._transport is None, 'Transport already set'
        self._transport = transport

    def _maybe_resume_transport(self):
        if self._paused and len(self._buffer) <= self._limit:
            self._paused = False
            self._transport.resume_reading()

    def feed_eof(self):
        self._eof = True
        self._wakeup_waiter()

    def at_eof(self):
        """Return True if the buffer is empty and 'feed_eof' was called."""
        return self._eof and not self._buffer

    def feed_data(self, data):
        assert not self._eof, 'feed_data after feed_eof'

        if not data:
            return

        self._buffer.extend(data)
        self._wakeup_waiter()

        if (self._transport is not None and
                not self._paused and
                len(self._buffer) > 2 * self._limit):
            try:
                self._transport.pause_reading()
            except NotImplementedError:
                # The transport can't be paused.
                # We'll just have to buffer all data.
                # Forget the transport so we don't keep trying.
                self._transport = None
            else:
                self._paused = True

    @coroutine
    def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If the stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine.  Running two read coroutines at the same time
        # would have unexpected behaviour: it would not be possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError('%s() called while another coroutine is '
                               'already waiting for incoming data' % func_name)

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused would cause a deadlock, so prevent it.
        # This is essential for readexactly(n) when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            yield from self._waiter
        finally:
            self._waiter = None

    @coroutine
    def readline(self):
        """Read a chunk of data from the stream until a newline (b'\n') is found.

        On success, return the chunk ending with the newline.  If only a
        partial line can be read before EOF, return the incomplete line
        without the terminating newline.  If EOF is reached before any
        bytes are read, return an empty bytes object.

        If the limit is reached, ValueError is raised.  In that case, if a
        newline was found, the complete line including the newline is removed
        from the internal buffer; otherwise the internal buffer is cleared.
        The limit is compared against the part of the line without the newline.

        If the stream was paused, this function will automatically resume it
        if needed.
        """
        sep = b'\n'
        seplen = len(sep)
        try:
            line = yield from self.readuntil(sep)
        except IncompleteReadError as e:
            return e.partial
        except LimitOverrunError as e:
            if self._buffer.startswith(sep, e.consumed):
                del self._buffer[:e.consumed + seplen]
            else:
                self._buffer.clear()
            self._maybe_resume_transport()
            raise ValueError(e.args[0])
        return line
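
    # Illustrative sketch (not part of the original module): reading a stream
    # line by line with readline() until EOF.  The reader is assumed to come
    # from open_connection() or a start_server() callback.
    #
    #     @coroutine
    #     def read_lines(reader):
    #         lines = []
    #         while True:
    #             line = yield from reader.readline()
    #             if not line:        # b'' means EOF was reached
    #                 break
    #             lines.append(line)
    #         return lines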

    @coroutine