[tor-commits] [stem/master] Allow for blocking circuit build and extension

atagar at torproject.org atagar at torproject.org
Mon Dec 31 03:13:07 UTC 2012


commit f716fedc97ea3cc522571ad8bd505e847379c0d7
Author: Damian Johnson <atagar at torproject.org>
Date:   Sun Dec 30 18:59:11 2012 -0800

    Allow for blocking circuit build and extension
    
    Callers of new_circuit() and extend_circuit() will often (maybe usually?) want
    their call to block until the circuit is actually ready to be used.
    
    Unfortunately this is a little tricky for them to hack together on their own
    since it requires a listener (polling doesn't work reliably - after a couple of
    hours of hair pulling I now know that all too well). Hence doing this work for
    them.
    
    This includes a CircuitExtensionFailed exception so we can communicate the
    circuit event that reported the failure.
---
 stem/__init__.py                 |   12 +++++
 stem/control.py                  |  102 ++++++++++++++++++++++++++-----------
 test/integ/control/controller.py |   21 +-------
 3 files changed, 86 insertions(+), 49 deletions(-)

diff --git a/stem/__init__.py b/stem/__init__.py
index e17dc0f..2482299 100644
--- a/stem/__init__.py
+++ b/stem/__init__.py
@@ -9,6 +9,7 @@ Library for working with the tor process.
     |- ProtocolError - Malformed socket data.
     |- OperationFailed - Tor was unable to successfully complete the operation.
     |  |- UnsatisfiableRequest - Tor was unable to satisfy a valid request.
+    |  |  +- CircuitExtensionFailed - Attempt to make or extend a circuit failed.
     |  +- InvalidRequest - Invalid request.
     |     +- InvalidArguments - Invalid request parameters.
     +- SocketError - Communication with the socket failed.
@@ -421,6 +422,17 @@ class UnsatisfiableRequest(OperationFailed):
   Exception raised if Tor was unable to process our request.
   """
 
+class CircuitExtensionFailed(UnsatisfiableRequest):
+  """
+  An attempt to create or extend a circuit failed.
+  
+  :var stem.response.CircuitEvent circ: response notifying us of the failure
+  """
+  
+  def __init__(self, message, circ = None):
+    super(CircuitExtensionFailed, self).__init__(message = message)
+    self.circ = circ
+
 class InvalidRequest(OperationFailed):
   """
   Exception raised when the request was invalid or malformed.
diff --git a/stem/control.py b/stem/control.py
index efe1403..14d636e 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -140,6 +140,8 @@ import stem.util.connection
 import stem.util.enum
 import stem.version
 
+from stem import CircStatus
+
 from stem.util import log
 import stem.util.tor_tools
 
@@ -1537,20 +1539,23 @@ class Controller(BaseController):
       if default == UNDEFINED: raise exc
       else: return default
   
-  def new_circuit(self, path = None, purpose = "general"):
+  def new_circuit(self, path = None, purpose = "general", await_build = False):
     """
     Requests a new circuit. If the path isn't provided, one is automatically
     selected.
     
     :param list,str path: one or more relays to make a circuit through
     :param str purpose: "general" or "controller"
+    :param bool await_build: blocks until the circuit is built if **True**
     
     :returns: str of the circuit id of the newly created circuit
+    
+    :raises: :class:`stem.ControllerError` if the call fails
     """
     
-    return self.extend_circuit('0', path, purpose)
+    return self.extend_circuit('0', path, purpose, await_build)
   
-  def extend_circuit(self, circuit_id = "0", path = None, purpose = "general"):
+  def extend_circuit(self, circuit_id = "0", path = None, purpose = "general", await_build = False):
     """
     Either requests the creation of a new circuit or extends an existing one.
     
@@ -1561,7 +1566,7 @@ class Controller(BaseController):
     A python interpreter session used to create circuits could look like this...
     
     ::
-      
+    
       >>> control.extend_circuit('0', ["718BCEA286B531757ACAFF93AE04910EA73DE617", "30BAB8EE7606CBD12F3CC269AE976E0153E7A58D", "2765D8A8C4BBA3F89585A9FFE0E8575615880BEB"])
       19
       >>> control.extend_circuit('0')
@@ -1574,41 +1579,76 @@ class Controller(BaseController):
     :param list,str path: one or more relays to make a circuit through, this is
       required if the circuit id is non-zero
     :param str purpose: "general" or "controller"
+    :param bool await_build: blocks until the circuit is built if **True**
     
     :returns: str of the circuit id of the created or extended circuit
     
-    :raises: :class:`stem.InvalidRequest` if one of the parameters were invalid
+    :raises:
+      :class:`stem.InvalidRequest` if one of the parameters were invalid
+      :class:`stem.CircuitExtensionFailed` if we were waiting for the circuit
+        to build but it failed
+      :class:`stem.ControllerError` if the call fails
     """
     
-    # we might accidently get integer circuit ids
-    circuit_id = str(circuit_id)
-    
-    if path is None and circuit_id == '0':
-      path_opt_version = stem.version.Requirement.EXTENDCIRCUIT_PATH_OPTIONAL
-      
-      if not self.get_version().meets_requirements(path_opt_version):
-        raise stem.InvalidRequest(512, "EXTENDCIRCUIT requires the path prior to version %s" % path_opt_version)
+    # Attaches a temporary listener for CIRC events if we'll be waiting for it
+    # to build. This is icky, but we can't reliably do this via polling since
+    # we then can't get the failure if it can't be created.
     
