[tor-commits] [ooni-probe/develop] Ported tls_handshake.py to be a nettest, rather than an external script.

isis at torproject.org isis at torproject.org
Thu Jun 6 16:41:36 UTC 2013


commit 348befd12ca6e1f0e0deb75dff43e1994f098325
Author: Isis Lovecruft <isis at torproject.org>
Date:   Wed Jan 23 19:34:30 2013 +0000

    Ported tls_handshake.py to be a nettest, rather than an external script.
---
 nettests/experimental/tls_handshake.py |  231 ++++++++++++++++++++++++++++++++
 1 file changed, 231 insertions(+)

diff --git a/nettests/experimental/tls_handshake.py b/nettests/experimental/tls_handshake.py
new file mode 100644
index 0000000..d909603
--- /dev/null
+++ b/nettests/experimental/tls_handshake.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python
+# -*- encoding: utf-8 -*-
+"""
+  tls_handshake.py
+  ----------------
+  This file contains test cases for determining if a TLS handshake completes
+  successfully, including ways to test if a TLS handshake which uses Mozilla
+  Firefox's current ciphersuite list completes.
+
+  These NetTestCases are a rewrite of a script contributed by Hackerberry
+  Finn, in order to fit into OONI's core network tests.
+
+  @authors: Isis Agora Lovecruft <isis at torproject.org>,
+            Hackerberry Finn <anon at localhost>
+  @license: see included LICENSE file
+"""
+
+import os
+import socket
+from socket import error as serror
+import struct
+import sys
+
+from ipaddr import IPAddress
+from OpenSSL import SSL
+from twisted.internet import defer
+from twisted.python import usage
+
+from ooni import nettest, config
+from ooni.utils import log
+
+## For a way to obtain the current version of Firefox's default ciphersuite
+## list, see https://trac.torproject.org/projects/tor/attachment/ticket/4744/
+## and the attached file "get_mozilla_files.py".
+##
+## Note, however, that doing so requires the source code to the version of
+## firefox that you wish to emulate.
+
+firefox_ciphers = ["ECDHE-ECDSA-AES256-SHA",
+                   "ECDHE-RSA-AES256-SHA",
+                   "DHE-RSA-CAMELLIA256-SHA",
+                   "DHE-DSS-CAMELLIA256-SHA",
+                   "DHE-RSA-AES256-SHA",
+                   "DHE-DSS-AES256-SHA",
+                   "ECDH-ECDSA-AES256-CBC-SHA",
+                   "ECDH-RSA-AES256-CBC-SHA",
+                   "CAMELLIA256-SHA",
+                   "AES256-SHA",
+                   "ECDHE-ECDSA-RC4-SHA",
+                   "ECDHE-ECDSA-AES128-SHA",
+                   "ECDHE-RSA-RC4-SHA",
+                   "ECDHE-RSA-AES128-SHA",
+                   "DHE-RSA-CAMELLIA128-SHA",
+                   "DHE-DSS-CAMELLIA128-SHA",]
+
+
+class UsageOptions(usage.Options):
+    optParameters = [
+        ['host', 'h', None, 'Remote host IP address (v4/v6)'],
+        ['port', 'p', None,
+         "Use this port for all hosts, regardless of port specified in file"],
+        ['ciphersuite', 'c', None ,
+         'File containing ciphersuite list, one per line'],]
+    optFlags = [
+        ['ssl2', '2', 'Use SSLv2'],
+        ['ssl3', '3', 'Use SSLv3'],
+        ['tls1', 't', 'Use TLSv1'],]
+
+class TLSHandshakeTest(nettest.NetTestCase):
+    """
+    An ooniprobe NetTestCase for determining if we can complete a TLS handshake
+    with a remote host.
+    """
+    name         = 'tls-handshake'
+    author       = 'Isis Lovecruft <isis at torproject.org>'
+    description  = 'A test to determing if we can complete a TLS hankshake.'
+    version      = '0.0.1'
+
+    requiresRoot = False
+    usageOptions = UsageOptions
+
+    inputFile = ['file', 'f', None, 'List of <IP>:<PORT>s to test']
+
+    def setUp(self, *args, **kwargs):
+        if self.localOptions:
+            options = self.localOptions
+            self.ciphers = []
+            self.methods = []
+
+            ## check that we're actually testing an IP:PORT, else exit
+            ## gracefully:
+            if not (options['host'] and options['port']) \
+                    and not options['file']:
+                 sys.exit("Need --host and --port, or --file!")
+
+            ## set the SSL/TLS method to use:
+            for method in ['ssl2', 'ssl3', 'tls1']:
+                if options[method]:
+                    self.methods.append(method)
+
+            ## if we weren't given a file with a list of ciphersuites to use,
+            ## then use the firefox default list:
+            if not options['ciphersuite']:
+                self.ciphers = firefox_ciphers
+            else:
+                if os.path.isfile(options['ciphersuite']):
+                    with open(options['ciphersuite']) as cipherfile:
+                        for line in cipherfile.readlines():
+                            self.ciphers.append(line.strip())
+            self.ciphersuite = ":".join(self.ciphers)
+
+        if hasattr(config.advanced, 'default_timeout'):
+            timeout = config.advanced.default_timeout
+        else:
+            timeout = 10   ## default the timeout to 10 seconds
+        socket.setdefaulttimeout(timeout)
+        self.timeout = struct.pack('ll', int(timeout), 0)
+
+    def splitInput(self, input):
+        addr, port = input.strip().rsplit(':', 1)
+        if self.localOptions['port']:
+            port = self.localOptions['port']
+        return (str(addr), int(port))
+
+    def inputProcessor(self, file=None):
+        if os.path.isfile(file):
+            with open(file) as fh:
+                for line in fh.readlines():
+                    if line.startswith('#'):
+                        continue
+                    yield self.splitInput(line)
+
+    def buildSocket(self, addr):
+        ip = IPAddress(addr) ## learn if we're IPv4 or IPv6
+        if ip.version == 4:
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        elif ip.version == 6:
+            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+        return s
+
+    def getContext(self):
+        if 'ssl2' in self.methods:
+            if not 'ssl3' in self.methods:
+                context = SSL.Context(SSL.SSLv2_METHOD)
+            else:
+                context = SSL.Context(SSL.SSLv23_METHOD)
+        elif 'ssl3' in self.methods:
+            context = SSL.Context(SSL.SSLv3_METHOD)
+        elif 'tls1' in self.methods:
+            context = SSL.Context(SSL.TLSv1_METHOD)
+        else:
+            raise Exception("No SSL/TLS method chosen!")
+        context.set_cipher_list(self.ciphersuite)
+        return context
+
+    def test_tlsv1_handshake(self):
+
+        def makeConnection(addr, port):
+            socket = self.buildSocket(addr)
+            context = self.getContext()
+
+            connection = SSL.Connection(context, socket)
+
+            try:
+                connection.connect((addr, port))
+            except serror, se:
+                if se.message.find("[Errno 101]"):
+                    connection.shutdown()
+                log.err(se)
+            else:
+                connection.setblocking(1)
+                connection.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO,
+                                      self.timeout)
+                log.msg("Connected to %s" % connection.getpeername())
+                log.msg("Connection state: %s " % connection.state_string())
+            return connection
+
+        def doHandshake(connection):
+            try:
+                connection.do_handshake()
+            except SSL.WantReadError():
+                log.msg("Timeout exceeded.")
+                connection.shutdown()
+            else:
+                log.msg("State: %s" % connection.state_string())
+                log.msg("Transmitted %d bytes" % connection.send("o\r\n"))
+                try:
+                    recvstr = connection.recv(1024)
+                except SSL.WantReadError:
+                    log.msg("Timeout exceeded")
+                    connection.shutdown()
+                else:
+                    log.msg("Received: %s" % recvstr)
+            return connection
+
+        def handshakeSucceeded(connection):
+            if connection:
+                host, port = connection.getpeername()
+                self.report['host'] = host
+                self.report['port'] = port
+                self.report['state'] = connection.state_string()
+
+        def handshakeFailed(connection, addr, port):
+            if connection is None:
+                self.report['host'] = addr
+                self.report['port'] = port
+                self.report['state'] = 'FAILED'
+            else:
+                return handshakeSucceeded(connection)
+
+        addr, port = self.input
+        connection = defer.maybeDeferred(makeConnection, addr, port)
+        connection.addCallback(doHandshake)
+        connection.addErrback(log.err)
+        connection.addCallback(handshakeSucceeded)
+        connection.addErrback(handshakeFailed)
+
+        return connection
+
+## XXX clean me up
+## old function from anonymous contribution: (saved mostly for reference of the
+## success string)
+##
+#def checkBridgeConnection(host, port)
+#  cipher_arg = ":".join(ciphers)
+#  cmd  = ["openssl", "s_client", "-connect", "%s:%s" % (host,port)]
+#  cmd += ["-cipher", cipher_arg]
+#  proc = subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE,stdin=PIPE)
+#  out, error = proc.communicate()
+#  success = "Cipher is DHE-RSA-AES256-SHA" in out
+#  return success





More information about the tor-commits mailing list