commit f716fedc97ea3cc522571ad8bd505e847379c0d7
Author: Damian Johnson <atagar(a)torproject.org>
Date: Sun Dec 30 18:59:11 2012 -0800
Allow for blocking circuit build and extension
Callers of new_circuit() and extend_circuit() will often (maybe usually?) want
their call to block until the circuit is actually ready to be used.
Unfortunately this is a little tricky for them to hack together on their own
since it requires a listener (polling doesn't work reliably - after a couple of
hours of hair pulling I now know that all too well). Hence doing this work for
them.
This includes a CircuitExtensionFailed exception so we can communicate the
circuit event that reported the failure.
---
stem/__init__.py | 12 +++++
stem/control.py | 102 ++++++++++++++++++++++++++-----------
test/integ/control/controller.py | 21 +-------
3 files changed, 86 insertions(+), 49 deletions(-)
diff --git a/stem/__init__.py b/stem/__init__.py
index e17dc0f..2482299 100644
--- a/stem/__init__.py
+++ b/stem/__init__.py
@@ -9,6 +9,7 @@ Library for working with the tor process.
|- ProtocolError - Malformed socket data.
|- OperationFailed - Tor was unable to successfully complete the operation.
| |- UnsatisfiableRequest - Tor was unable to satisfy a valid request.
+ | | +- CircuitExtensionFailed - Attempt to make or extend a circuit failed.
| +- InvalidRequest - Invalid request.
| +- InvalidArguments - Invalid request parameters.
+- SocketError - Communication with the socket failed.
@@ -421,6 +422,17 @@ class UnsatisfiableRequest(OperationFailed):
Exception raised if Tor was unable to process our request.
"""
+class CircuitExtensionFailed(UnsatisfiableRequest):
+ """
+ An attempt to create or extend a circuit failed.
+
+ :var stem.response.CircuitEvent circ: response notifying us of the failure, **None** if not provided
+ """
+
+ def __init__(self, message, circ = None):
+ super(CircuitExtensionFailed, self).__init__(message = message)
+ self.circ = circ
+
class InvalidRequest(OperationFailed):
"""
Exception raised when the request was invalid or malformed.
diff --git a/stem/control.py b/stem/control.py
index efe1403..14d636e 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -140,6 +140,8 @@ import stem.util.connection
import stem.util.enum
import stem.version
+from stem import CircStatus
+
from stem.util import log
import stem.util.tor_tools
@@ -1537,20 +1539,23 @@ class Controller(BaseController):
if default == UNDEFINED: raise exc
else: return default
- def new_circuit(self, path = None, purpose = "general"):
+ def new_circuit(self, path = None, purpose = "general", await_build = False):
"""
Requests a new circuit. If the path isn't provided, one is automatically
selected.
:param list,str path: one or more relays to make a circuit through
:param str purpose: "general" or "controller"
+ :param bool await_build: blocks until the circuit is built if **True**
:returns: str of the circuit id of the newly created circuit
+
+ :raises: :class:`stem.ControllerError` if the call fails
"""
- return self.extend_circuit('0', path, purpose)
+ return self.extend_circuit('0', path, purpose, await_build)
- def extend_circuit(self, circuit_id = "0", path = None, purpose = "general"):
+ def extend_circuit(self, circuit_id = "0", path = None, purpose = "general", await_build = False):
"""
Either requests the creation of a new circuit or extends an existing one.
@@ -1561,7 +1566,7 @@ class Controller(BaseController):
A python interpreter session used to create circuits could look like this...
::
-
+
>>> control.extend_circuit('0', ["718BCEA286B531757ACAFF93AE04910EA73DE617", "30BAB8EE7606CBD12F3CC269AE976E0153E7A58D", "2765D8A8C4BBA3F89585A9FFE0E8575615880BEB"])
19
>>> control.extend_circuit('0')
@@ -1574,41 +1579,76 @@ class Controller(BaseController):
:param list,str path: one or more relays to make a circuit through, this is
required if the circuit id is non-zero
:param str purpose: "general" or "controller"
+ :param bool await_build: blocks until the circuit is built if **True**
:returns: str of the circuit id of the created or extended circuit
- :raises: :class:`stem.InvalidRequest` if one of the parameters were invalid
+ :raises:
+ * :class:`stem.InvalidRequest` if one of the parameters was invalid
+ * :class:`stem.CircuitExtensionFailed` if we were waiting for the circuit
+ to build but it failed
+ * :class:`stem.ControllerError` if the call fails
"""
- # we might accidently get integer circuit ids
- circuit_id = str(circuit_id)
-
- if path is None and circuit_id == '0':
- path_opt_version = stem.version.Requirement.EXTENDCIRCUIT_PATH_OPTIONAL
-
- if not self.get_version().meets_requirements(path_opt_version):
- raise stem.InvalidRequest(512, "EXTENDCIRCUIT requires the path prior to version %s" % path_opt_version)
+ # Attaches a temporary listener for CIRC events if we'll be waiting for the
+ # circuit to build. This is icky, but we can't reliably do this via polling
+ # since we then can't get the failure if the circuit can't be created.
- args = [circuit_id]
- if type(path) == str: path = [path]
- if path: args.append(",".join(path))
- if purpose: args.append("purpose=%s" % purpose)
+ circ_queue, circ_listener = None, None
- response = self.msg("EXTENDCIRCUIT %s" % " ".join(args))
- stem.response.convert("SINGLELINE", response)
-
- if response.is_ok():
- try:
- extended, new_circuit = response.message.split(" ")
- assert extended == "EXTENDED"
- except:
- raise stem.ProtocolError("EXTENDCIRCUIT response invalid:\n%s", str(response))
- elif response.code in ('512', '552'):
- raise stem.InvalidRequest(response.code, response.message)
- else:
- raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code)
+ if await_build:
+ circ_queue = Queue.Queue()
+
+ def circ_listener(event):
+ circ_queue.put(event)
+
+ self.add_event_listener(circ_listener, EventType.CIRC)
- return new_circuit
+ try:
+ # we might accidentally get integer circuit ids
+ circuit_id = str(circuit_id)
+
+ if path is None and circuit_id == '0':
+ path_opt_version = stem.version.Requirement.EXTENDCIRCUIT_PATH_OPTIONAL
+
+ if not self.get_version().meets_requirements(path_opt_version):
+ raise stem.InvalidRequest(512, "EXTENDCIRCUIT requires the path prior to version %s" % path_opt_version)
+
+ args = [circuit_id]
+ if type(path) == str: path = [path]
+ if path: args.append(",".join(path))
+ if purpose: args.append("purpose=%s" % purpose)
+
+ response = self.msg("EXTENDCIRCUIT %s" % " ".join(args))
+ stem.response.convert("SINGLELINE", response)
+
+ if response.is_ok():
+ try:
+ extended, new_circuit = response.message.split(" ")
+ assert extended == "EXTENDED"
+ except:
+ raise stem.ProtocolError("EXTENDCIRCUIT response invalid:\n%s" % str(response))
+ elif response.code in ('512', '552'):
+ raise stem.InvalidRequest(response.code, response.message)
+ else:
+ raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code)
+
+ if await_build:
+ # block on CIRC events until our circuit reports BUILT, FAILED, or CLOSED
+ while True:
+ circ = circ_queue.get()
+
+ if circ.id == new_circuit:
+ if circ.status == CircStatus.BUILT:
+ break
+ elif circ.status == CircStatus.FAILED:
+ raise stem.CircuitExtensionFailed("Circuit failed to be created: %s" % circ.reason, circ)
+ elif circ.status == CircStatus.CLOSED:
+ raise stem.CircuitExtensionFailed("Circuit was closed prior to build", circ)
+
+ return new_circuit
+ finally:
+ if circ_listener:
+ self.remove_event_listener(circ_listener)
def repurpose_circuit(self, circuit_id, purpose):
"""
diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py
index fb14120..c8055ab 100644
--- a/test/integ/control/controller.py
+++ b/test/integ/control/controller.py
@@ -13,8 +13,6 @@ import threading
import time
import unittest
-from Queue import Queue
-
import stem.connection
import stem.control
import stem.descriptor.reader
@@ -725,31 +723,19 @@ class TestController(unittest.TestCase):
if test.runner.require_control(self): return
elif test.runner.require_online(self): return
- circuit_id, circ_status_q = None, Queue()
+ circuit_id = None
def handle_streamcreated(stream):
- if stream.status == "NEW":
+ if stream.status == "NEW" and circuit_id:
controller.attach_stream(stream.id, circuit_id)
- def handle_circ(circuit):
- circ_status_q.put(circuit)
-
with test.runner.get_runner().get_tor_controller() as controller:
controller.set_conf("__LeaveStreamsUnattached", "1")
- controller.add_event_listener(handle_circ, stem.control.EventType.CIRC)
controller.add_event_listener(handle_streamcreated, stem.control.EventType.STREAM)
try:
+ circuit_id = controller.new_circuit(await_build = True)
socksport = controller.get_socks_listeners()[0][1]
- circ_status = ""
-
- while circ_status != "BUILT":
- circuit_id = controller.new_circuit()
-
- while not circ_status in ("BUILT", "FAILED"):
- circ_event = circ_status_q.get()
- if circ_event.id == circuit_id:
- circ_status = circ_event.status
ip = test.util.external_ip('127.0.0.1', socksport)
exit_circuit = controller.get_circuit(circuit_id)
@@ -758,7 +744,6 @@ class TestController(unittest.TestCase):
self.assertEquals(exit_ip, ip)
finally:
- controller.remove_event_listener(handle_circ)
controller.remove_event_listener(handle_streamcreated)
controller.reset_conf("__LeaveStreamsUnattached")