[tor-commits] [ooni-probe/master] * Moved CustomCircuit class to /ooni/utils/circuit.py.

isis at torproject.org isis at torproject.org
Thu Oct 4 14:41:15 UTC 2012


commit 21647a9c5ce54341e188619e3c67426fc9114630
Author: Isis Lovecruft <isis at torproject.org>
Date:   Mon Sep 17 03:57:30 2012 +0000

    * Moved CustomCircuit class to /ooni/utils/circuit.py.
    * Tor is started improperly, and the timeout function is stupid and just
      returns the deferred, rather than a timeout *on* the deferred.
---
 ooni/plugins/bridget.py |  289 ++++++++++++++++++++++++++---------------------
 ooni/utils/circuit.py   |  106 +++++++++++++++++
 2 files changed, 264 insertions(+), 131 deletions(-)

diff --git a/ooni/plugins/bridget.py b/ooni/plugins/bridget.py
index 49565d8..f22741b 100644
--- a/ooni/plugins/bridget.py
+++ b/ooni/plugins/bridget.py
@@ -13,6 +13,7 @@
 # :version: 0.1.0-alpha
 
 from __future__             import with_statement
+from functools              import wraps
 from os                     import getcwd
 from os.path                import isfile
 from os.path                import join as pj
@@ -22,6 +23,7 @@ from twisted.internet       import defer, error, reactor
 from zope.interface         import implements
 
 import random
+import signal
 import sys
 
 from ooni.utils             import log
@@ -36,7 +38,7 @@ def portCheck(number):
 
 portCheckAllowed     = "must be between 1024 and 65535."
 sockCheck, ctrlCheck = portCheck, portCheck
-sockCheck.coerceDoc  = "Port to use for Tor's SocksPort, " + portCheckAllowed
+sockCheck.coerceDoc  = "Port to use for Tor's SocksPort, "   + portCheckAllowed
 ctrlCheck.coerceDoc  = "Port to use for Tor's ControlPort, " + portCheckAllowed
 
 
@@ -105,11 +107,11 @@ class BridgetTest(OONITest):
     """
     implements(IPlugin, ITest)
 
-    shortName = "bridget"
-    description = "Use a Tor process to test connecting to bridges and relays"
+    shortName    = "bridget"
+    description  = "Use a Tor process to test connecting to bridges and relays"
     requirements = None
-    options = BridgetArgs
-    blocking = False
+    options      = BridgetArgs
+    blocking     = False
 
     def initialize(self):
         """
@@ -117,10 +119,11 @@ class BridgetTest(OONITest):
         running, so we need to deal with the creation of TorConfig() only
         once, before the experiment runs.
         """
-        self.socks_port     = 9049
-        self.control_port   = 9052
-        self.tor_binary     = '/usr/sbin/tor'
-        self.data_directory = None
+        self.socks_port      = 9049
+        self.control_port    = 9052
+        self.tor_binary      = '/usr/sbin/tor'
+        self.data_directory  = None
+        self.circuit_timeout = 90
 
         if self.local_options:
             options = self.local_options
@@ -176,11 +179,32 @@ class BridgetTest(OONITest):
                 if not options['bridges']:
                     e = "You must use the bridge option to test a transport."
                     raise usage.UsageError("%s" % e)
-                    
-                log.msg("Using pluggable transport ...")
-                ## XXX fixme there's got to be a better way to check the exec
-                assert type(options['transport']) is str
-                self.config.ClientTransportPlugin = options['transport']
+                else:
+                    ## XXX fixme there's got to be a better way to check the exec
+                    ##
+                    ## we could use:
+                    ##    os.setgid( NEW_GID )
+                    ##    os.setuid( NEW_UID )
+                    ## to drop any and all privileges
+                    assert type(options['transport']) is str
+                    [self.pt_type, 
+                     self.pt_exec] = options['transport'].split(' ', 1)
+                    log.msg("Using ClientTransportPlugin %s %s" 
+                            % (self.pt_type, self.pt_exec))
+                    self.pt_use = True
+
+                    ## XXX fixme we need a better way to deal with all PTs
+                    if self.pt_type == "obfs2":
+                        self.ctp = self.pt_type +" "+ self.pt_exec
+                    else:
+                        m  = "Pluggable Transport type %s was " % self.pt_type
+                        m += "unaccounted for, please contact isis (at) "
+                        m += "torproject (dot) org, with info and it'll get "
+                        m += "included."
+                        log.msg("%s" % m)
+                        self.ctp = None
+            else:
+                self.pt_use = False
 
             self.config.SocksPort   = self.socks_port
             self.config.ControlPort = self.control_port
@@ -261,7 +285,7 @@ class BridgetTest(OONITest):
         def reconf_controller(conf, bridge):
             ## if bridges and relays, use one bridge then build a circuit 
             ## from three relays
-            conf.Bridge = bridge
+            conf.Bridge = ""
             ## XXX do we need a SIGHUP to restart?                
 
             ## XXX see txtorcon.TorControlProtocol.add_event_listener we
@@ -289,131 +313,136 @@ class BridgetTest(OONITest):
         def updates(prog, tag, summary):
             log.msg("%d%%: %s" % (prog, summary))
 
+        if not self.circuit_timeout:
+            self.circuit_timeout = 90
+            
+        class TimeoutError(Exception):
+            pass
+
+        def stupid_timer(secs, e=None):
+            def decorator(func):
+                def _timer(signum, frame):
+                    raise TimeoutError, e
+                def wrapper(*args, **kwargs):
+                    signal.signal(signal.SIGALRM, _timer)
+                    signal.alarm(secs)
+                    try:
+                        res = func(*args, **kwargs)
+                    finally:
+                        signal.alarm(0)
+                    return res
+                return wraps(func)(wrapper)
+            return decorator
+        
+        @stupid_timer(self.circuit_timeout)
+        def _launch_tor_and_report_bridge_status(_config, 
+                                                 _reactor, 
+                                                 _updates, 
+                                                 _binary,
+                                                 _callback,
+                                                 _callback_arg,
+                                                 _errback):
+            """The grossest Python function you've ever seen."""
+
+            log.msg("Starting Tor ...")        
+            ## :return: a Deferred which callbacks with a TorProcessProtocol
+            ##          connected to the fully-bootstrapped Tor; this has a 
+            ##          txtorcon.TorControlProtocol instance as .protocol.
+            proc_proto = launch_tor(_config, _reactor, progress_updates=_updates, 
+                                    tor_binary=_binary).addCallback(_callback,
+                                                                    _callback_arg).addErrback(_errback)
+            return proc_proto
+                    ## now build circuits
+
+
+
         if len(args) == 0:
             log.msg("Bridget can't run without bridges or relays to test!")
             log.msg("Exiting ...")
             return sys.exit()
         else:
