[tor-commits] [ooni-probe/master] * Added singleton_semaphore utility function for safely adding processes to a

isis at torproject.org isis at torproject.org
Thu Oct 4 14:41:15 UTC 2012


commit 086c5d340d8abaea4d4e447690e26b0c619fcfea
Author: Isis Lovecruft <isis at torproject.org>
Date:   Tue Sep 25 13:56:01 2012 +0000

    * Added singleton_semaphore utility function for safely adding processes to a
      reactor.
    * Fixes #6969 (bridget tries to start multiple tor processes).
    * Fixes #6970 (bridget skips testing the first bridge).
---
 ooni/plugins/bridget.py |  100 +++++++++++++++++++++-------------------------
 ooni/utils/onion.py     |   67 ++++++++++++++++++++++++-------
 2 files changed, 97 insertions(+), 70 deletions(-)

diff --git a/ooni/plugins/bridget.py b/ooni/plugins/bridget.py
index 83c9ed6..31537ec 100644
--- a/ooni/plugins/bridget.py
+++ b/ooni/plugins/bridget.py
@@ -370,7 +370,9 @@ class BridgetTest(OONITest):
             The :class:`BridgetAsset` line currently being used.
         """
         try:
-            from ooni.utils.onion  import start_tor, CustomCircuit
+            from ooni.utils.onion  import start_tor, singleton_semaphore
+            from ooni.utils.onion  import setup_done, setup_fail
+            from ooni.utils.onion  import CustomCircuit
             from ooni.lib.txtorcon import TorConfig, TorState
         except ImportError:
             raise TxtorconImportError
@@ -507,54 +509,49 @@ class BridgetTest(OONITest):
 
 
         log.msg("Bridget: initiating test ... ")
-        #defer.setDebugging(True)
+        d = defer.Deferred
 
-        if self.bridges_remaining() > 0:
-            if not 'Bridge' in self.config.config:
-                self.config.Bridge = self.bridges.pop()
-
-                ## Necessary for avoiding starting several processes:
-                self.config.save()
-                assert self.config.config.has_key('Bridge'), "NO BRIDGE"
-
-                state = start_tor(self.reactor, self.config, self.control_port,
-                                  self.tor_binary, self.data_directory)
-                #state.addCallback(remove_public_relays, self.bridges)
-                #state.callback
-                #rm_public_relays = defer.Deferred()
-                #rm_public_relays.addCallback(remove_public_relays, 
-                #                             self.bridges)
-                #state.chainDeferred(rm_public_relays)
-                #state = defer.DeferredList([state])
-                from ooni.utils.onion import __setup_done__, __setup_fail__
-                state.addCallback(__setup_done__)
-                state.addErrback(__setup_fail__)
-                
+        if self.bridges_remaining() > 0 and not 'Bridge' in self.config.config:
+            self.config.Bridge = self.bridges.pop()
+
+            ## Necessary for avoiding starting several processes:
+            self.config.save()
+            assert self.config.config.has_key('Bridge'), "NO BRIDGE"
+
+            tor = start_tor(self.reactor, 
+                            self.config, 
+                            self.control_port, 
+                            self.tor_binary, 
+                            self.data_directory).addCallback(
+                setup_done).addErrback(
+                setup_fail)
+            self.tor_process_semaphore = True
+
+            run_once = d().addCallback(singleton_semaphore, tor)
+            run_once.addErrback(setup_fail)
 
-        ## XXX Should do something like: if state.iscomplete
-        #if 'Bridge' in self.config.config:
-        if state and 'Bridge' in self.config.config:
+            only_bridges = d().addCallback(remove_public_relays, self.bridges)
+
+        state = defer.gatherResults([run_once, only_bridges], consumeErrors=True)
+        log.debug("%s" % state.callbacks)
+
+        if self.bridges_remaining() > 0:
             all = []
             for bridge in self.bridges:
                 self.current_bridge = bridge
                 log.msg("We now have %d untested bridges..." 
                         % self.bridges_remaining())
-                #reconf = defer.Deferred()
-                #reconf.addCallback(reconfigure_bridge, state, 
-                #                   self.current_bridge, self.use_pt, 
-                #                   self.pt_type)
-                #reconf.addErrback(reconfigure_fail, state,
-                #                  self.current_bridge, self.bridges_down)
-                #state.chainDeferred(reconf)
-                #state.callback
-
-                reconf = reconfigure_bridge(state, 
-                                            self.current_bridge,
-                                            self.use_pt, 
-                                            self.pt_type)
-                all.append(reconf)
-            state.chainDeferred(defer.DeferredList(all))
-            #state.addCallback(defer.DeferredList(all))
+                reconf = d().addCallback(reconfigure_bridge, state, 
+                                         self.current_bridge,
+                                         self.use_pt, 
+                                         self.pt_type)
+                reconf.addCallback(reconfigure_done, self.current_bridge, 
+                                   self.bridges_up)
+                reconf.addErrback(reconfigure_fail, self.current_bridge, 
+                                  self.bridges_down)
+            all.append(reconf)
+        state.chainDeferred(defer.DeferredList(all))
+        log.debug("%s" % state.callbacks)
     
         if self.relays_remaining() > 0:
             while self.relays_remaining() >= 3:
@@ -568,23 +565,18 @@ class BridgetTest(OONITest):
                             self.relays_up.append(self.current_relay)
                     if len(circ.path) < 3:
                         try:
-                            parameters = (state.attacher, circ, 
-                                          self.current_relay)
-                            ext = attacher_extend_circuit(parameters)
+                            ext = attacher_extend_circuit(state.attacher, circ,
+                                                          self.current_relay)
                             ext.addCallback(attacher_extend_circuit_done, 
-                                            parameters)
+                                            state.attacher, circ, 
+                                            self.current_relay)
                         except Exception, e:
-                            log.msg("Extend circuit failed: %s %s" 
-                                    % (e, parameters))
+                            log.msg("Extend circuit failed: %s" % e)
                     else:
                         continue
 
-        #return state
-        ## still need to attach attacher to state
-        ## then build circuits
-
-
-        reactor.run()
+        #reactor.run()
+        return state
 
     def control(self, experiment_result, args):
         experiment_result.callback
diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py
index 08f27d4..5f15e90 100644
--- a/ooni/utils/onion.py
+++ b/ooni/utils/onion.py
@@ -23,18 +23,18 @@ from twisted.internet  import defer
 from zope.interface    import implements
 
 
-def __setup_done__(proto):
-    log.msg("Setup Complete: %s" % proto)
+def setup_done(proto):
+    log.msg("Setup Complete")
     state = TorState(proto.tor_protocol)
-    state.post_bootstrap.addCallback(__state_complete__)
-    state.post_bootstrap.addErrback(__setup_fail__)
+    state.post_bootstrap.addCallback(state_complete)
+    state.post_bootstrap.addErrback(setup_fail)
 
-def __setup_fail__(proto):
+def setup_fail(proto):
     log.err("Setup Failed: %s" % proto)
     report.update({'setup_fail': proto})
     reactor.stop()
 
-def __state_complete__(state, bridge_list=None, relay_list=None):
+def state_complete(state):
     """Called when we've got a TorState."""
     log.msg("We've completely booted up a Tor version %s at PID %d"
             % (state.protocol.version, state.tor_pid))
