commit 086c5d340d8abaea4d4e447690e26b0c619fcfea
Author: Isis Lovecruft <isis@torproject.org>
Date:   Tue Sep 25 13:56:01 2012 +0000

    * Added singleton_semaphore utility function for safely adding processes to a reactor.
    * Fixes #6969 (bridget tries to start multiple tor processes).
    * Fixes #6970 (bridget skips testing the first bridge).
---
 ooni/plugins/bridget.py | 100 +++++++++++++++++++++------------------------
 ooni/utils/onion.py     | 67 ++++++++++++++++++++++++-------
 2 files changed, 97 insertions(+), 70 deletions(-)

diff --git a/ooni/plugins/bridget.py b/ooni/plugins/bridget.py index 83c9ed6..31537ec 100644 --- a/ooni/plugins/bridget.py +++ b/ooni/plugins/bridget.py @@ -370,7 +370,9 @@ class BridgetTest(OONITest): The :class:`BridgetAsset` line currently being used. """ try: - from ooni.utils.onion import start_tor, CustomCircuit + from ooni.utils.onion import start_tor, singleton_semaphore + from ooni.utils.onion import setup_done, setup_fail + from ooni.utils.onion import CustomCircuit from ooni.lib.txtorcon import TorConfig, TorState except ImportError: raise TxtorconImportError @@ -507,54 +509,49 @@ class BridgetTest(OONITest):
log.msg("Bridget: initiating test ... ") - #defer.setDebugging(True) + d = defer.Deferred
- if self.bridges_remaining() > 0: - if not 'Bridge' in self.config.config: - self.config.Bridge = self.bridges.pop() - - ## Necessary for avoiding starting several processes: - self.config.save() - assert self.config.config.has_key('Bridge'), "NO BRIDGE" - - state = start_tor(self.reactor, self.config, self.control_port, - self.tor_binary, self.data_directory) - #state.addCallback(remove_public_relays, self.bridges) - #state.callback - #rm_public_relays = defer.Deferred() - #rm_public_relays.addCallback(remove_public_relays, - # self.bridges) - #state.chainDeferred(rm_public_relays) - #state = defer.DeferredList([state]) - from ooni.utils.onion import __setup_done__, __setup_fail__ - state.addCallback(__setup_done__) - state.addErrback(__setup_fail__) - + if self.bridges_remaining() > 0 and not 'Bridge' in self.config.config: + self.config.Bridge = self.bridges.pop() + + ## Necessary for avoiding starting several processes: + self.config.save() + assert self.config.config.has_key('Bridge'), "NO BRIDGE" + + tor = start_tor(self.reactor, + self.config, + self.control_port, + self.tor_binary, + self.data_directory).addCallback( + setup_done).addErrback( + setup_fail) + self.tor_process_semaphore = True + + run_once = d().addCallback(singleton_semaphore, tor) + run_once.addErrback(setup_fail)
- ## XXX Should do something like: if state.iscomplete - #if 'Bridge' in self.config.config: - if state and 'Bridge' in self.config.config: + only_bridges = d().addCallback(remove_public_relays, self.bridges) + + state = defer.gatherResults([run_once, only_bridges], consumeErrors=True) + log.debug("%s" % state.callbacks) + + if self.bridges_remaining() > 0: all = [] for bridge in self.bridges: self.current_bridge = bridge log.msg("We now have %d untested bridges..." % self.bridges_remaining()) - #reconf = defer.Deferred() - #reconf.addCallback(reconfigure_bridge, state, - # self.current_bridge, self.use_pt, - # self.pt_type) - #reconf.addErrback(reconfigure_fail, state, - # self.current_bridge, self.bridges_down) - #state.chainDeferred(reconf) - #state.callback - - reconf = reconfigure_bridge(state, - self.current_bridge, - self.use_pt, - self.pt_type) - all.append(reconf) - state.chainDeferred(defer.DeferredList(all)) - #state.addCallback(defer.DeferredList(all)) + reconf = d().addCallback(reconfigure_bridge, state, + self.current_bridge, + self.use_pt, + self.pt_type) + reconf.addCallback(reconfigure_done, self.current_bridge, + self.bridges_up) + reconf.addErrback(reconfigure_fail, self.current_bridge, + self.bridges_down) + all.append(reconf) + state.chainDeferred(defer.DeferredList(all)) + log.debug("%s" % state.callbacks)
if self.relays_remaining() > 0: while self.relays_remaining() >= 3: @@ -568,23 +565,18 @@ class BridgetTest(OONITest): self.relays_up.append(self.current_relay) if len(circ.path) < 3: try: - parameters = (state.attacher, circ, - self.current_relay) - ext = attacher_extend_circuit(parameters) + ext = attacher_extend_circuit(state.attacher, circ, + self.current_relay) ext.addCallback(attacher_extend_circuit_done, - parameters) + state.attacher, circ, + self.current_relay) except Exception, e: - log.msg("Extend circuit failed: %s %s" - % (e, parameters)) + log.msg("Extend circuit failed: %s" % e) else: continue
- #return state - ## still need to attach attacher to state - ## then build circuits - - - reactor.run() + #reactor.run() + return state
def control(self, experiment_result, args): experiment_result.callback diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py index 08f27d4..5f15e90 100644 --- a/ooni/utils/onion.py +++ b/ooni/utils/onion.py @@ -23,18 +23,18 @@ from twisted.internet import defer from zope.interface import implements
-def __setup_done__(proto): - log.msg("Setup Complete: %s" % proto) +def setup_done(proto): + log.msg("Setup Complete") state = TorState(proto.tor_protocol) - state.post_bootstrap.addCallback(__state_complete__) - state.post_bootstrap.addErrback(__setup_fail__) + state.post_bootstrap.addCallback(state_complete) + state.post_bootstrap.addErrback(setup_fail)
-def __setup_fail__(proto): +def setup_fail(proto): log.err("Setup Failed: %s" % proto) report.update({'setup_fail': proto}) reactor.stop()
-def __state_complete__(state, bridge_list=None, relay_list=None): +def state_complete(state): """Called when we've got a TorState.""" log.msg("We've completely booted up a Tor version %s at PID %d" % (state.protocol.version, state.tor_pid)) @@ -44,14 +44,10 @@ def __state_complete__(state, bridge_list=None, relay_list=None): for circ in state.circuits.values(): log.msg("%s" % circ)
- if bridge_list is not None and relay_list is None: - return state, bridge_list - elif bridge_list is None and relay_list is not None: - raise NotImplemented - else: - return state, None + return state
-def __updates__(_progress, _tag, _summary): +def updates(_progress, _tag, _summary): + """Log updates on the Tor bootstrapping process.""" log.msg("%d%%: %s" % (_progress, _summary))
def write_torrc(conf, data_dir=None): @@ -59,6 +55,11 @@ def write_torrc(conf, data_dir=None): Create a torrc in our data_dir. If we don't yet have a data_dir, create a temporary one. Any temporary files or folders are added to delete_list.
+ :param conf: + A :class:`ooni.lib.txtorcon.TorConfig` object, with all configuration + values saved. + :param data_dir: + The Tor DataDirectory to use. :return: torrc, data_dir, delete_list """ try: @@ -78,6 +79,7 @@ def write_torrc(conf, data_dir=None): delete_list.append(torrc) write(fd, conf.create_torrc()) close(fd) + return torrc, data_dir, delete_list
def delete_files_or_dirs(delete_list): @@ -101,9 +103,41 @@ def delete_files_or_dirs(delete_list): rmtree(temp, ignore_errors=True)
@defer.inlineCallbacks +def singleton_semaphore(deferred_process_init, callbacks=[], errbacks=[]): + """ + Initialize a process only once, and do not return until + that initialization is complete. + + :param deferred_process_init: + A deferred which returns a connected process via + :meth:`twisted.internet.reactor.spawnProcess`. + :param callbacks: + A list of callback functions to add to the initialized processes' + deferred. + :param errbacks: + A list of errback functions to add to the initialized processes' + deferred. + :return: + The final state of the :param deferred_process_init: after the + callback chain has completed. This should be a fully initialized + process connected to a :class:`twisted.internet.reactor`. + """ + assert type(callbacks) is list + assert type(errbacks) is list + + for cb in callbacks: + deferred_process_init.addCallback(cb) + for eb in errbacks: + deferred_process_init.addErrback(eb) + + only_once = defer.DeferredSemaphore(1) + singleton = yield only_once.run(deferred_process_init) + defer.returnValue(singleton) + +@defer.inlineCallbacks def start_tor(reactor, config, control_port, tor_binary, data_dir, - report=None, progress=__updates__, process_cb=__setup_done__, - process_eb=__setup_fail__): + report=None, progress=updates, process_cb=setup_done, + process_eb=setup_fail): """ Use a txtorcon.TorConfig() instance, config, to write a torrc to a tempfile in our DataDirectory, data_dir. If data_dir is None, a temp @@ -139,7 +173,8 @@ def start_tor(reactor, config, control_port, tor_binary, data_dir, The function to errback to if class:`ooni.lib.txtorcon.TorProcessProtocol` fails. :return: - A class:`ooni.lib.txtorcon.TorProcessProtocol` which callbacks with a + The result of the callback of a + class:`ooni.lib.txtorcon.TorProcessProtocol` which callbacks with a class:`txtorcon.TorControlProtocol` as .protocol. """ try:
tor-commits@lists.torproject.org