From 9611e1a832a411dc3ec99ddd9390168e1faa162d Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Fri, 23 Aug 2024 16:00:29 +0530 Subject: [PATCH] Fix #2077704 [connecting to calibre-ebook.com does not work in python 3.13](https://bugs.launchpad.net/calibre/+bug/2077704) Also remove old code present for legacy python versions --- src/calibre/utils/https.py | 147 ++++--------------------------------- 1 file changed, 15 insertions(+), 132 deletions(-) diff --git a/src/calibre/utils/https.py b/src/calibre/utils/https.py index 05878aed51..a2ae76fd2a 100644 --- a/src/calibre/utils/https.py +++ b/src/calibre/utils/https.py @@ -4,8 +4,6 @@ __license__ = 'GPL v3' __copyright__ = '2014, Kovid Goyal ' -import re -import socket import ssl from contextlib import closing @@ -14,8 +12,6 @@ from calibre.utils.resources import get_path as P from polyglot import http_client from polyglot.urllib import urlsplit -has_ssl_verify = hasattr(ssl, 'create_default_context') and hasattr(ssl, '_create_unverified_context') - class HTTPError(ValueError): @@ -27,130 +23,21 @@ class HTTPError(ValueError): self.url = url -if has_ssl_verify: - class HTTPSConnection(http_client.HTTPSConnection): +class HTTPSConnection(http_client.HTTPSConnection): - def __init__(self, ssl_version, *args, **kwargs): - cafile = kwargs.pop('cert_file', None) - if cafile is None: - kwargs['context'] = ssl._create_unverified_context() - else: - kwargs['context'] = ssl.create_default_context(cafile=cafile) - http_client.HTTPSConnection.__init__(self, *args, **kwargs) -else: - # Check certificate hostname {{{ - # Implementation taken from python 3 - class CertificateError(ValueError): - pass - - def _dnsname_match(dn, hostname, max_wildcards=1): - """Matching according to RFC 6125, section 6.4.3 - - http://tools.ietf.org/html/rfc6125#section-6.4.3 - """ - pats = [] - if not dn: - return False - - parts = dn.split(r'.') - leftmost, remainder = parts[0], parts[1:] - - wildcards = leftmost.count('*') - if wildcards > max_wildcards: - # Issue #17980: avoid denials of service by refusing more - # than one wildcard per fragment. A survery of established - # policy among SSL implementations showed it to be a - # reasonable choice. - raise CertificateError( - "too many wildcards in certificate DNS name: " + repr(dn)) - - # speed up common case w/o wildcards - if not wildcards: - return dn.lower() == hostname.lower() - - # RFC 6125, section 6.4.3, subitem 1. - # The client SHOULD NOT attempt to match a presented identifier in which - # the wildcard character comprises a label other than the left-most label. - if leftmost == '*': - # When '*' is a fragment by itself, it matches a non-empty dotless - # fragment. - pats.append('[^.]+') - elif leftmost.startswith('xn--') or hostname.startswith('xn--'): - # RFC 6125, section 6.4.3, subitem 3. - # The client SHOULD NOT attempt to match a presented identifier - # where the wildcard character is embedded within an A-label or - # U-label of an internationalized domain name. - pats.append(re.escape(leftmost)) + def __init__(self, *args, **kwargs): + cafile = kwargs.pop('cert_file', None) + if cafile is None: + kwargs['context'] = ssl._create_unverified_context() else: - # Otherwise, '*' matches any dotless string, e.g. www* - pats.append(re.escape(leftmost).replace(r'\*', '[^.]*')) - - # add the remaining fragments, ignore any wildcards - for frag in remainder: - pats.append(re.escape(frag)) - - pat = re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE) - return pat.match(hostname) - - def match_hostname(cert, hostname): - """Verify that *cert* (in decoded format as returned by - SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125 - rules are followed, but IP addresses are not accepted for *hostname*. - - CertificateError is raised on failure. On success, the function - returns nothing. - """ - if not cert: - raise ValueError("empty or no certificate") - dnsnames = [] - san = cert.get('subjectAltName', ()) - for key, value in san: - if key == 'DNS': - if _dnsname_match(value, hostname): - return - dnsnames.append(value) - if not dnsnames: - # The subject is only checked when there is no dNSName entry - # in subjectAltName - for sub in cert.get('subject', ()): - for key, value in sub: - # XXX according to RFC 2818, the most specific Common Name - # must be used. - if key == 'commonName': - if _dnsname_match(value, hostname): - return - dnsnames.append(value) - if len(dnsnames) > 1: - raise CertificateError("hostname %r " - "doesn't match either of %s" - % (hostname, ', '.join(map(repr, dnsnames)))) - elif len(dnsnames) == 1: - raise CertificateError("hostname %r " - "doesn't match %r" - % (hostname, dnsnames[0])) + kwargs['context'] = ssl.create_default_context(cafile=cafile) + if kwargs.pop('disable_x509_strict_checking', False): + # python 3.13 forces VERIFY_X509_STRICT which breaks with the + # private certificate used for downloads from code.calibre-ebook.com + kwargs['context'].verify_flags &= ~ssl.VERIFY_X509_STRICT else: - raise CertificateError("no appropriate commonName or " - "subjectAltName fields were found") - # }}} - - class HTTPSConnection(http_client.HTTPSConnection): - - def __init__(self, ssl_version, *args, **kwargs): - http_client.HTTPSConnection.__init__(self, *args, **kwargs) - self.calibre_ssl_version = ssl_version - - def connect(self): - """Connect to a host on a given (SSL) port, properly verifying the SSL - certificate, both that it is valid and that its declared hostnames - match the hostname we are connecting to.""" - - sock = socket.create_connection((self.host, self.port), - self.timeout, self.source_address) - if self._tunnel_host: - self.sock = sock - self._tunnel() - self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.cert_file, ssl_version=self.calibre_ssl_version) - getattr(ssl, 'match_hostname', match_hostname)(self.sock.getpeercert(), self.host) + kwargs['context'].verify_flags |= ssl.VERIFY_X509_STRICT + http_client.HTTPSConnection.__init__(self, *args, **kwargs) def get_https_resource_securely( @@ -163,11 +50,7 @@ def get_https_resource_securely( You can pass cacerts=None to download using SSL but without verifying the server certificate. ''' - if ssl_version is None: - try: - ssl_version = ssl.PROTOCOL_TLSv1_2 - except AttributeError: - ssl_version = ssl.PROTOCOL_TLSv1 # old python + disable_x509_strict_checking = cacerts == 'calibre-ebook-root-CA.crt' cert_file = None if cacerts is not None: cert_file = P(cacerts, allow_user_override=False) @@ -190,7 +73,7 @@ def get_https_resource_securely( # Invalid proxy, ignore pass - c = HTTPSConnection(ssl_version, hostname, port, cert_file=cert_file, timeout=timeout) + c = HTTPSConnection(hostname, port, cert_file=cert_file, timeout=timeout, disable_x509_strict_checking=disable_x509_strict_checking) if has_proxy: c.set_tunnel(p.hostname, p.port) @@ -208,7 +91,7 @@ def get_https_resource_securely( if newurl is None: raise ValueError('%s returned a redirect response with no Location header' % url) return get_https_resource_securely( - newurl, cacerts=cacerts, timeout=timeout, max_redirects=max_redirects-1, ssl_version=ssl_version, get_response=get_response) + newurl, cacerts=cacerts, timeout=timeout, max_redirects=max_redirects-1, get_response=get_response) if response.status != http_client.OK: raise HTTPError(url, response.status) if get_response: