# connection, and the user is reading more bytes than will be provided
# (for example, reading in 1k chunks)
# Ideally, we would raise IncompleteRead if the content-length
# wasn't satisfied, but it might break compatibility.
elif self.length is not None:
def _read_next_chunk_size(self):
    """Read one chunk-size line from ``self.fp`` and return it as an int.

    Any chunk extension (everything from the first ``;``) is ignored and the
    remaining text is parsed as a hexadecimal number.

    Raises:
        LineTooLong: the line is longer than ``_MAXLINE`` bytes.
        ValueError: the size is not valid hexadecimal; the connection is
            closed before the error is re-raised.
    """
    # Read the next chunk size from the file
    line = self.fp.readline(_MAXLINE + 1)
    if len(line) > _MAXLINE:
        raise LineTooLong("chunk size")
    i = line.find(b";")
    if i >= 0:
        line = line[:i]  # strip chunk-extensions
    try:
        return int(line, 16)
    except ValueError:
        # close the connection as protocol synchronisation is
        # probably lost
        self._close_conn()
        raise
def _read_and_discard_trailer(self):
    """Consume trailer lines after the last chunk, up to the blank line.

    Raises:
        LineTooLong: a trailer line is longer than ``_MAXLINE`` bytes.
    """
    # read and discard trailer up to the CRLF terminator
    ### note: we shouldn't have any trailers!
    while True:
        line = self.fp.readline(_MAXLINE + 1)
        if len(line) > _MAXLINE:
            raise LineTooLong("trailer line")
        # b'' is EOF: a vanishingly small number of sites EOF without
        # sending the trailer, so treat it like the terminator.
        if line in (b'\r\n', b'\n', b''):
            break
def _get_chunk_left(self):
# return self.chunk_left, reading a new chunk if necessary.
# chunk_left == 0: at the end of the current chunk, need to close it
# chunk_left == None: No current chunk, should read next.
# This function returns non-zero or None if the last chunk has
chunk_left = self.chunk_left
if not chunk_left: # Can be 0 or None
if chunk_left is not None:
# We are at the end of chunk, discard chunk end
self._safe_read(2) # toss the CRLF at the end of the chunk
chunk_left = self._read_next_chunk_size()
raise IncompleteRead(b'')
# last chunk: 1*("0") [ chunk-extension ] CRLF
self._read_and_discard_trailer()
# we read everything; close the "file"
self.chunk_left = chunk_left
def _readall_chunked(self):
    """Read every remaining chunk and return the joined payload as bytes.

    Raises:
        IncompleteRead: the stream ended early; the exception carries the
            bytes gathered so far.
    """
    assert self.chunked != _UNKNOWN
    value = []
    try:
        while True:
            chunk_left = self._get_chunk_left()
            if chunk_left is None:
                break
            value.append(self._safe_read(chunk_left))
            # Mark the chunk as consumed so the next call discards its CRLF.
            self.chunk_left = 0
        return b''.join(value)
    except IncompleteRead:
        raise IncompleteRead(b''.join(value))
def _readinto_chunked(self, b):
    """Fill buffer *b* from the chunked body and return the byte count.

    Stops when *b* is full or the last chunk has been consumed.

    Raises:
        IncompleteRead: the stream ended early; the exception carries the
            bytes already stored in *b*.
    """
    assert self.chunked != _UNKNOWN
    total_bytes = 0
    mvb = memoryview(b)  # zero-copy window over the unfilled part of b
    try:
        while True:
            chunk_left = self._get_chunk_left()
            if chunk_left is None:
                return total_bytes

            if len(mvb) <= chunk_left:
                # The rest of the buffer fits inside the current chunk.
                n = self._safe_readinto(mvb)
                self.chunk_left = chunk_left - n
                return total_bytes + n

            # Drain the current chunk, then continue with the next one.
            temp_mvb = mvb[:chunk_left]
            n = self._safe_readinto(temp_mvb)
            mvb = mvb[n:]
            total_bytes += n
            self.chunk_left = 0

    except IncompleteRead:
        raise IncompleteRead(bytes(b[0:total_bytes]))
