# Wrapper module for _ssl, providing some additional facilities
# implemented in Python. Written by Bill Janssen.
"""This module provides some more Pythonic support for SSL.
SSLSocket -- subtype of socket.socket which does SSL over the socket
SSLError -- exception raised for I/O errors
cert_time_to_seconds -- convert time string used for certificate
notBefore and notAfter functions to integer
seconds past the Epoch (the time values
returned from time.time())
fetch_server_certificate (HOST, PORT) -- fetch the certificate provided
by the server running on HOST at port PORT. No
validation of the certificate is performed.
SSL_ERROR_WANT_X509_LOOKUP
SSL_ERROR_INVALID_ERROR_CODE
The following group defines the certificate requirements that one side
allows or requires from the other side:
CERT_NONE - no certificates from the other side are required (or will
be looked at if provided)
CERT_OPTIONAL - certificates are not required, but if provided will be
validated, and if validation fails, the connection will also fail
CERT_REQUIRED - certificates are required, and will be validated, and
if validation fails, the connection will also fail
The following constants identify various SSL protocol variants:
The following constants identify various SSL alert message descriptions as per
http://www.iana.org/assignments/tls-parameters/tls-parameters.xml#tls-parameters-6
ALERT_DESCRIPTION_CLOSE_NOTIFY
ALERT_DESCRIPTION_UNEXPECTED_MESSAGE
ALERT_DESCRIPTION_BAD_RECORD_MAC
ALERT_DESCRIPTION_RECORD_OVERFLOW
ALERT_DESCRIPTION_DECOMPRESSION_FAILURE
ALERT_DESCRIPTION_HANDSHAKE_FAILURE
ALERT_DESCRIPTION_BAD_CERTIFICATE
ALERT_DESCRIPTION_UNSUPPORTED_CERTIFICATE
ALERT_DESCRIPTION_CERTIFICATE_REVOKED
ALERT_DESCRIPTION_CERTIFICATE_EXPIRED
ALERT_DESCRIPTION_CERTIFICATE_UNKNOWN
ALERT_DESCRIPTION_ILLEGAL_PARAMETER
ALERT_DESCRIPTION_UNKNOWN_CA
ALERT_DESCRIPTION_ACCESS_DENIED
ALERT_DESCRIPTION_DECODE_ERROR
ALERT_DESCRIPTION_DECRYPT_ERROR
ALERT_DESCRIPTION_PROTOCOL_VERSION
ALERT_DESCRIPTION_INSUFFICIENT_SECURITY
ALERT_DESCRIPTION_INTERNAL_ERROR
ALERT_DESCRIPTION_USER_CANCELLED
ALERT_DESCRIPTION_NO_RENEGOTIATION
ALERT_DESCRIPTION_UNSUPPORTED_EXTENSION
ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE
ALERT_DESCRIPTION_UNRECOGNIZED_NAME
ALERT_DESCRIPTION_BAD_CERTIFICATE_STATUS_RESPONSE
ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE
ALERT_DESCRIPTION_UNKNOWN_PSK_IDENTITY
from collections import namedtuple
from contextlib import closing
import _ssl # if we can't import it, let the error propagate
from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION
from _ssl import _SSLContext
SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError,
SSLSyscallError, SSLEOFError,
from _ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
from _ssl import txt2obj as _txt2obj, nid2obj as _nid2obj
from _ssl import RAND_status, RAND_add
from _ssl import RAND_egd
# LibreSSL does not provide RAND_egd
# Re-export helper: binds an `_ssl` attribute into this module's globals
# under the same name. It is invoked below once per constant-name prefix.
def _import_symbols(prefix):
globals()[n] = getattr(_ssl, n)
_import_symbols('ALERT_DESCRIPTION_')
_import_symbols('SSL_ERROR_')
_import_symbols('PROTOCOL_')
_import_symbols('VERIFY_')
from _ssl import HAS_SNI, HAS_ECDH, HAS_NPN, HAS_ALPN
from _ssl import _OPENSSL_API_VERSION
# Reverse lookup table: PROTOCOL_* constant value -> constant name, built
# from whichever PROTOCOL_* names are present in this module's globals at
# this point.
_PROTOCOL_NAMES = {value: name for name, value in globals().items() if name.startswith('PROTOCOL_')}
# Protocol value that SSLContext.__new__ compares against before applying
# the default cipher list.
_SSLv2_IF_EXISTS = PROTOCOL_SSLv2
from socket import socket, _fileobject, _delegate_methods, error as socket_error
if sys.platform == "win32":
from _ssl import enum_certificates, enum_crls
from socket import socket, AF_INET, SOCK_STREAM, create_connection
from socket import SOL_SOCKET, SO_TYPE
import base64 # for DER-to-PEM translation
CHANNEL_BINDING_TYPES = ['tls-unique']
CHANNEL_BINDING_TYPES = []
# Disable weak or insecure ciphers by default
# (OpenSSL's default setting is 'DEFAULT:!aNULL:!eNULL')
# Enable a better set of ciphers by default
# This list has been explicitly chosen to:
# * Prefer cipher suites that offer perfect forward secrecy (DHE/ECDHE)
# * Prefer ECDHE over DHE for better performance
# * Prefer any AES-GCM over any AES-CBC for better performance and security
# * Then Use HIGH cipher suites as a fallback
# * Then Use 3DES as fallback which is secure but slow
# * Disable NULL authentication, NULL encryption, and MD5 MACs for security
'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES:!aNULL:'
# Restricted and more secure ciphers for the server side
# This list has been explicitly chosen to:
# * Prefer cipher suites that offer perfect forward secrecy (DHE/ECDHE)
# * Prefer ECDHE over DHE for better performance
# * Prefer any AES-GCM over any AES-CBC for better performance and security
# * Then Use HIGH cipher suites as a fallback
# * Then Use 3DES as fallback which is secure but slow
# * Disable NULL authentication, NULL encryption, MD5 MACs, DSS, and RC4 for
#   security
_RESTRICTED_SERVER_CIPHERS = (
'ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+HIGH:'
'DH+HIGH:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+HIGH:RSA+3DES:!aNULL:'
class CertificateError(ValueError):
def _dnsname_match(dn, hostname, max_wildcards=1):
"""Matching according to RFC 6125, section 6.4.3
http://tools.ietf.org/html/rfc6125#section-6.4.3
wildcards = leftmost.count('*')
if wildcards > max_wildcards:
# Issue #17980: avoid denials of service by refusing more
# than one wildcard per fragment. A survey of established
# policy among SSL implementations showed it to be a
"too many wildcards in certificate DNS name: " + repr(dn))
# speed up common case w/o wildcards
return dn.lower() == hostname.lower()
# RFC 6125, section 6.4.3, subitem 1.
# The client SHOULD NOT attempt to match a presented identifier in which
# the wildcard character comprises a label other than the left-most label.
# When '*' is a fragment by itself, it matches a non-empty dotless
elif leftmost.startswith('xn--') or hostname.startswith('xn--'):
# RFC 6125, section 6.4.3, subitem 3.
# The client SHOULD NOT attempt to match a presented identifier
# where the wildcard character is embedded within an A-label or
# U-label of an internationalized domain name.
pats.append(re.escape(leftmost))
# Otherwise, '*' matches any dotless string, e.g. www*
pats.append(re.escape(leftmost).replace(r'\*', '[^.]*'))
# add the remaining fragments, ignore any wildcards
pats.append(re.escape(frag))
pat = re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
return pat.match(hostname)
def match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125
rules are followed, but IP addresses are not accepted for *hostname*.
CertificateError is raised on failure. On success, the function
raise ValueError("empty or no certificate, match_hostname needs a "
"SSL socket or SSL context with either "
"CERT_OPTIONAL or CERT_REQUIRED")
san = cert.get('subjectAltName', ())
if _dnsname_match(value, hostname):
# The subject is only checked when there is no dNSName entry
for sub in cert.get('subject', ()):
# XXX according to RFC 2818, the most specific Common Name
if _dnsname_match(value, hostname):
raise CertificateError("hostname %r "
"doesn't match either of %s"
% (hostname, ', '.join(map(repr, dnsnames))))
raise CertificateError("hostname %r "
% (hostname, dnsnames[0]))
raise CertificateError("no appropriate commonName or "
"subjectAltName fields were found")
DefaultVerifyPaths = namedtuple("DefaultVerifyPaths",
"cafile capath openssl_cafile_env openssl_cafile openssl_capath_env "
def get_default_verify_paths():
"""Return paths to default cafile and capath.
parts = _ssl.get_default_verify_paths()
# environment vars shadow paths
cafile = os.environ.get(parts[0], parts[1])
capath = os.environ.get(parts[2], parts[3])
return DefaultVerifyPaths(cafile if os.path.isfile(cafile) else None,
capath if os.path.isdir(capath) else None,
class _ASN1Object(namedtuple("_ASN1Object", "nid shortname longname oid")):
"""ASN.1 object identifier lookup
return super(_ASN1Object, cls).__new__(cls, *_txt2obj(oid, name=False))
"""Create _ASN1Object from OpenSSL numeric ID
return super(_ASN1Object, cls).__new__(cls, *_nid2obj(nid))
"""Create _ASN1Object from short name, long name or OID
return super(_ASN1Object, cls).__new__(cls, *_txt2obj(name, name=True))
class Purpose(_ASN1Object):
"""SSLContext purpose flags with X509v3 Extended Key Usage objects
Purpose.SERVER_AUTH = Purpose('1.3.6.1.5.5.7.3.1')
Purpose.CLIENT_AUTH = Purpose('1.3.6.1.5.5.7.3.2')
class SSLContext(_SSLContext):
"""An SSLContext holds various SSL-related configuration options and
data, such as certificates and possibly a private key."""
# Only 'protocol' and weak-reference support are declared as slots.
__slots__ = ('protocol', '__weakref__')
# Store names that load_default_certs() hands to
# _load_windows_store_certs() when running on win32.
_windows_cert_stores = ("CA", "ROOT")
# Creates the instance through _SSLContext.__new__(cls, protocol); the
# extra *args/**kwargs are accepted but not forwarded to that call.
# _DEFAULT_CIPHERS is applied unless the protocol equals _SSLv2_IF_EXISTS.
def __new__(cls, protocol, *args, **kwargs):
self = _SSLContext.__new__(cls, protocol)
if protocol != _SSLv2_IF_EXISTS:
self.set_ciphers(_DEFAULT_CIPHERS)
def __init__(self, protocol):
# Wraps `sock` in an SSLSocket, forwarding the caller's arguments to the
# SSLSocket constructor as keyword arguments.
def wrap_socket(self, sock, server_side=False,
do_handshake_on_connect=True,
suppress_ragged_eofs=True,
return SSLSocket(sock=sock, server_side=server_side,
do_handshake_on_connect=do_handshake_on_connect,
suppress_ragged_eofs=suppress_ragged_eofs,
server_hostname=server_hostname,
# Each NPN protocol name is ASCII-encoded and must be 1 to 255 bytes long;
# SSLError is raised otherwise. `protos` is then passed to
# self._set_npn_protocols().
# NOTE(review): the 255 limit presumably reflects a one-byte length
# prefix in the encoded list -- confirm.
def set_npn_protocols(self, npn_protocols):
for protocol in npn_protocols:
b = protocol.encode('ascii')
if len(b) == 0 or len(b) > 255:
raise SSLError('NPN protocols must be 1 to 255 in length')
self._set_npn_protocols(protos)
# Same validation as set_npn_protocols(), applied to ALPN protocol names;
# `protos` is then passed to self._set_alpn_protocols().
def set_alpn_protocols(self, alpn_protocols):
for protocol in alpn_protocols:
b = protocol.encode('ascii')
if len(b) == 0 or len(b) > 255:
raise SSLError('ALPN protocols must be 1 to 255 in length')
self._set_alpn_protocols(protos)
# Walks enum_certificates(storename). An entry is considered when its
# encoding is "x509_asn" and its trust value is True or contains
# purpose.oid. `certs` is then loaded via load_verify_locations(cadata=...).
def _load_windows_store_certs(self, storename, purpose):
for cert, encoding, trust in enum_certificates(storename):
# CA certs are never PKCS#7 encoded
if encoding == "x509_asn":
if trust is True or purpose.oid in trust:
warnings.warn("unable to enumerate Windows certificate store")
self.load_verify_locations(cadata=certs)
# `purpose` is checked to be an _ASN1Object. On win32 every store named in
# _windows_cert_stores is loaded; set_default_verify_paths() is also called.
def load_default_certs(self, purpose=Purpose.SERVER_AUTH):
if not isinstance(purpose, _ASN1Object):
if sys.platform == "win32":
for storename in self._windows_cert_stores:
self._load_windows_store_certs(storename, purpose)
self.set_default_verify_paths()
def create_default_context(purpose=Purpose.SERVER_AUTH, cafile=None,
capath=None, cadata=None):
"""Create a SSLContext object with default settings.
NOTE: The protocol and settings may change anytime without prior
deprecation. The values represent a fair balance between maximum
compatibility and security.
if not isinstance(purpose, _ASN1Object):
context = SSLContext(PROTOCOL_SSLv23)
# SSLv2 considered harmful.
context.options |= OP_NO_SSLv2
# SSLv3 has problematic security and is only required for really old
# clients such as IE6 on Windows XP
context.options |= OP_NO_SSLv3
# disable compression to prevent CRIME attacks (OpenSSL 1.0+)
context.options |= getattr(_ssl, "OP_NO_COMPRESSION", 0)
if purpose == Purpose.SERVER_AUTH:
# verify certs and host name in client mode
context.verify_mode = CERT_REQUIRED
context.check_hostname = True
elif purpose == Purpose.CLIENT_AUTH:
# Prefer the server's ciphers by default so that we get stronger
context.options |= getattr(_ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
# Use single use keys in order to improve forward secrecy
context.options |= getattr(_ssl, "OP_SINGLE_DH_USE", 0)
context.options |= getattr(_ssl, "OP_SINGLE_ECDH_USE", 0)
# disallow ciphers with known vulnerabilities
context.set_ciphers(_RESTRICTED_SERVER_CIPHERS)
if cafile or capath or cadata:
context.load_verify_locations(cafile, capath, cadata)
elif context.verify_mode != CERT_NONE:
# no explicit cafile, capath or cadata but the verify mode is
# CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
# root CA certificates for the given purpose. This may fail silently.
context.load_default_certs(purpose)
def _create_unverified_context(protocol=PROTOCOL_SSLv23, cert_reqs=None,
check_hostname=False, purpose=Purpose.SERVER_AUTH,
certfile=None, keyfile=None,
cafile=None, capath=None, cadata=None):
"""Create a SSLContext object for Python stdlib modules
All Python stdlib modules shall use this function to create SSLContext
objects in order to keep common settings in one place. The configuration
is less restrictive than create_default_context()'s to increase backward
if not isinstance(purpose, _ASN1Object):
context = SSLContext(protocol)
# SSLv2 considered harmful.
context.options |= OP_NO_SSLv2
# SSLv3 has problematic security and is only required for really old
# clients such as IE6 on Windows XP
context.options |= OP_NO_SSLv3
if cert_reqs is not None:
context.verify_mode = cert_reqs
context.check_hostname = check_hostname
if keyfile and not certfile:
raise ValueError("certfile must be specified")
context.load_cert_chain(certfile, keyfile)
if cafile or capath or cadata:
context.load_verify_locations(cafile, capath, cadata)
elif context.verify_mode != CERT_NONE:
# no explicit cafile, capath or cadata but the verify mode is
# CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system
# root CA certificates for the given purpose. This may fail silently.
context.load_default_certs(purpose)
# Backwards compatibility alias, even though it's not a public name.
_create_stdlib_context = _create_unverified_context
# PEP 493: Verify HTTPS by default, but allow envvar to override that.
# _get_https_context_factory() selects the unverified context factory when
# this variable is set to '0' and the environment is not being ignored.
_https_verify_envvar = 'PYTHONHTTPSVERIFY'
def _get_https_context_factory():
    """Return the callable used to build the default HTTPS context.

    The PEP 493 override variable (named by ``_https_verify_envvar``) is
    consulted only when the interpreter is not ignoring its environment.
    A value of exactly ``'0'`` selects ``_create_unverified_context``;
    in every other case ``create_default_context`` is returned.
    """
    environment_honoured = not sys.flags.ignore_environment
    # Short-circuit keeps the environment lookup from happening at all
    # when the interpreter was told to ignore environment variables.
    if environment_honoured and os.environ.get(_https_verify_envvar) == '0':
        return _create_unverified_context
    return create_default_context