@@ -44,14 +44,10 @@ def __state_complete__(state, bridge_list=None, relay_list=None):
     for circ in state.circuits.values():
         log.msg("%s" % circ)
 
-    if bridge_list is not None and relay_list is None:
-        return state, bridge_list
-    elif bridge_list is None and relay_list is not None:
-        raise NotImplemented
-    else:
-        return state, None
+    return state
 
-def __updates__(_progress, _tag, _summary):
+def updates(_progress, _tag, _summary):
+    """Log updates on the Tor bootstrapping process.""" 
     log.msg("%d%%: %s" % (_progress, _summary))
 
 def write_torrc(conf, data_dir=None):
@@ -59,6 +55,11 @@ def write_torrc(conf, data_dir=None):
     Create a torrc in our data_dir. If we don't yet have a data_dir, create a
     temporary one. Any temporary files or folders are added to delete_list.
     
+    :param conf:
+        A :class:`ooni.lib.txtorcon.TorConfig` object, with all configuration
+        values saved.
+    :param data_dir:
+        The Tor DataDirectory to use.
     :return: torrc, data_dir, delete_list
     """
     try:
@@ -78,6 +79,7 @@ def write_torrc(conf, data_dir=None):
     delete_list.append(torrc)
     write(fd, conf.create_torrc())
     close(fd)
+
     return torrc, data_dir, delete_list
 
 def delete_files_or_dirs(delete_list):
@@ -101,9 +103,41 @@ def delete_files_or_dirs(delete_list):
             rmtree(temp, ignore_errors=True)
 
 @defer.inlineCallbacks
+def singleton_semaphore(deferred_process_init, callbacks=[], errbacks=[]):
+    """
+    Initialize a process only once, and do not return until
+    that initialization is complete.
+
+    :param deferred_process_init:
+        A deferred which returns a connected process via
+        :meth:`twisted.internet.reactor.spawnProcess`.
+    :param callbacks:
+        A list of callback functions to add to the initialized processes'
+        deferred.
+    :param errbacks:
+        A list of errback functions to add to the initialized processes'
+        deferred.
+    :return:
+        The final state of the :param deferred_process_init: after the
+        callback chain has completed. This should be a fully initialized
+        process connected to a :class:`twisted.internet.reactor`.
+    """
+    assert type(callbacks) is list
+    assert type(errbacks) is list
+
+    for cb in callbacks:
+        deferred_process_init.addCallback(cb)
+    for eb in errbacks:
+        deferred_process_init.addErrback(eb)
+
+    only_once = defer.DeferredSemaphore(1)
+    singleton = yield only_once.run(deferred_process_init)
+    defer.returnValue(singleton)
+
+@defer.inlineCallbacks
 def start_tor(reactor, config, control_port, tor_binary, data_dir,
-              report=None, progress=__updates__, process_cb=__setup_done__,
-              process_eb=__setup_fail__):
+              report=None, progress=updates, process_cb=setup_done,
+              process_eb=setup_fail):
     """
     Use a txtorcon.TorConfig() instance, config, to write a torrc to a
     tempfile in our DataDirectory, data_dir. If data_dir is None, a temp
@@ -139,7 +173,8 @@ def start_tor(reactor, config, control_port, tor_binary, data_dir,
         The function to errback to if 
         class:`ooni.lib.txtorcon.TorProcessProtocol` fails.
     :return:
-        A class:`ooni.lib.txtorcon.TorProcessProtocol` which callbacks with a
+        The result of the callback of a
+        class:`ooni.lib.txtorcon.TorProcessProtocol` which callbacks with a
         class:`txtorcon.TorControlProtocol` as .protocol.
     """
     try:





More information about the tor-commits mailing list