# Wrapper module for _ssl, providing some additional facilities
# implemented in Python. Written by Bill Janssen.
"""This module provides some more Pythonic support for SSL.
SSLSocket -- subtype of socket.socket which does SSL over the socket
SSLError -- exception raised for I/O errors
cert_time_to_seconds -- convert time string used for certificate
notBefore and notAfter functions to integer
seconds past the Epoch (the time values
returned from time.time())
fetch_server_certificate (HOST, PORT) -- fetch the certificate provided
by the server running on HOST at port PORT. No
validation of the certificate is performed.
SSL_ERROR_WANT_X509_LOOKUP
SSL_ERROR_INVALID_ERROR_CODE
The following group defines certificate requirements that one side is
allowing/requiring from the other side:
CERT_NONE - no certificates from the other side are required (or will
be looked at if provided)
CERT_OPTIONAL - certificates are not required, but if provided will be
validated, and if validation fails, the connection will also fail
CERT_REQUIRED - certificates are required, and will be validated, and
if validation fails, the connection will also fail
The following constants identify various SSL protocol variants:
The following constants identify various SSL alert message descriptions as per
http://www.iana.org/assignments/tls-parameters/tls-parameters.xml#tls-parameters-6
ALERT_DESCRIPTION_CLOSE_NOTIFY
ALERT_DESCRIPTION_UNEXPECTED_MESSAGE
ALERT_DESCRIPTION_BAD_RECORD_MAC
ALERT_DESCRIPTION_RECORD_OVERFLOW
ALERT_DESCRIPTION_DECOMPRESSION_FAILURE
ALERT_DESCRIPTION_HANDSHAKE_FAILURE
ALERT_DESCRIPTION_BAD_CERTIFICATE
ALERT_DESCRIPTION_UNSUPPORTED_CERTIFICATE
ALERT_DESCRIPTION_CERTIFICATE_REVOKED
ALERT_DESCRIPTION_CERTIFICATE_EXPIRED
ALERT_DESCRIPTION_CERTIFICATE_UNKNOWN
ALERT_DESCRIPTION_ILLEGAL_PARAMETER
ALERT_DESCRIPTION_UNKNOWN_CA
ALERT_DESCRIPTION_ACCESS_DENIED
ALERT_DESCRIPTION_DECODE_ERROR
ALERT_DESCRIPTION_DECRYPT_ERROR
ALERT_DESCRIPTION_PROTOCOL_VERSION
ALERT_DESCRIPTION_INSUFFICIENT_SECURITY
ALERT_DESCRIPTION_INTERNAL_ERROR
ALERT_DESCRIPTION_USER_CANCELLED
ALERT_DESCRIPTION_NO_RENEGOTIATION
ALERT_DESCRIPTION_UNSUPPORTED_EXTENSION
ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
ALERT_DESCRIPTION_UNRECOGNIZED_NAME
ALERT_DESCRIPTION_BAD_CERTIFICATE_STATUS_RESPONSE
ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE
ALERT_DESCRIPTION_UNKNOWN_PSK_IDENTITY
from collections import namedtuple
from enum import Enum as _Enum, IntEnum as _IntEnum, IntFlag as _IntFlag
import _ssl # if we can't import it, let the error propagate
from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION
from _ssl import _SSLContext, MemoryBIO, SSLSession
SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError,
SSLSyscallError, SSLEOFError, SSLCertVerificationError
from _ssl import txt2obj as _txt2obj, nid2obj as _nid2obj
from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes
from _ssl import RAND_egd
# LibreSSL does not provide RAND_egd
HAS_SNI, HAS_ECDH, HAS_NPN, HAS_ALPN, HAS_SSLv2, HAS_SSLv3, HAS_TLSv1,
HAS_TLSv1_1, HAS_TLSv1_2, HAS_TLSv1_3
from _ssl import _DEFAULT_CIPHERS, _OPENSSL_API_VERSION
lambda name: name.startswith('PROTOCOL_') and name != 'PROTOCOL_SSLv23',
lambda name: name.startswith('OP_'),
'AlertDescription', __name__,
lambda name: name.startswith('ALERT_DESCRIPTION_'),
'SSLErrorNumber', __name__,
lambda name: name.startswith('SSL_ERROR_'),
lambda name: name.startswith('VERIFY_'),
lambda name: name.startswith('CERT_'),
# Bind the name PROTOCOL_SSLv23, both at module level and as an attribute of
# _SSLMethod, to the very same object as _SSLMethod.PROTOCOL_TLS.
PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_TLS
# Reverse lookup table built from _SSLMethod.__members__: member -> member name.
_PROTOCOL_NAMES = {value: name for name, value in _SSLMethod.__members__.items()}
# The PROTOCOL_SSLv2 attribute of _SSLMethod when it exists, otherwise None.
_SSLv2_IF_EXISTS = getattr(_SSLMethod, 'PROTOCOL_SSLv2', None)
class TLSVersion(_IntEnum):
    """Integer enumeration of TLS protocol version identifiers.

    Every member takes its value from the ``PROTO_*`` constant of the same
    name exported by the ``_ssl`` extension module.  Because this is an
    ``IntEnum``, members behave as plain integers (comparison, hashing,
    passing to C-level APIs).
    """
    # Values come straight from _ssl; nothing is computed here.
    MINIMUM_SUPPORTED = _ssl.PROTO_MINIMUM_SUPPORTED
    TLSv1_1 = _ssl.PROTO_TLSv1_1
    TLSv1_2 = _ssl.PROTO_TLSv1_2
    TLSv1_3 = _ssl.PROTO_TLSv1_3
    MAXIMUM_SUPPORTED = _ssl.PROTO_MAXIMUM_SUPPORTED
class _TLSContentType(_IntEnum):
"""Content types (record layer)
See RFC 8446, section B.1
INNER_CONTENT_TYPE = 0x101
class _TLSAlertType(_IntEnum):
"""Alert types for TLSContentType.ALERT messages
See RFC 8446, section B.2
DECOMPRESSION_FAILURE = 30
UNSUPPORTED_CERTIFICATE = 43
INSUFFICIENT_SECURITY = 71
INAPPROPRIATE_FALLBACK = 86
UNSUPPORTED_EXTENSION = 110
CERTIFICATE_UNOBTAINABLE = 111
BAD_CERTIFICATE_STATUS_RESPONSE = 113
BAD_CERTIFICATE_HASH_VALUE = 114
UNKNOWN_PSK_IDENTITY = 115
CERTIFICATE_REQUIRED = 116
NO_APPLICATION_PROTOCOL = 120
class _TLSMessageType(_IntEnum):
"""Message types (handshake protocol)
See RFC 8446, section B.3
CHANGE_CIPHER_SPEC = 0x0101
if sys.platform == "win32":
from _ssl import enum_certificates, enum_crls
from socket import socket, AF_INET, SOCK_STREAM, create_connection
from socket import SOL_SOCKET, SO_TYPE
import base64 # for DER-to-PEM translation
socket_error = OSError # keep that public name in module namespace
# NOTE(review): presumably the channel-binding type names this module
# accepts -- confirm against the code that consumes this list.
CHANNEL_BINDING_TYPES = ['tls-unique']
# True when the _ssl extension module exposes HOSTFLAG_NEVER_CHECK_SUBJECT.
HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT')
# Bound to the same object as _DEFAULT_CIPHERS; no separate value is built.
_RESTRICTED_SERVER_CIPHERS = _DEFAULT_CIPHERS
# CertificateError is a second name for the SSLCertVerificationError class.
CertificateError = SSLCertVerificationError
def _dnsname_match(dn, hostname):
"""Matching according to RFC 6125, section 6.4.3
- Hostnames are compared lower case.
- For IDNA, both dn and hostname must be encoded as IDN A-label (ACE).
- Partial wildcards like 'www*.example.org', multiple wildcards, sole
wildcard or wildcards in labels other than the left-most label are not
supported and a CertificateError is raised.
- A wildcard must match at least one character.
wildcards = dn.count('*')
# speed up common case w/o wildcards
return dn.lower() == hostname.lower()
"too many wildcards in certificate DNS name: {!r}.".format(dn))
dn_leftmost, sep, dn_remainder = dn.partition('.')
# Only match wildcard in leftmost segment.
"wildcard can only be present in the leftmost label: "
"sole wildcard without additional labels are not support: "
# no partial wildcard matching
"partial wildcards in leftmost label are not supported: "
hostname_leftmost, sep, hostname_remainder = hostname.partition('.')
if not hostname_leftmost or not sep:
# wildcard must match at least one char
return dn_remainder.lower() == hostname_remainder.lower()
"""Try to convert an IP address to packed binary form
Supports IPv4 addresses on all platforms and IPv6 on platforms with IPv6
support.
# inet_aton() also accepts strings like '1', '127.1', some also trailing
# data like '127.0.0.1 whatever'.
addr = _socket.inet_aton(ipname)
if _socket.inet_ntoa(addr) == ipname:
# only accept injective ipnames
# refuse for short IPv4 notation and additional trailing data
"{!r} is not a quad-dotted IPv4 address.".format(ipname)
return _socket.inet_pton(_socket.AF_INET6, ipname)
raise ValueError("{!r} is neither an IPv4 nor an IP6 "
"address.".format(ipname))
raise ValueError("{!r} is not an IPv4 address.".format(ipname))
def _ipaddress_match(cert_ipaddress, host_ip):
"""Exact matching of IP addresses.
RFC 6125 explicitly doesn't define an algorithm for this
(section 1.7.2 - "Out of Scope").
# OpenSSL may add a trailing newline to a subjectAltName's IP address,
# commonly with IPv6 addresses. Strip off trailing \n.
ip = _inet_paton(cert_ipaddress.rstrip())
def match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125
The function matches IP addresses rather than dNSNames if hostname is a
valid ipaddress string. IPv4 addresses are supported on all platforms.
IPv6 addresses are supported on platforms with IPv6 support (AF_INET6
and inet_pton).
CertificateError is raised on failure. On success, the function
raise ValueError("empty or no certificate, match_hostname needs a "
"SSL socket or SSL context with either "
"CERT_OPTIONAL or CERT_REQUIRED")
host_ip = _inet_paton(hostname)
# Not an IP address (common case)
san = cert.get('subjectAltName', ())
if host_ip is None and _dnsname_match(value, hostname):
elif key == 'IP Address':
if host_ip is not None and _ipaddress_match(value, host_ip):
# The subject is only checked when there is no dNSName entry
for sub in cert.get('subject', ()):
# XXX according to RFC 2818, the most specific Common Name
if _dnsname_match(value, hostname):
raise CertificateError("hostname %r "
"doesn't match either of %s"
% (hostname, ', '.join(map(repr, dnsnames))))
raise CertificateError("hostname %r "
% (hostname, dnsnames[0]))
raise CertificateError("no appropriate commonName or "
"subjectAltName fields were found")
DefaultVerifyPaths = namedtuple("DefaultVerifyPaths",
"cafile capath openssl_cafile_env openssl_cafile openssl_capath_env "
def get_default_verify_paths():
"""Return paths to default cafile and capath.
parts = _ssl.get_default_verify_paths()
# environment vars shadow paths
cafile = os.environ.get(parts[0], parts[1])
capath = os.environ.get(parts[2], parts[3])
return DefaultVerifyPaths(cafile if os.path.isfile(cafile) else None,
capath if os.path.isdir(capath) else None,
class _ASN1Object(namedtuple("_ASN1Object", "nid shortname longname oid")):
"""ASN.1 object identifier lookup
return super().__new__(cls, *_txt2obj(oid, name=False))
"""Create _ASN1Object from OpenSSL numeric ID
return super().__new__(cls, *_nid2obj(nid))
"""Create _ASN1Object from short name, long name or OID
return super().__new__(cls, *_txt2obj(name, name=True))
class Purpose(_ASN1Object, _Enum):
"""SSLContext purpose flags with X509v3 Extended Key Usage objects
SERVER_AUTH = '1.3.6.1.5.5.7.3.1'
CLIENT_AUTH = '1.3.6.1.5.5.7.3.2'
class SSLContext(_SSLContext):
"""An SSLContext holds various SSL-related configuration options and
data, such as certificates and possibly a private key."""
_windows_cert_stores = ("CA", "ROOT")
sslsocket_class = None # SSLSocket is assigned later.
sslobject_class = None # SSLObject is assigned later.
def __new__(cls, protocol=PROTOCOL_TLS, *args, **kwargs):
self = _SSLContext.__new__(cls, protocol)
def _encode_hostname(self, hostname):
elif isinstance(hostname, str):
return hostname.encode('idna').decode('ascii')
return hostname.decode('ascii')
def wrap_socket(self, sock, server_side=False,
do_handshake_on_connect=True,
suppress_ragged_eofs=True,
server_hostname=None, session=None):
# SSLSocket class handles server_hostname encoding before it calls
return self.sslsocket_class._create(