commit c171a250c1dfea890f5e6f965361e0829838264e Author: Isis Lovecruft isis@torproject.org Date: Tue Mar 4 05:21:48 2014 +0000
Implement cert-chain and hostname checking OpenSSL.SSL.Context factory.
* ADD bridgedb.crypto.SSLVerifyingContextFactory class, which verifies certificate chains and checks certificate hostnames for a requested resource. --- lib/bridgedb/crypto.py | 98 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 1 deletion(-)
diff --git a/lib/bridgedb/crypto.py b/lib/bridgedb/crypto.py index 6c0101b..7f6d597 100644 --- a/lib/bridgedb/crypto.py +++ b/lib/bridgedb/crypto.py @@ -33,12 +33,16 @@ import hashlib import hmac import logging import os +import re +import urllib
-import OpenSSL.rand +import OpenSSL
from Crypto.Cipher import PKCS1_OAEP from Crypto.PublicKey import RSA
+from twisted.internet import ssl +
#: The hash digest to use for HMACs.
DIGESTMOD = hashlib.sha1


class SSLVerifyingContextFactory(ssl.CertificateOptions):
    """``OpenSSL.SSL.Context`` factory which checks certificate hostnames.

    Contexts produced by this factory request the peer's certificate and
    run :meth:`verifyHostname` as the OpenSSL verification callback, which
    compares the subject common name of the level 0 certificate against the
    hostname of the URL given at initialisation.

    :cvar bool isClient: True if we're being used in a client
        implementation; False if we're a server. This is a class attribute,
        not an ``__init__`` parameter; set it on the class (or a subclass)
        *before* instantiating.
    """
    isClient = True

    def __init__(self, url, **kwargs):
        """Create a client-side verifying SSL Context factory.

        To pass acceptable certificates for a server which does
        client-authentication checks: initialise with a ``caCerts=[]`` keyword
        argument, which should be a list of ``OpenSSL.crypto.X509`` instances
        (one for each peer certificate to add to the store), and set
        ``SSLVerifyingContextFactory.isClient=False``.

        :param str url: The URL being requested by an
            :api:`twisted.web.client.Agent`.
        :param kwargs: Passed through unchanged to
            ``ssl.CertificateOptions.__init__``. Must not contain ``verify``
            or ``fixBrokenPeers``, which are set here.
        """
        self.hostname = self.getHostnameFromURL(url)

        # ``verify`` here refers to server-side verification of certificates
        # presented by a client:
        self.verify = not self.isClient
        super(SSLVerifyingContextFactory, self).__init__(verify=self.verify,
                                                         fixBrokenPeers=True,
                                                         **kwargs)

    def getContext(self, hostname=None, port=None):
        """Retrieve a configured ``OpenSSL.SSL.Context``.

        The context is built by the parent class and then configured with
        ``OpenSSL.SSL.VERIFY_PEER`` and :meth:`verifyHostname` as the
        verification callback.

        NOTE(review): this method does not itself add anything to the
        context's certificate store; whether the ``caCerts`` given at
        initialisation end up in the store is up to the parent class
        (presumably only when ``verify`` is True -- confirm).

        The **hostname** and **port** arguments seem unused, but they are
        required due to some Twisted and pyOpenSSL internals. See
        :api:`twisted.web.client.Agent._wrapContextFactory`.

        :rtype: ``OpenSSL.SSL.Context``
        :returns: An SSL Context which verifies certificates.
        """
        ctx = super(SSLVerifyingContextFactory, self).getContext()
        ctx.set_verify(OpenSSL.SSL.VERIFY_PEER, self.verifyHostname)
        return ctx

    def getHostnameFromURL(self, url):
        """Parse the hostname from the originally requested URL.

        Any ``user:password@`` prefix and ``:port`` suffix are stripped so
        that the result can be compared directly with a certificate's
        common name.

        :param str url: The URL being requested by an
            :api:`twisted.web.client.Agent`.
        :rtype: str or None
        :returns: The full hostname (including any subdomains), or ``None``
            if the URL has no network location part (e.g. it lacks the
            ``scheme://`` prefix).
        """
        hostname = urllib.splithost(urllib.splittype(url)[1])[0]
        if hostname is not None:
            hostname = urllib.splituser(hostname)[1]
            hostname = urllib.splitport(hostname)[0]
        logging.debug("Parsed hostname %r for cert CN matching." % hostname)
        return hostname

    @staticmethod
    def _commonNameMatches(commonName, hostname):
        """Check whether a certificate common name is valid for a hostname.

        The comparison is case-insensitive and covers the *entire* hostname.
        Every character of the common name is matched literally, except for
        ``*``, which matches one or more characters within a single DNS
        label (it never matches a ``.``).

        :param commonName: The subject CN from the certificate (may be
            ``None`` if the certificate has none).
        :param hostname: The hostname which was requested (may be ``None``).
        :rtype: bool
        :returns: True if the common name is valid for the hostname.
        """
        if not commonName or not hostname:
            return False
        # Escape each literal piece so that '.' in the CN only matches a
        # real dot, then rejoin the pieces with a single-label wildcard.
        pattern = '[^.]+'.join(re.escape(piece)
                               for piece in commonName.split('*'))
        # Anchor at both ends; an unanchored search would accept, e.g., the
        # CN "example.com" for the hostname "example.com.evil.org".
        return re.match(r'\A(?:' + pattern + r')\Z',
                        hostname, re.IGNORECASE) is not None

    def verifyHostname(self, connection, x509, errnum, depth, okay):
        """Callback method for additional SSL certificate validation.

        If the default checks passed for the level 0 certificate, verify
        that it has a subject common name which is valid for the hostname of
        the originally requested URL.

        :param connection: An ``OpenSSL.SSL.Connection``.
        :param x509: An ``OpenSSL.crypto.X509`` object.
        :param errnum: A pyOpenSSL error number. See that project's docs.
        :param depth: The depth which the current certificate is at in the
            certificate chain.
        :param bool okay: True if all the pyOpenSSL default checks on the
            certificate passed. False otherwise.
        :rtype: bool
        :returns: False if the level 0 certificate passed the default checks
            but its common name is not valid for the requested hostname;
            True in every other case.
        """
        commonName = x509.get_subject().commonName
        logging.debug("Received cert at level %d: '%s'" % (depth, commonName))

        # NOTE(review): when ``okay`` is False this callback still returns
        # True, which appears to override OpenSSL's own chain-verification
        # failure (and skips the CN check for that call). That behaviour is
        # kept as-is here, because in client mode ``verify=False`` is passed
        # to the parent class and so trust roots are presumably never loaded
        # into the store; returning False on ``not okay`` would then likely
        # reject every connection. Confirm and fix together with CA loading.

        # We only want to verify that the hostname matches for the level 0
        # certificate:
        if okay and (depth == 0):
            if not self._commonNameMatches(commonName, self.hostname):
                logging.warning(
                    "Invalid certificate subject CN for '%s': '%s'"
                    % (self.hostname, commonName))
                return False
            logging.debug("Valid certificate subject CN for '%s': '%s'"
                          % (self.hostname, commonName))
        return True