-
-            class CustomCircuit(CircuitListenerMixin):
-                implements(IStreamAttacher)
-
-                from txtorcon.interface import IRouterContainer
-                from txtorcon.interface import ICircuitContainer
-            
-                def __init__(self, state):
-                    self.state = state
-                    self.waiting_circuits = []
-
-                def waiting_on(self, circuit):
-                    for (circid, d) in self.waiting_circuits:
-                        if circuit.id == circid:
-                            return true
-                    return False
-
-                def circuit_extend(self, circuit, router):
-                    "ICircuitListener"
-                    if circuit.purpose != 'GENERAL':
-                        return
-                    if self.waiting_on(circuit):
-                        log.msg("Circuit %d (%s)" 
-                                % (circuit.id, router.id_hex))
-
-                def circuit_built(self, circuit):
-                    "ICircuitListener"
-                    if circuit.purpose != 'GENERAL':
-                        return
-                    log.msg("Circuit %s built ..." % circuit.id)
-                    log.msg("Full path of %s: %s" % (circuit.id, circuit.path))
-                    for (circid, d) in self.waiting_circuits:
-                        if circid == circuit.id:
-                            self.waiting_circuits.remove(circid, d)
-                            d.callback(circuit)
-
-                def circuit_failed(self, circuit, reason):
-                    if self.waiting_on(circuit):
-                        log.msg("A circuit we requested %s failed for reason %s" 
-                                % (circuit.id, reason))
-                        circid, d = None, None
-                        for x in self.waiting_circuits:
-                            if x[0] == circuit.id:
-                                circid, d, stream_cc = x
-                        if d is None:
-                            raise Exception("Expected to find circuit.")
-
-                        self.waiting_circuits.remove((circid, d))
-                        log.msg("Trying to build a circuit for %s" % circid)
-                        self.request_circuit_build(d)
-
-                def check_circuit_route(self, circuit, router):
-                    if router in circuit.path:
-                        #router.update() ## XXX can i use without args? no.
-                        TorInfo.dump(self)
-
-                def request_circuit_build(self, deferred):
-                    entries = self.state.entry_guards.value()
-                    relays  = self.state.routers.values()
-                    log.msg("We have these nodes listed as entry guards:") 
-                    log.msg("%s" % entries)
-                    log.msg("We have these nodes listed as relays:")
-                    log.msg("%s" % relays)
-                    path = [random.choice(entries),
-                            random.choice(relays),
-                            random.choice(relays)]
-                    log.msg("Requesting a circuit: %s" 
-                            % '-->'.join(map(lambda x: x.location.countrycode, 
-                                             path)))
-
-                    class AppendWaiting:
-                        def __init__(self, attacher, deferred):
-                            self.attacher = attacher
-                            self.d        = deferred
-
-                        def __call__(self, circuit):
-                            """
-                            Return from build_circuit is a Circuit, however,
-                            we want to wait until it is built before we can
-                            issue an attach on it and callback to the Deferred
-                            we issue here.
-                            """
-                            log.msg("Circuit %s is in progress ..." % circuit.id)
-                            self.attacher.waiting_circuits.append((circuit.id, 
-                                                                   self.d))
-
-                    fin = self.state.build_circuit(path)
-                    fin.addCallback(AppendWaiting(self, deferred_to_callback))
-                    fin.addErrback(log.err)
-                    return fin
-
+            log.msg("Bridget: initiating test ... ")
+            self.london_bridge          = []
+            self.working_bridge         = []
 
             if len(self.bridge_list) >= 1:
