commit f716fedc97ea3cc522571ad8bd505e847379c0d7 Author: Damian Johnson atagar@torproject.org Date: Sun Dec 30 18:59:11 2012 -0800
Allow for blocking circuit build and extension
Callers of new_circuit() and extend_circuit() will often (maybe usually?) want their call to block until the circuit is actually ready to be used.
Unfortunately this is a little tricky for them to hack together on their own since it requires a listener (polling doesn't work reliably - after a couple of hours of hair pulling I now know that all too well). Hence doing this work for them.
This includes a CircuitExtensionFailed exception so we can communicate the circuit event that reported the failure. --- stem/__init__.py | 12 +++++ stem/control.py | 102 ++++++++++++++++++++++++++----------- test/integ/control/controller.py | 21 +------- 3 files changed, 86 insertions(+), 49 deletions(-)
diff --git a/stem/__init__.py b/stem/__init__.py index e17dc0f..2482299 100644 --- a/stem/__init__.py +++ b/stem/__init__.py @@ -9,6 +9,7 @@ Library for working with the tor process. |- ProtocolError - Malformed socket data. |- OperationFailed - Tor was unable to successfully complete the operation. | |- UnsatisfiableRequest - Tor was unable to satisfy a valid request. + | | +- CircuitExtensionFailed - Attempt to make or extend a circuit failed. | +- InvalidRequest - Invalid request. | +- InvalidArguments - Invalid request parameters. +- SocketError - Communication with the socket failed. @@ -421,6 +422,17 @@ class UnsatisfiableRequest(OperationFailed): Exception raised if Tor was unable to process our request. """
+class CircuitExtensionFailed(UnsatisfiableRequest): + """ + An attempt to create or extend a circuit failed. + + :var stem.response.CircuitEvent circ: response notifying us of the failure + """ + + def __init__(self, message, circ = None): + super(CircuitExtensionFailed, self).__init__(message = message) + self.circ = circ + class InvalidRequest(OperationFailed): """ Exception raised when the request was invalid or malformed. diff --git a/stem/control.py b/stem/control.py index efe1403..14d636e 100644 --- a/stem/control.py +++ b/stem/control.py @@ -140,6 +140,8 @@ import stem.util.connection import stem.util.enum import stem.version
+from stem import CircStatus + from stem.util import log import stem.util.tor_tools
@@ -1537,20 +1539,23 @@ class Controller(BaseController): if default == UNDEFINED: raise exc else: return default
- def new_circuit(self, path = None, purpose = "general"): + def new_circuit(self, path = None, purpose = "general", await_build = False): """ Requests a new circuit. If the path isn't provided, one is automatically selected.
:param list,str path: one or more relays to make a circuit through :param str purpose: "general" or "controller" + :param bool await_build: blocks until the circuit is built if **True**
:returns: str of the circuit id of the newly created circuit + + :raises: :class:`stem.ControllerError` if the call fails """
- return self.extend_circuit('0', path, purpose) + return self.extend_circuit('0', path, purpose, await_build)
- def extend_circuit(self, circuit_id = "0", path = None, purpose = "general"): + def extend_circuit(self, circuit_id = "0", path = None, purpose = "general", await_build = False): """ Either requests the creation of a new circuit or extends an existing one.
@@ -1561,7 +1566,7 @@ class Controller(BaseController): A python interpreter session used to create circuits could look like this...
:: - + >>> control.extend_circuit('0', ["718BCEA286B531757ACAFF93AE04910EA73DE617", "30BAB8EE7606CBD12F3CC269AE976E0153E7A58D", "2765D8A8C4BBA3F89585A9FFE0E8575615880BEB"]) 19 >>> control.extend_circuit('0') @@ -1574,41 +1579,76 @@ class Controller(BaseController): :param list,str path: one or more relays to make a circuit through, this is required if the circuit id is non-zero :param str purpose: "general" or "controller" + :param bool await_build: blocks until the circuit is built if **True**
:returns: str of the circuit id of the created or extended circuit
- :raises: :class:`stem.InvalidRequest` if one of the parameters were invalid + :raises: + :class:`stem.InvalidRequest` if one of the parameters were invalid + :class:`stem.CircuitExtensionFailed` if we were waiting for the circuit + to build but it failed + :class:`stem.ControllerError` if the call fails """
- # we might accidently get integer circuit ids - circuit_id = str(circuit_id) - - if path is None and circuit_id == '0': - path_opt_version = stem.version.Requirement.EXTENDCIRCUIT_PATH_OPTIONAL - - if not self.get_version().meets_requirements(path_opt_version): - raise stem.InvalidRequest(512, "EXTENDCIRCUIT requires the path prior to version %s" % path_opt_version) + # Attaches a temporary listener for CIRC events if we'll be waiting for it + # to build. This is icky, but we can't reliably do this via polling since + # we then can't get the failure if it can't be created.
- args = [circuit_id] - if type(path) == str: path = [path] - if path: args.append(",".join(path)) - if purpose: args.append("purpose=%s" % purpose) + circ_queue, circ_listener = None, None
- response = self.msg("EXTENDCIRCUIT %s" % " ".join(args)) - stem.response.convert("SINGLELINE", response) - - if response.is_ok(): - try: - extended, new_circuit = response.message.split(" ") - assert extended == "EXTENDED" - except: - raise stem.ProtocolError("EXTENDCIRCUIT response invalid:\n%s", str(response)) - elif response.code in ('512', '552'): - raise stem.InvalidRequest(response.code, response.message) - else: - raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code) + if await_build: + circ_queue = Queue.Queue() + + def circ_listener(event): + circ_queue.put(event) + + self.add_event_listener(circ_listener, EventType.CIRC)
- return new_circuit + try: + # we might accidently get integer circuit ids + circuit_id = str(circuit_id) + + if path is None and circuit_id == '0': + path_opt_version = stem.version.Requirement.EXTENDCIRCUIT_PATH_OPTIONAL + + if not self.get_version().meets_requirements(path_opt_version): + raise stem.InvalidRequest(512, "EXTENDCIRCUIT requires the path prior to version %s" % path_opt_version) + + args = [circuit_id] + if type(path) == str: path = [path] + if path: args.append(",".join(path)) + if purpose: args.append("purpose=%s" % purpose) + + response = self.msg("EXTENDCIRCUIT %s" % " ".join(args)) + stem.response.convert("SINGLELINE", response) + + if response.is_ok(): + try: + extended, new_circuit = response.message.split(" ") + assert extended == "EXTENDED" + except: + raise stem.ProtocolError("EXTENDCIRCUIT response invalid:\n%s", str(response)) + elif response.code in ('512', '552'): + raise stem.InvalidRequest(response.code, response.message) + else: + raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code) + + if await_build: + while True: + circ = circ_queue.get() + + if circ.id == new_circuit: + if circ.status == CircStatus.BUILT: + break + elif circ.status == CircStatus.FAILED: + raise stem.CircuitExtensionFailed("Circuit failed to be created: %s" % circ.reason, circ) + elif circ.status == CircStatus.CLOSED: + raise stem.CircuitExtensionFailed("Circuit was closed prior to build", circ) + + return new_circuit + finally: + if circ_listener: + self.remove_event_listener(circ_listener)
def repurpose_circuit(self, circuit_id, purpose): """ diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py index fb14120..c8055ab 100644 --- a/test/integ/control/controller.py +++ b/test/integ/control/controller.py @@ -13,8 +13,6 @@ import threading import time import unittest
-from Queue import Queue - import stem.connection import stem.control import stem.descriptor.reader @@ -725,31 +723,19 @@ class TestController(unittest.TestCase): if test.runner.require_control(self): return elif test.runner.require_online(self): return
- circuit_id, circ_status_q = None, Queue() + circuit_id = None
def handle_streamcreated(stream): - if stream.status == "NEW": + if stream.status == "NEW" and circuit_id: controller.attach_stream(stream.id, circuit_id)
- def handle_circ(circuit): - circ_status_q.put(circuit) - with test.runner.get_runner().get_tor_controller() as controller: controller.set_conf("__LeaveStreamsUnattached", "1") - controller.add_event_listener(handle_circ, stem.control.EventType.CIRC) controller.add_event_listener(handle_streamcreated, stem.control.EventType.STREAM)
try: + circuit_id = controller.new_circuit(await_build = True) socksport = controller.get_socks_listeners()[0][1] - circ_status = "" - - while circ_status != "BUILT": - circuit_id = controller.new_circuit() - - while not circ_status in ("BUILT", "FAILED"): - circ_event = circ_status_q.get() - if circ_event.id == circuit_id: - circ_status = circ_event.status
ip = test.util.external_ip('127.0.0.1', socksport) exit_circuit = controller.get_circuit(circuit_id) @@ -758,7 +744,6 @@ class TestController(unittest.TestCase):
self.assertEquals(exit_ip, ip) finally: - controller.remove_event_listener(handle_circ) controller.remove_event_listener(handle_streamcreated) controller.reset_conf("__LeaveStreamsUnattached")