def _safe_read(self, amt):
"""Read the number of bytes requested.
This function should be used when <amt> bytes "should" be present for
reading. If the bytes are truly not available (due to EOF), then the
IncompleteRead exception can be used to detect the problem.
raise IncompleteRead(data, amt-len(data))
def _safe_readinto(self, b):
"""Same as _safe_read, but for reading into a buffer."""
raise IncompleteRead(bytes(b[:n]), amt-n)
"""Read with at most one underlying system call. If at least one
byte is buffered, return that instead.
if self.fp is None or self._method == "HEAD":
return self._read1_chunked(n)
if self.length is not None and (n < 0 or n > self.length):
result = self.fp.read1(n)
elif self.length is not None:
self.length -= len(result)
# Having this enables IOBase.readline() to read more than one
if self.fp is None or self._method == "HEAD":
return self._peek_chunked(n)
def readline(self, limit=-1):
    """Read and return one line of the response body.

    *limit* caps the number of bytes returned; a negative value means no
    cap. When a body length is known, the read never goes past it.
    Returns b"" when there is no body stream or the request was a HEAD.
    """
    if self.fp is None or self._method == "HEAD":
        return b""
    if self.chunked:
        # Fallback to IOBase readline which uses peek() and read()
        return super().readline(limit)
    if self.length is not None and (limit < 0 or limit > self.length):
        limit = self.length
    result = self.fp.readline(limit)
    if not result and limit:
        # Nothing came back although bytes were requested: the stream is done.
        self._close_conn()
    elif self.length is not None:
        self.length -= len(result)
    return result
def _read1_chunked(self, n):
# Strictly speaking, _get_chunk_left() may cause more than one read,
# but that is ok, since that is to satisfy the chunked protocol.
chunk_left = self._get_chunk_left()
if chunk_left is None or n == 0:
if not (0 <= n <= chunk_left):
n = chunk_left # if n is negative or larger than chunk_left
self.chunk_left -= len(read)
raise IncompleteRead(b"")
def _peek_chunked(self, n):
# Strictly speaking, _get_chunk_left() may cause more than one read,
# but that is ok, since that is to satisfy the chunked protocol.
chunk_left = self._get_chunk_left()
return b'' # peek doesn't worry about protocol
# peek is allowed to return more than requested. Just request the
# entire chunk, and truncate what we get.
return self.fp.peek(chunk_left)[:chunk_left]
def getheader(self, name, default=None):
    '''Returns the value of the header matching *name*.

    If there are multiple matching headers, the values are
    combined into a single string separated by commas and spaces.

    If no matching header is found, returns *default* or None if
    the *default* is not specified.

    If the headers are unknown, raises http.client.ResponseNotReady.

    '''
    if self.headers is None:
        raise ResponseNotReady()
    headers = self.headers.get_all(name) or default
    # A plain string or a non-iterable default is returned untouched; only
    # real collections of values are joined.
    if isinstance(headers, str) or not hasattr(headers, '__iter__'):
        return headers
    return ', '.join(headers)