-                for bridge in self.bridge_list:
-                    try:
-                        log.msg("Current Bridge: %s" % bridge)
-                        reconf_controller(self.config, bridge)
-                    except:
-                        reconf_fail(bridge)
+                self.untested_bridge_count  = len(self.bridge_list)
+                self.burnt_bridge_count     = len(self.london_bridge)
+                self.current_bridge         = None
 
-            log.msg("Bridget: initiating test ... ")
-            log.msg("Using the following as our torrc:\n%s" 
-                    % self.config.create_torrc())
-            report = {'tor_config': self.config.config}
-            log.msg("Starting Tor ...")        
+            ## while bridges are in the bucket
+            while self.config.UseBridges and self.untested_bridge_count > 0:
+                try:
+                    ## if the user wanted to use pluggable transports
+                    assert self.pt_use is True
+                except AssertionError as no_use_pt:
+                    ## if the user didn't want to use pluggable transports
+                    log.msg("Configuring Bridge line without pluggable transport")
+                    log.msg("%s" % no_use_pt)
+                    self.pt_good = False
+                else:
+                    ## user wanted PT, and we recognized transport
+                    if self.ctp is not None:
+                        try:
+                            assert self.ctp is str
+                        except AssertionError as not_a_string:
+                            log.msg("Error: ClientTransportPlugin string unusable: %s" 
+                                    % not_a_string)
+                            log.msg("       Exiting ...")
+                            sys.exit()
+                        else:
+                            ## configure the transport
+                            self.config.ClientTransportPlugin = self.ctp
+                            self.pt_good = True
+                    ## user wanted PT, but we didn't recognize it
+                    else:
+                        log.msg("Error: Unable to use ClientTransportPlugin %s %s" 
+                                % (self.pt_type, self.pt_exec))
+                        log.msg("       Exiting...")
+                        sys.exit()
+                ## whether or not we're using a pluggable transport, we need
+                ## to set the Bridge line
+                finally:
+                    log.msg("We now have %d bridges in our list..." 
+                            % self.untested_bridge_count)
+                    self.current_bridge = self.bridge_list.pop()
+                    self.untested_bridge_count -= 1
+
+                    log.msg("Current Bridge: %s" % self.current_bridge)
+                    
+                    if self.pt_good:
+                        self.config.Bridge = self.pt_type +" "+ self.current_bridge
+                    else:
+                        self.config.Bridge = self.current_bridge
+                        
+                    log.msg("Using the following as our torrc:\n%s" 
+                            % self.config.create_torrc())
+                    report = {'tor_config': self.config.create_torrc()}
+                    self.config.save()
 
-            ## :return: a Deferred which callbacks with a TorProcessProtocol
-            ##          connected to the fully-bootstrapped Tor; this has a 
-            ##          txtorcon.TorControlProtocol instance as .protocol.
-            d = launch_tor(self.config, 
-                           reactor, 
-                           progress_updates=updates,
-                           tor_binary=self.tor_binary)
-            d.addCallback(bootstrap, self.config)
-            d.addErrback(setup_fail)
-            ## now build circuits
-
-            #print "Tor process ID: %s" % d.transport.pid
+                    try:
+                        _launch_tor_and_report_bridge_status(self.config,
+                                                             reactor,
+                                                             updates,
+                                                             self.tor_binary,
+                                                             bootstrap,
+                                                             self.config,
+                                                             setup_fail)
+                    except TimeoutError:
+                        log.msg("Adding %s to bad bridges..." % self.current_bridge)
+                        self.london_bridge.append(self.current_bridge)
+                    else:
+                        log.msg("Adding %s to good bridges..." % self.current_bridge)
+                        self.working_bridge.append(self.current_bridge)
+
+            d = defer.Deferred()
+            d.addCallback(log.msg, 'Working Bridges:\n%s\nUnreachable Bridges:\n%s\n'
+                          % (self.working_bridge, self.london_bridge))
             return d
 