-    args = [circuit_id]
-    if type(path) == str: path = [path]
-    if path: args.append(",".join(path))
-    if purpose: args.append("purpose=%s" % purpose)
+    circ_queue, circ_listener = None, None
     
-    response = self.msg("EXTENDCIRCUIT %s" % " ".join(args))
-    stem.response.convert("SINGLELINE", response)
-    
-    if response.is_ok():
-      try:
-        extended, new_circuit = response.message.split(" ")
-        assert extended == "EXTENDED"
-      except:
-        raise stem.ProtocolError("EXTENDCIRCUIT response invalid:\n%s", str(response))
-    elif response.code in ('512', '552'):
-      raise stem.InvalidRequest(response.code, response.message)
-    else:
-      raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code)
+    if await_build:
+      circ_queue = Queue.Queue()
+      
+      def circ_listener(event):
+        circ_queue.put(event)
+      
+      self.add_event_listener(circ_listener, EventType.CIRC)
     
-    return new_circuit
+    try:
+      # we might accidently get integer circuit ids
+      circuit_id = str(circuit_id)
+      
+      if path is None and circuit_id == '0':
+        path_opt_version = stem.version.Requirement.EXTENDCIRCUIT_PATH_OPTIONAL
+        
+        if not self.get_version().meets_requirements(path_opt_version):
+          raise stem.InvalidRequest(512, "EXTENDCIRCUIT requires the path prior to version %s" % path_opt_version)
+      
+      args = [circuit_id]
+      if type(path) == str: path = [path]
+      if path: args.append(",".join(path))
+      if purpose: args.append("purpose=%s" % purpose)
+      
+      response = self.msg("EXTENDCIRCUIT %s" % " ".join(args))
+      stem.response.convert("SINGLELINE", response)
+      
+      if response.is_ok():
+        try:
+          extended, new_circuit = response.message.split(" ")
+          assert extended == "EXTENDED"
+        except:
+          raise stem.ProtocolError("EXTENDCIRCUIT response invalid:\n%s", str(response))
+      elif response.code in ('512', '552'):
+        raise stem.InvalidRequest(response.code, response.message)
+      else:
+        raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code)
+      
+      if await_build:
+        while True:
+          circ = circ_queue.get()
+          
+          if circ.id == new_circuit:
+            if circ.status == CircStatus.BUILT:
+              break
+            elif circ.status == CircStatus.FAILED:
+              raise stem.CircuitExtensionFailed("Circuit failed to be created: %s" % circ.reason, circ)
+            elif circ.status == CircStatus.CLOSED:
+              raise stem.CircuitExtensionFailed("Circuit was closed prior to build", circ)
+      
+      return new_circuit
+    finally:
+      if circ_listener:
+        self.remove_event_listener(circ_listener)
   
   def repurpose_circuit(self, circuit_id, purpose):
     """
diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py
index fb14120..c8055ab 100644
--- a/test/integ/control/controller.py
+++ b/test/integ/control/controller.py
@@ -13,8 +13,6 @@ import threading
 import time
 import unittest
 
-from Queue import Queue
-
 import stem.connection
 import stem.control
 import stem.descriptor.reader
@@ -725,31 +723,19 @@ class TestController(unittest.TestCase):
     if test.runner.require_control(self): return
     elif test.runner.require_online(self): return
     
-    circuit_id, circ_status_q = None, Queue()
+    circuit_id = None
     
     def handle_streamcreated(stream):
-      if stream.status == "NEW":
+      if stream.status == "NEW" and circuit_id:
         controller.attach_stream(stream.id, circuit_id)
     
-    def handle_circ(circuit):
-      circ_status_q.put(circuit)
-    
     with test.runner.get_runner().get_tor_controller() as controller:
       controller.set_conf("__LeaveStreamsUnattached", "1")
-      controller.add_event_listener(handle_circ, stem.control.EventType.CIRC)
       controller.add_event_listener(handle_streamcreated, stem.control.EventType.STREAM)
       
       try:
+        circuit_id = controller.new_circuit(await_build = True)
         socksport = controller.get_socks_listeners()[0][1]
-        circ_status = ""
-        
-        while circ_status != "BUILT":
-          circuit_id = controller.new_circuit()
-          
-          while not circ_status in ("BUILT", "FAILED"):
-            circ_event = circ_status_q.get()
-            if circ_event.id == circuit_id:
-              circ_status = circ_event.status
         
         ip = test.util.external_ip('127.0.0.1', socksport)
         exit_circuit = controller.get_circuit(circuit_id)
@@ -758,7 +744,6 @@ class TestController(unittest.TestCase):
         
         self.assertEquals(exit_ip, ip)
       finally:
-        controller.remove_event_listener(handle_circ)
         controller.remove_event_listener(handle_streamcreated)
         controller.reset_conf("__LeaveStreamsUnattached")
   





More information about the tor-commits mailing list