commit f9f97ae03f85c4257ef26939d02f9dabc546ac8d
Author: Isis Lovecruft isis@torproject.org
Date: Tue Oct 2 09:01:43 2012 +0000
    * Added pre-chained start_tor_filter_nodes() Tor utility.
    * Fixed the Deferred class decorator utility.
    * Added more Sphinx/pydoctor(markdownable) documentation.
    * Removed a couple unnecessary imports.
---
 ooni/plugins/bridget.py | 123 +++++++++-----
 ooni/utils/log.py       |   8 +-
 ooni/utils/onion.py     | 425 ++++++++++++++++++++++++++++++++++++++--------
 ooni/utils/timer.py     |  64 +++++++-
 4 files changed, 495 insertions(+), 125 deletions(-)
diff --git a/ooni/plugins/bridget.py b/ooni/plugins/bridget.py index 538e2f3..92748c1 100644 --- a/ooni/plugins/bridget.py +++ b/ooni/plugins/bridget.py @@ -279,10 +279,11 @@ class BridgetTest(OONITest): """ try: from ooni.utils import process - from ooni.utils.onion import start_tor, remove_public_relays - from ooni.utils.onion import setup_done, setup_fail + from ooni.utils.onion import remove_public_relays, start_tor + from ooni.utils.onion import start_tor_filter_nodes + from ooni.utils.onion import setup_fail, setup_done from ooni.utils.onion import CustomCircuit - from ooni.utils.timer import timeout + from ooni.utils.timer import deferred_timeout, TimeoutError from ooni.lib.txtorcon import TorConfig, TorState except ImportError: raise TxtorconImportError @@ -291,12 +292,20 @@ class BridgetTest(OONITest): sys.exit()
def reconfigure_done(state, bridges): + """ + Append :ivar:`bridges['current']` to the list + :ivar:`bridges['up']. + """ log.msg("Reconfiguring with 'Bridge %s' successful" % bridges['current']) bridges['up'].append(bridges['current']) return state
def reconfigure_fail(state, bridges): + """ + Append :ivar:`bridges['current']` to the list + :ivar:`bridges['down']. + """ log.msg("Reconfiguring TorConfig with parameters %s failed" % state) bridges['down'].append(bridges['current']) @@ -307,8 +316,28 @@ class BridgetTest(OONITest): """ Rewrite the Bridge line in our torrc. If use of pluggable transports was specified, rewrite the line as: - Bridge <transport_type> <ip>:<orport> - Otherwise, rewrite in the standard form. + Bridge <transport_type> <IP>:<ORPort> + Otherwise, rewrite in the standard form: + Bridge <IP>:<ORPort> + + :param state: + A fully bootstrapped instance of + :class:`ooni.lib.txtorcon.TorState`. + :param bridges: + A dictionary of bridges containing the following keys: + + bridges['remaining'] :: A function returning and int for the + number of remaining bridges to test. + bridges['current'] :: A string containing the <IP>:<ORPort> + of the current bridge. + bridges['use_pt'] :: A boolean, True if we're testing + bridges with a pluggable transport; + False otherwise. + bridges['pt_type'] :: If :ivar:`bridges['use_pt'] is True, + this is a string containing the type + of pluggable transport to test. + :return: + :param:`state` """ log.msg("Current Bridge: %s" % bridges['current']) log.msg("We now have %d bridges remaining to test..." @@ -324,11 +353,11 @@ class BridgetTest(OONITest): raise PTNotFoundException
if controller_response == 'OK': - finish = reconfigure_done(state, bridges) + finish = yield reconfigure_done(state, bridges) else: log.err("SETCONF for %s responded with error:\n %s" % (bridges['current'], controller_response)) - finish = reconfigure_fail(state, bridges) + finish = yield reconfigure_fail(state, bridges)
defer.returnValue(finish)
@@ -365,51 +394,46 @@ class BridgetTest(OONITest): def state_attach_fail(state): log.err("Attaching custom circuit builder failed: %s" % state)
- log.msg("Bridget: initiating test ... ") ## Start the experiment
+ ## if we've at least one bridge, and our config has no 'Bridge' line if self.bridges['remaining']() >= 1 \ and not 'Bridge' in self.config.config: + + ## configure our first bridge line self.bridges['current'] = self.bridges['all'][0] self.config.Bridge = self.bridges['current'] ## avoid starting several self.config.save() ## processes - assert self.config.config.has_key('Bridge'), "NO BRIDGE" - - state = timeout(self.circuit_timeout)(start_tor( - reactor, self.config, self.control_port, - self.tor_binary, self.data_directory)) - state.addCallbacks(setup_done, setup_fail) - state.addCallback(remove_public_relays, self.bridges) - - #controller = singleton_semaphore(bootstrap) - #controller = x().addCallback(singleton_semaphore, tor) + assert self.config.config.has_key('Bridge'), "No Bridge Line" + + ## start tor and remove bridges which are public relays + from ooni.utils.onion import start_tor_filter_nodes + state = start_tor_filter_nodes(reactor, self.config, + self.control_port, self.tor_binary, + self.data_directory, self.bridges) + #controller = defer.Deferred() + #controller.addCallback(singleton_semaphore, tor) #controller.addErrback(setup_fail) + #bootstrap = defer.gatherResults([controller, filter_bridges], + # consumeErrors=True)
- #filter_bridges = remove_public_relays(self.bridges) - - #bootstrap = defer.gatherResults([controller, filter_bridges], - # consumeErrors=True) - log.debug("Current callbacks on TorState():\n%s" % state.callbacks) - log.debug("TorState():\n%s" % state) + if state is not None: + log.debug("state:\n%s" % state) + log.debug("Current callbacks on TorState():\n%s" + % state.callbacks)
+ ## if we've got more bridges if self.bridges['remaining']() >= 2: - all = [] + #all = [] for bridge in self.bridges['all'][1:]: self.bridges['current'] = bridge #new = defer.Deferred() - new = defer.waitForDeferred(state) - new.addCallback(reconfigure_bridge, state, self.bridges) - all.append(new) - - #state.chainDeferred(defer.DeferredList(all)) - #state.chainDeferred(defer.gatherResults(all, consumeErrors=True)) - check_remaining = defer.DeferredList(all, consumeErrors=True) - - #controller.chainDeferred(check_remaining) - #log.debug("Current callbacks on TorState():\n%s" - # % controller.callbacks) - state.chainDeferred(check_remaining) + #new.addCallback(reconfigure_bridge, state, self.bridges) + #all.append(new) + #check_remaining = defer.DeferredList(all, consumeErrors=True) + #state.chainDeferred(check_remaining) + state.addCallback(reconfigure_bridge, self.bridges)
if self.relays['remaining']() > 0: while self.relays['remaining']() >= 3: @@ -434,16 +458,26 @@ class BridgetTest(OONITest): continue
#state.callback(all) - self.reactor.run() + #self.reactor.run() return state
def startTest(self, args): + """ + Local override of :meth:`OONITest.startTest` to bypass calling + self.control. + + :param args: + The current line of :class:`Asset`, not used but kept for + compatibility reasons. + :return: + A fired deferred which callbacks :meth:`experiment` and + :meth:`OONITest.finished`. + """ self.start_time = date.now() - self.laboratory = defer.Deferred() - self.laboratory.addCallbacks(self.experiment, errback=log.err) - self.laboratory.addCallbacks(self.finished, errback=log.err) - self.laboratory.callback(args) - return self.laboratory + self.d = self.experiment(args) + self.d.addErrback(log.err) + self.d.addCallbacks(self.finished, log.err) + return self.d
## So that getPlugins() can register the Test: bridget = BridgetTest(None, None, None) @@ -453,11 +487,10 @@ bridget = BridgetTest(None, None, None) ## ----------- ## ## TODO: -## o cleanup documentation +## x cleanup documentation ## x add DataDirectory option ## x check if bridges are public relays ## o take bridge_desc file as input, also be able to give same ## format as output ## x Add asynchronous timeout for deferred, so that we don't wait ## forever for bridges that don't work. -## o Add mechanism for testing through another host diff --git a/ooni/utils/log.py b/ooni/utils/log.py index 54c59ea..fe737f5 100644 --- a/ooni/utils/log.py +++ b/ooni/utils/log.py @@ -36,9 +36,9 @@ class OONITestFailure(Failure): Can be given an Exception as an argument, else will use the most recent Exception from the current stack frame. """ - def __init__(self, exception=None, _type=None, + def __init__(self, _type=None, _traceback=None, _capture=False): - Failure.__init__(self, exc_value=exception, exc_type=_type, + Failure.__init__(self, exc_type=_type, exc_tb=_traceback, captureVars=_capture)
class OONILogObserver(log.FileLogObserver): @@ -96,7 +96,7 @@ def msg(message, level="info", **kw): log.msg(message, logLevel=level, **kw)
def err(message, level="err", **kw): - log.err(message, logLevel=level, **kw) + log.err(logLevel=level, **kw)
-def fail(message, exception=None, level="crit", **kw): +def fail(message, exception, level="crit", **kw): log.failure(message, OONITestFailure(exception, **kw), logLevel=level) diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py index 6f6b355..3326c75 100644 --- a/ooni/utils/onion.py +++ b/ooni/utils/onion.py @@ -16,53 +16,22 @@ # XXX TODO add report keys for onion methods
import random +import sys
-from ooni.lib.txtorcon import CircuitListenerMixin, IStreamAttacher -from ooni.lib.txtorcon import TorState -from ooni.utils import log from twisted.internet import defer from zope.interface import implements
+from ooni.lib.txtorcon import CircuitListenerMixin, IStreamAttacher +from ooni.lib.txtorcon import TorState, TorConfig +from ooni.utils import log +from ooni.utils.timer import deferred_timeout, TimeoutError
-def setup_done(proto): - log.msg("Setup Complete") - state = TorState(proto.tor_protocol) - state.post_bootstrap.addCallback(state_complete) - state.post_bootstrap.addErrback(setup_fail) - -def setup_fail(proto): - log.err("Setup Failed: %s" % proto) - #report.update({'setup_fail': proto}) - reactor.stop() - -def state_complete(state): - """Called when we've got a TorState.""" - log.msg("We've completely booted up a Tor version %s at PID %d" - % (state.protocol.version, state.tor_pid)) - log.msg("This Tor has the following %d Circuits:" - % len(state.circuits)) - for circ in state.circuits.values(): - log.msg("%s" % circ) - return state - -def updates(_progress, _tag, _summary): - """Log updates on the Tor bootstrapping process.""" - log.msg("%d%%: %s" % (_progress, _summary)) - -def bootstrap(ctrl): - """ - Bootstrap Tor from an instance of - :class:`ooni.lib.txtorcon.TorControlProtocol`. - """ - conf = TorConfig(ctrl) - conf.post_bootstrap.addCallback(setup_done).addErrback(setup_fail) - log.msg("Tor process connected, bootstrapping ...")
def parse_data_dir(data_dir): """ Parse a string that a has been given as a DataDirectory and determine its absolute path on the filesystem. - + :param data_dir: A directory for Tor's DataDirectory, to be parsed. :return: @@ -90,7 +59,7 @@ def parse_data_dir(data_dir): assert path.isdir(data_dir), "Could not find %s" % data_dir except AssertionError, ae: log.err(ae) - sys.exit(1) + sys.exit() else: return data_dir
@@ -177,18 +146,55 @@ def remove_public_relays(state, bridges): if len(both) > 0: try: updated = map(lambda node: remove_node_from_list(node), both) - if not updated: - defer.returnValue(state) - else: - defer.returnValue(state) + log.debug("Bridges in both: %s" % both) + log.debug("Updated = %s" % updated) + #if not updated: + # defer.returnValue(state) + #else: + # defer.returnValue(state) + return state except Exception, e: log.err("Removing public relays %s from bridge list failed:\n%s" % (both, e))
-#@defer.inlineCallbacks +def setup_done(proto): + log.msg("Setup Complete") + state = TorState(proto.tor_protocol) + state.post_bootstrap.addCallback(state_complete) + state.post_bootstrap.addErrback(setup_fail) + + +def setup_fail(proto): + log.msg("Setup Failed:\n%s" % proto) + return proto + #reactor.stop() + +def state_complete(state): + """Called when we've got a TorState.""" + log.msg("We've completely booted up a Tor version %s at PID %d" + % (state.protocol.version, state.tor_pid)) + log.msg("This Tor has the following %d Circuits:" + % len(state.circuits)) + for circ in state.circuits.values(): + log.msg("%s" % circ) + return state + +def updates(_progress, _tag, _summary): + """Log updates on the Tor bootstrapping process.""" + log.msg("%d%%: %s" % (_progress, _summary)) + +def bootstrap(ctrl): + """ + Bootstrap Tor from an instance of + :class:`ooni.lib.txtorcon.TorControlProtocol`. + """ + conf = TorConfig(ctrl) + conf.post_bootstrap.addCallback(setup_done).addErrback(setup_fail) + log.msg("Tor process connected, bootstrapping ...") + def start_tor(reactor, config, control_port, tor_binary, data_dir, - report=None, progress=updates, process_cb=setup_done, - process_eb=setup_fail): + report=None, progress=updates, + process_cb=None, process_eb=None): """ Use a txtorcon.TorConfig() instance, config, to write a torrc to a tempfile in our DataDirectory, data_dir. If data_dir is None, a temp @@ -212,7 +218,7 @@ def start_tor(reactor, config, control_port, tor_binary, data_dir, :param data_dir: The directory to use as Tor's DataDirectory. :param report: - The class:`ooni.plugoo.reports.Report` instance to .update(). + The class:`ooni.plugoo.reports.Report` instance. :param progress: A non-blocking function to handle bootstrapping updates, which takes three parameters: _progress, _tag, and _summary. 
@@ -252,40 +258,179 @@ def start_tor(reactor, config, control_port, tor_binary, data_dir, process_protocol = TorProcessProtocol(connection_creator, progress) process_protocol.to_delete = to_delete
+ if process_cb is not None and process_eb is not None: + process_protocol.connected_cb.addCallbacks(process_cb, process_eb) + reactor.addSystemEventTrigger('before', 'shutdown', partial(delete_files_or_dirs, to_delete)) - #try: - # transport = yield reactor.spawnProcess(process_protocol, - # tor_binary, - # args=(tor_binary,'-f',torrc), - # env={'HOME': data_dir}, - # path=data_dir) - # if transport: - # transport.closeStdin() - #except RuntimeError as e: - # log.err("Starting Tor failed: %s" % e) - # process_protocol.connected_cb.errback(e) - #except NotImplementedError, e: - # url = "http://starship.python.net/crew/mhammond/win32/Downloads.html" - # log.err("Running bridget on Windows requires pywin32: %s" % url) - # process_protocol.connected_cb.errback(e) try: - transport = reactor.spawnProcess(process_protocol, - tor_binary, + transport = reactor.spawnProcess(process_protocol, + tor_binary, args=(tor_binary,'-f',torrc), - env={'HOME':data_dir}, + env={'HOME': data_dir}, path=data_dir) transport.closeStdin() except RuntimeError, e: - log.err(e) - process_protocol.connected_cb.errback(e) + log.err("Starting Tor failed:") + process_protocol.connected_cb.errback(e) + except NotImplementedError, e: + url = "http://starship.python.net/crew/mhammond/win32/Downloads.html" + log.msg("Running bridget on Windows requires pywin32: %s" % url) + process_protocol.connected_cb.errback(e)
return process_protocol.connected_cb
- #d = yield process_protocol.connected_cb - #defer.returnValue(d) +@defer.inlineCallbacks +def start_tor_filter_nodes(reactor, config, control_port, tor_binary, + data_dir, bridges): + """ + Bootstrap a Tor process and return a fully-setup + :class:`ooni.lib.txtorcon.TorState`. Then search for our bridges + to test in the list of known public relays, + :ivar:`ooni.lib.txtorcon.TorState.routers`, and remove any bridges + which are known public relays. + + :param reactor: + The :class:`twisted.internet.reactor`. + :param config: + An instance of :class:`ooni.lib.txtorcon.TorConfig`. + :param control_port: + The port to use for Tor's ControlPort. If already configured in + the TorConfig instance, this can be given as + TorConfig.config.ControlPort. + :param tor_binary: + The full path to the Tor binary to execute. + :param data_dir: + The full path to the directory to use as Tor's DataDirectory. + :param bridges: + A dictionary which has a key 'all' which is a list of bridges to + test connecting to, e.g.: + bridges['all'] = ['1.1.1.1:443', '22.22.22.22:9001'] + :return: + A fully initialized :class:`ooni.lib.txtorcon.TorState`. + """ + setup = yield start_tor(reactor, config, control_port, + tor_binary, data_dir, + process_cb=setup_done, process_eb=setup_fail) + filter_nodes = yield remove_public_relays(setup, bridges) + defer.returnValue(filter_nodes) + +@defer.inlineCallbacks +def start_tor_with_timer(reactor, config, control_port, tor_binary, data_dir, + bridges, timeout): + """ + Start bootstrapping a Tor process wrapped with an instance of the class + decorator :func:`ooni.utils.timer.deferred_timeout` and complete callbacks + to either :func:`setup_done` or :func:`setup_fail`. Return a fully-setup + :class:`ooni.lib.txtorcon.TorState`. Then search for our bridges to test + in the list of known public relays, + :ivar:`ooni.lib.txtorcon.TorState.routers`, and remove any bridges which + are listed as known public relays. 
+ + :param reactor: + The :class:`twisted.internet.reactor`. + :param config: + An instance of :class:`ooni.lib.txtorcon.TorConfig`. + :param control_port: + The port to use for Tor's ControlPort. If already configured in + the TorConfig instance, this can be given as + TorConfig.config.ControlPort. + :param tor_binary: + The full path to the Tor binary to execute. + :param data_dir: + The full path to the directory to use as Tor's DataDirectory. + :param bridges: + A dictionary which has a key 'all' which is a list of bridges to + test connecting to, e.g.: + bridges['all'] = ['1.1.1.1:443', '22.22.22.22:9001'] + :param timeout: + The number of seconds to attempt to bootstrap the Tor process before + raising a :class:`ooni.utils.timer.TimeoutError`. + :return: + If the timeout limit is not exceeded, return a fully initialized + :class:`ooni.lib.txtorcon.TorState`, else return None. + """ + error_msg = "Bootstrapping has exceeded the timeout limit..." + with_timeout = deferred_timeout(timeout, e=error_msg)(start_tor) + try: + setup = yield with_timeout(reactor, config, control_port, tor_binary, + data_dir, process_cb=setup_done, + process_eb=setup_fail) + except TimeoutError, te: + log.err(te) + defer.returnValue(None) + #except Exception, e: + # log.err(e) + # defer.returnValue(None) + else: + state = yield remove_public_relays(setup, bridges) + defer.returnValue(state) + +@defer.inlineCallbacks +def start_tor_filter_nodes_with_timer(reactor, config, control_port, + tor_binary, data_dir, bridges, timeout): + """ + Start bootstrapping a Tor process wrapped with an instance of the class + decorator :func:`ooni.utils.timer.deferred_timeout` and complete callbacks + to either :func:`setup_done` or :func:`setup_fail`. Then, filter our list + of bridges to remove known public relays by calling back to + :func:`remove_public_relays`. Return a fully-setup + :class:`ooni.lib.txtorcon.TorState`. 
Then search for our bridges to test + in the list of known public relays, + :ivar:`ooni.lib.txtorcon.TorState.routers`, and remove any bridges which + are listed as known public relays. + + :param reactor: + The :class:`twisted.internet.reactor`. + :param config: + An instance of :class:`ooni.lib.txtorcon.TorConfig`. + :param control_port: + The port to use for Tor's ControlPort. If already configured in + the TorConfig instance, this can be given as + TorConfig.config.ControlPort. + :param tor_binary: + The full path to the Tor binary to execute. + :param data_dir: + The full path to the directory to use as Tor's DataDirectory. + :param bridges: + A dictionary which has a key 'all' which is a list of bridges to + test connecting to, e.g.: + bridges['all'] = ['1.1.1.1:443', '22.22.22.22:9001'] + :param timeout: + The number of seconds to attempt to bootstrap the Tor process before + raising a :class:`ooni.utils.timer.TimeoutError`. + :return: + If the timeout limit is not exceeded, return a fully initialized + :class:`ooni.lib.txtorcon.TorState`, else return None. + """ + error_msg = "Bootstrapping has exceeded the timeout limit..." + with_timeout = deferred_timeout(timeout, e=error_msg)(start_tor_filter_nodes) + try: + state = yield with_timeout(reactor, config, control_port, + tor_binary, data_dir, bridges) + except TimeoutError, te: + log.err(te) + defer.returnValue(None) + #except Exception, e: + # log.err(e) + # defer.returnValue(None) + else: + defer.returnValue(state)
class CustomCircuit(CircuitListenerMixin): + """ + Utility class for controlling circuit building. See + 'attach_streams_by_country.py' in the txtorcon documentation. + + :param state: + A fully bootstrapped instance of :class:`ooni.lib.txtorcon.TorState`. + :param relays: + A dictionary containing a key 'all', which is a list of relays to + test connecting to. + :ivar waiting_circuits: + The list of circuits which we are waiting to attach to. You shouldn't + need to touch this. + """ implements(IStreamAttacher)
def __init__(self, state, relays=None): @@ -294,6 +439,15 @@ class CustomCircuit(CircuitListenerMixin): self.relays = relays
def waiting_on(self, circuit): + """ + Whether or not we are waiting on the given circuit before attaching to + it. + + :param circuit: + An item from :ivar:`ooni.lib.txtorcon.TorState.circuits`. + :return: + True if we are waiting on the circuit, False if not waiting. + """ for (circid, d) in self.waiting_circuits: if circuit.id == circid: return True @@ -318,6 +472,17 @@ class CustomCircuit(CircuitListenerMixin): d.callback(circuit)
def circuit_failed(self, circuit, reason): + """ + If building a circuit has failed, try to remove it from our list of + :ivar:`waiting_circuits`, else request to build it. + + :param circuit: + An item from :ivar:`ooni.lib.txtorcon.TorState.circuits`. + :param reason: + A :class:`twisted.python.fail.Failure` instance. + :return: + None + """ if self.waiting_on(circuit): log.msg("Circuit %s failed for reason %s" % (circuit.id, reason)) circid, d = None, None @@ -335,22 +500,51 @@ class CustomCircuit(CircuitListenerMixin): """ Check if a relay is a hop in one of our already built circuits.
+ :param router: + An item from the list + :func:`ooni.lib.txtorcon.TorState.routers.values()`. """ for circ in self.state.circuits.values(): - if router in circuit.path: + if router in circ.path: #router.update() ## XXX can i use without args? no. TorInfo.dump(self)
def request_circuit_build(self, deferred, path=None): + """ + Request a custom circuit. + + :param deferred: + A :class:`twisted.internet.defer.Deferred` for this circuit. + :param path: + A list of router ids to build a circuit from. The length of this + list must be at least three. + """ if path is None: - if self.state.relays_remaining() > 0: - first, middle,last = (self.state.relays.pop() - for i in range(3)) + + pick = self.relays['all'].pop + n = self.state.entry_guards.values() + choose = random.choice + + first, middle, last = (None for i in range(3)) + + if self.relays['remaining']() >= 3: + first, middle, last = (pick() for i in range(3)) + elif self.relays['remaining']() < 3: + first = choose(n) + middle = pick() + if self.relays['remaining'] == 2: + middle, last = (pick() for i in range(2)) + elif self.relay['remaining'] == 1: + middle = pick() + last = choose(n) + else: + log.msg("Qu'est-que fuque?") else: - first = random.choice(self.state.entry_guards.values()) middle, last = (random.choice(self.state.routers.values()) for i in range(2)) + path = [first, middle, last] + else: assert isinstance(path, list), \ "Circuit path must be a list of relays!" @@ -379,7 +573,11 @@ class CustomCircuit(CircuitListenerMixin): log.err)
class TxtorconImportError(ImportError): - """Raised when /ooni/lib/txtorcon cannot be imported from.""" + """ + Raised when ooni.lib.txtorcon cannot be imported from. Checks our current + working directory and the path given to see if txtorcon has been + initialized via /ooni/lib/Makefile. + """ from os import getcwd, path
cwd, tx = getcwd(), 'lib/txtorcon/torconfig.py' @@ -410,3 +608,82 @@ class PTNotFoundException(Exception): log.msg("%s" % m) return sys.exit()
+@defer.inlineCallbacks +def __start_tor_with_timer__(reactor, config, control_port, tor_binary, + data_dir, bridges=None, relays=None, timeout=None, + retry=None): + """ + A wrapper for :func:`start_tor` which wraps the bootstrapping of a Tor + process and its connection to a reactor with a + :class:`twisted.internet.defer.Deferred` class decorator utility, + :func:`ooni.utils.timer.deferred_timeout`, and a mechanism for resets. + + ## XXX fill me in + """ + raise NotImplementedError + + class RetryException(Exception): + pass + + import sys + from ooni.utils.timer import deferred_timeout, TimeoutError + + def __make_var__(old, default, _type): + if old is not None: + assert isinstance(old, _type) + new = old + else: + new = default + return new + + reactor = reactor + timeout = __make_var__(timeout, 120, int) + retry = __make_var__(retry, 1, int) + + with_timeout = deferred_timeout(timeout)(start_tor) + + @defer.inlineCallbacks + def __start_tor__(rc=reactor, cf=config, cp=control_port, tb=tor_binary, + dd=data_dir, br=bridges, rl=relays, cb=setup_done, + eb=setup_fail, af=remove_public_relays, retry=retry): + try: + setup = yield with_timeout(rc,cf,cp,tb,dd) + except TimeoutError: + retry -= 1 + defer.returnValue(retry) + else: + if setup.callback: + setup = yield cb(setup) + elif setup.errback: + setup = yield eb(setup) + else: + setup = setup + + if br is not None: + state = af(setup,br) + else: + state = setup + defer.returnValue(state) + + @defer.inlineCallbacks + def __try_until__(tries): + result = yield __start_tor__() + try: + assert isinstance(result, int) + except AssertionError: + defer.returnValue(result) + else: + if result >= 0: + tried = yield __try_until__(result) + defer.returnValue(tried) + else: + raise RetryException + try: + tried = yield __try_until__(retry) + except RetryException: + log.msg("All retry attempts to bootstrap Tor have timed out.") + log.msg("Exiting ...") + defer.returnValue(sys.exit()) + else: + defer.returnValue(tried) + 
diff --git a/ooni/utils/timer.py b/ooni/utils/timer.py index 24074f9..e03fd74 100644 --- a/ooni/utils/timer.py +++ b/ooni/utils/timer.py @@ -17,17 +17,24 @@ def timeout(seconds, e=None): """ A decorator for blocking methods to cause them to timeout. Can be used like this: + @timeout(30) - def foo(): + def foo(arg1, kwarg="baz"): for x in xrange(1000000000): + print "%s %s" % (arg1, kwarg) print x + or like this: - ridiculous = timeout(30)(foo) + + ridiculous = timeout(30)(foo("bar"))
:param seconds: Number of seconds to wait before raising :class:`TimeoutError`. :param e: Error message to pass to :class:`TimeoutError`. Default None. + :return: + The result of the original function, or else an instance of + :class:`TimeoutError`. """ from signal import alarm, signal, SIGALRM from functools import wraps @@ -45,3 +52,56 @@ def timeout(seconds, e=None): return res return wraps(func)(wrapper) return decorator + +def deferred_timeout(seconds, e=None): + """ + Decorator for adding a timeout to an instance of a + :class:`twisted.internet.defer.Deferred`. Can be used like this: + + @deferred_timeout(30) + def foo(arg1, kwarg="baz"): + for x in xrange(1000000000): + print "%s %s" % (arg1, kwarg) + print x + + or like this: + + ridiculous = deferred_timeout(30)(foo("bar")) + + :param seconds: + Number of seconds to wait before raising :class:`TimeoutError`. + :param e: + Error message to pass to :class:`TimeoutError`. Default None. + :return: + The result of the orginal :class:`twisted.internet.defer.Deferred` + or else a :class:`TimeoutError`. + """ + from twisted.internet import defer, reactor + + def wrapper(func): + @defer.inlineCallbacks + def _timeout(*args, **kwargs): + d_original = func(*args, **kwargs) + if not isinstance(d_original, defer.Deferred): + defer.returnValue(d_original) ## fail gracefully + d_timeout = defer.Deferred() + timeup = reactor.callLater(seconds, d_timeout.callback, None) + try: + original_result, timeout_result = \ + yield defer.DeferredList([d_original, d_timeout], + fireOnOneCallback=True, + fireOnOneErrback=True, + consumeErrors=True) + except defer.FirstError, dfe: + assert dfe.index == 0 ## error in original + timeup.cancel() + dfe.subFailure.raiseException() + else: + if d_timeout.called: ## timeout + d_original.cancel() + raise TimeoutError, e + timeup.cancel() ## no timeout + defer.returnValue(d_original) + return _timeout + return wrapper +
tor-commits@lists.torproject.org