+        def control(self, exp_res):
+            exp_res
+
 ## So that getPlugins() can register the Test:
 bridget = BridgetTest(None, None, None)
 
@@ -438,5 +467,3 @@ bridget = BridgetTest(None, None, None)
 ##
 ## FIX:
 ##       o  DataDirectory is not found, or permissions aren't right
-##       o  Bridge line needs generation of transport properties
-##              Bridge <transport> IP:ORPort <fingerprint>
diff --git a/ooni/utils/circuit.py b/ooni/utils/circuit.py
new file mode 100644
index 0000000..bb1ab11
--- /dev/null
+++ b/ooni/utils/circuit.py
@@ -0,0 +1,106 @@
+#
+# circuit.py
+# ----------
+# Utilities for working with Tor circuits.
+#
+# This code is largely taken from the txtorcon documentation, and as
+# such any and all credit should go to meejah.
+#
+# :author: Mike Warren, Isis Lovecruft
+# :license: see included license file
+# :version: 0.1.0-alpha
+#
+from zope.interface    import implements
+
+from ooni.lib.txtorcon import CircuitListenerMixin, IStreamAttacher
+
+
+class CustomCircuit(CircuitListenerMixin):
+    implements(IStreamAttacher)
+
+    from txtorcon.interface import IRouterContainer
+    from txtorcon.interface import ICircuitContainer
+
+    def __init__(self, state):
+        self.state = state
+        self.waiting_circuits = []
+
+    def waiting_on(self, circuit):
+        for (circid, d) in self.waiting_circuits:
+            if circuit.id == circid:
+                return true
+        return False
+
+    def circuit_extend(self, circuit, router):
+        "ICircuitListener"
+        if circuit.purpose != 'GENERAL':
+            return
+        if self.waiting_on(circuit):
+            log.msg("Circuit %d (%s)" 
+                    % (circuit.id, router.id_hex))
+
+    def circuit_built(self, circuit):
+        "ICircuitListener"
+        if circuit.purpose != 'GENERAL':
+            return
+        log.msg("Circuit %s built ..." % circuit.id)
+        log.msg("Full path of %s: %s" % (circuit.id, circuit.path))
+        for (circid, d) in self.waiting_circuits:
+            if circid == circuit.id:
+                self.waiting_circuits.remove(circid, d)
+                d.callback(circuit)
+
+    def circuit_failed(self, circuit, reason):
+        if self.waiting_on(circuit):
+            log.msg("A circuit we requested %s failed for reason %s" 
+                    % (circuit.id, reason))
+            circid, d = None, None
+            for x in self.waiting_circuits:
+                if x[0] == circuit.id:
+                    circid, d, stream_cc = x
+            if d is None:
+                raise Exception("Expected to find circuit.")
+
+            self.waiting_circuits.remove((circid, d))
+            log.msg("Trying to build a circuit for %s" % circid)
+            self.request_circuit_build(d)
+
+    def check_circuit_route(self, circuit, router):
+        if router in circuit.path:
+            #router.update() ## XXX can i use without args? no.
+            TorInfo.dump(self)
+
+    def request_circuit_build(self, deferred):
+        entries = self.state.entry_guards.value()
+        relays  = self.state.routers.values()
+        log.msg("We have these nodes listed as entry guards:") 
+        log.msg("%s" % entries)
+        log.msg("We have these nodes listed as relays:")
+        log.msg("%s" % relays)
+        path = [random.choice(entries),
+                random.choice(relays),
+                random.choice(relays)]
+        log.msg("Requesting a circuit: %s" 
+                % '-->'.join(map(lambda x: x.location.countrycode, 
+                                 path)))
+
+        class AppendWaiting:
+            def __init__(self, attacher, deferred):
+                self.attacher = attacher
+                self.d        = deferred
+
+            def __call__(self, circuit):
+                """
+                Return from build_circuit is a Circuit, however,
+                we want to wait until it is built before we can
+                issue an attach on it and callback to the Deferred
+                we issue here.
+                """
+                log.msg("Circuit %s is in progress ..." % circuit.id)
+                self.attacher.waiting_circuits.append((circuit.id, 
+                                                       self.d))
+
+        fin = self.state.build_circuit(path)
+        fin.addCallback(AppendWaiting(self, deferred_to_callback))
+        fin.addErrback(log.err)
+        return fin





More information about the tor-commits mailing list