"""Return list of (header, value) tuples."""
return list(self.headers.items())
# We override IOBase.__iter__ so that it doesn't check for closed-ness
# For compatibility with old-style urllib responses.
'''Returns an instance of the class mimetools.Message containing
meta-information associated with the URL.
When the method is HTTP, these headers are those returned by
the server at the head of the retrieved HTML page (including
Content-Length and Content-Type).
When the method is FTP, a Content-Length header will be
present if (as is now usual) the server passed back a file
length in response to the FTP retrieval request. A
Content-Type header will be present if the MIME type can be
When the method is local-file, returned headers will include
a Date representing the file's last-modified time, a
Content-Length giving file size, and a Content-Type
containing a guess at the file's type. See also the
description of the mimetools module.
'''Return the real URL of the page.
In some cases, the HTTP server redirects a client to another
URL. The urlopen() function handles this transparently, but in
some cases the caller needs to know which URL the client was
redirected to. The geturl() method can be used to get at this
'''Return the HTTP status code that was sent with the response,
or None if the URL is not an HTTP URL.
_http_vsn_str = 'HTTP/1.1'
response_class = HTTPResponse
"""Test whether a file-like object is a text or a binary stream.
return isinstance(stream, io.TextIOBase)
def _get_content_length(body, method):
"""Get the content-length based on the body.
If the body is None, we set Content-Length: 0 for methods that expect
a body (RFC 7230, Section 3.3.2). We also set the Content-Length for
any method if the body is a str or bytes-like object and not a file.
# do an explicit check for not None here to distinguish
# between unset and set but empty
if method.upper() in _METHODS_EXPECTING_BODY:
if hasattr(body, 'read'):
# does it implement the buffer protocol (bytes, bytearray, array)?
if isinstance(body, str):
def __init__(self, host, port=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
source_address=None, blocksize=8192):
# NOTE(review): only part of the constructor is visible here (indentation
# and, apparently, several statements are missing from this view); the
# comments below cover just the statements shown.
# Stored for later use when a connection is created.
self.source_address = source_address
# Stored for later use as a read size when sending readable bodies.
self.blocksize = blocksize
# Extra headers for a CONNECT tunnel; empty until set_tunnel() supplies some.
self._tunnel_headers = {}
# Split "host[:port]" into its parts via the helper.
(self.host, self.port) = self._get_hostport(host, port)
# presumably rejects malformed host names - TODO confirm against _validate_host
self._validate_host(self.host)
# This is stored as an instance variable to allow unit
# tests to replace it with a suitable mockup
self._create_connection = socket.create_connection
def set_tunnel(self, host, port=None, headers=None):
    """Set up host and port for HTTP CONNECT tunnelling.

    In a connection that uses HTTP CONNECT tunneling, the host passed to the
    constructor is used as a proxy server that relays all communication to
    the endpoint passed to `set_tunnel`. This is done by sending an HTTP
    CONNECT request to the proxy server when the connection is established.

    This method must be called before the HTTP connection has been
    established.

    The headers argument should be a mapping of extra HTTP headers to send
    with the CONNECT request.

    Raises:
        RuntimeError: the connection already has a socket.
    """
    if self.sock:
        raise RuntimeError("Can't set up tunnel for established connection")

    self._tunnel_host, self._tunnel_port = self._get_hostport(host, port)
    if headers:
        # Keep a private copy: storing the caller's mapping would let a
        # later clear() below wipe the caller's own dict.
        self._tunnel_headers = dict(headers)
    else:
        self._tunnel_headers.clear()
def _get_hostport(self, host, port):
j = host.rfind(']') # ipv6 addresses have [...]
if host[i+1:] == "": # http://foo.com:/ == http://foo.com/
raise InvalidURL("nonnumeric port: '%s'" % host[i+1:])
if host and host[0] == '[' and host[-1] == ']':
def set_debuglevel(self, level):
    """Store *level* as ``self.debuglevel``.

    Other code in this file prints diagnostics when ``self.debuglevel > 0``.
    """
    self.debuglevel = level
connect_str = "CONNECT %s:%d HTTP/1.0\r\n" % (self._tunnel_host,
connect_bytes = connect_str.encode("ascii")
for header, value in self._tunnel_headers.items():
header_str = "%s: %s\r\n" % (header, value)
header_bytes = header_str.encode("latin-1")
response = self.response_class(self.sock, method=self._method)
(version, code, message) = response._read_status()
if code != http.HTTPStatus.OK:
raise OSError("Tunnel connection failed: %d %s" % (code,
line = response.fp.readline(_MAXLINE + 1)
raise LineTooLong("header line")
# for sites which EOF without sending a trailer
if line in (b'\r\n', b'\n', b''):
print('header:', line.decode())
"""Connect to the host and port specified in __init__."""
self.sock = self._create_connection(
(self.host,self.port), self.timeout, self.source_address)
self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
"""Close the connection to the HTTP server."""
sock.close() # close it manually... there may be other refs
response = self.__response
"""Send `data' to the server.
``data`` can be a string object, a bytes object, an array object, a
file-like object that supports a .read() method, or an iterable object.
print("send:", repr(data))
if hasattr(data, "read") :
print("sendIng a read()able")
encode = self._is_textIO(data)
if encode and self.debuglevel > 0:
print("encoding file using iso-8859-1")
datablock = data.read(self.blocksize)
datablock = datablock.encode("iso-8859-1")
self.sock.sendall(datablock)
if isinstance(data, collections.abc.Iterable):
raise TypeError("data should be a bytes-like object "
"or an iterable, got %r" % type(data))
"""Add a line of output to the current request buffer.
Assumes that the line does *not* end with \\r\\n.
def _read_readable(self, readable):
print("sendIng a read()able")
encode = self._is_textIO(readable)
if encode and self.debuglevel > 0:
print("encoding file using iso-8859-1")
datablock = readable.read(self.blocksize)
datablock = datablock.encode("iso-8859-1")