[tor-commits] [stem/master] Timeout support in controller methods that await something

atagar at torproject.org atagar at torproject.org
Thu May 10 19:41:52 UTC 2018


commit 8a065f42952c9dc9762eaebe4d3fa48210783998
Author: Damian Johnson <atagar at torproject.org>
Date:   Thu May 10 12:39:29 2018 -0700

    Timeout support in controller methods that await something
    
    Adding a timeout argument to all controller methods with an 'await' option.
    
      https://trac.torproject.org/projects/tor/ticket/26056
---
 docs/change_log.rst             |  1 +
 stem/__init__.py                | 26 +++++++++++++++--
 stem/control.py                 | 65 +++++++++++++++++++++++++++++++++--------
 test/unit/control/controller.py | 13 +++++++++
 4 files changed, 90 insertions(+), 15 deletions(-)

diff --git a/docs/change_log.rst b/docs/change_log.rst
index 3836b8ad..720c636d 100644
--- a/docs/change_log.rst
+++ b/docs/change_log.rst
@@ -48,6 +48,7 @@ The following are only available within Stem's `git repository
 
   * Documented v3 hidden service support (:trac:`25124`, :spec:`6bd0a69`)
   * Added support for limiting the maximum number of streams to :func:`~stem.control.Controller.create_ephemeral_hidden_service` (:spec:`2fcb1c2`)
+  * Added a timeout argument to :class:`~stem.control.Controller` methods that could await a response (:trac:`26056`)
   * Stacktrace if :func:`stem.connection.connect` had a string port argument
   * More reliable ExitPolicy resolution (:trac:`25739`)
   * Replaced socket's :func:`~stem.socket.ControlPort.get_address`, :func:`~stem.socket.ControlPort.get_port`, and :func:`~stem.socket.ControlSocketFile.get_socket_path` with attributes
diff --git a/stem/__init__.py b/stem/__init__.py
index 0104d3b2..e9ad8430 100644
--- a/stem/__init__.py
+++ b/stem/__init__.py
@@ -10,12 +10,17 @@ Library for working with the tor process.
 
   ControllerError - Base exception raised when using the controller.
     |- ProtocolError - Malformed socket data.
+    |
     |- OperationFailed - Tor was unable to successfully complete the operation.
     |  |- UnsatisfiableRequest - Tor was unable to satisfy a valid request.
-    |  |  +- CircuitExtensionFailed - Attempt to make or extend a circuit failed.
-    |  |- DescriptorUnavailable - The given relay descriptor is unavailable.
+    |  |  |- CircuitExtensionFailed - Attempt to make or extend a circuit failed.
+    |  |  |- DescriptorUnavailable - The given relay descriptor is unavailable.
+    |  |  +- Timeout - Caller requested timeout was reached.
+    |  |
+    |  |
     |  +- InvalidRequest - Invalid request.
     |     +- InvalidArguments - Invalid request parameters.
+    |
     +- SocketError - Communication with the socket failed.
        +- SocketClosed - Socket has been shut down.
 
@@ -504,6 +509,7 @@ __all__ = [
   'UnsatisfiableRequest',
   'CircuitExtensionFailed',
   'DescriptorUnavailable',
+  'Timeout',
   'InvalidRequest',
   'InvalidArguments',
   'SocketError',
@@ -582,15 +588,29 @@ class CircuitExtensionFailed(UnsatisfiableRequest):
     self.circ = circ
 
 
-class DescriptorUnavailable(OperationFailed):
+class DescriptorUnavailable(UnsatisfiableRequest):
   """
   Tor was unable to provide a descriptor for the given relay.
+
+  .. versionchanged:: 1.7.0
+     Subclassed under UnsatisfiableRequest rather than OperationFailed.
   """
 
   def __init__(self, message):
     super(DescriptorUnavailable, self).__init__(message = message)
 
 
+class Timeout(UnsatisfiableRequest):
+  """
+  Timeout requested by the caller was reached.
+
+  .. versionadded:: 1.7.0
+  """
+
+  def __init__(self, message):
+    super(Timeout, self).__init__(message = message)
+
+
 class InvalidRequest(OperationFailed):
   """
   Exception raised when the request was invalid or malformed.
diff --git a/stem/control.py b/stem/control.py
index fd9aa8dc..244aed0b 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -1992,7 +1992,7 @@ class Controller(BaseController):
       yield desc
 
   @with_default()
-  def get_hidden_service_descriptor(self, address, default = UNDEFINED, servers = None, await_result = True):
+  def get_hidden_service_descriptor(self, address, default = UNDEFINED, servers = None, await_result = True, timeout = None):
     """
     get_hidden_service_descriptor(address, default = UNDEFINED, servers = None, await_result = True)
 
@@ -2007,9 +2007,13 @@ class Controller(BaseController):
 
     .. versionadded:: 1.4.0
 
+    .. versionchanged:: 1.7.0
+       Added the timeout argument.
+
     :param str address: address of the hidden service descriptor, the '.onion' suffix is optional
     :param object default: response if the query fails
     :param list servers: requrest the descriptor from these specific servers
+    :param float timeout: seconds to wait when **await_result** is **True**
 
     :returns: :class:`~stem.descriptor.hidden_service_descriptor.HiddenServiceDescriptor`
       for the given service if **await_result** is **True**, or **None** otherwise
@@ -2017,6 +2021,7 @@ class Controller(BaseController):
     :raises:
       * :class:`stem.DescriptorUnavailable` if **await_result** is **True** and
         unable to provide a descriptor for the given service
+      * :class:`stem.Timeout` if **timeout** was reached
       * :class:`stem.ControllerError` if unable to query the descriptor
       * **ValueError** if **address** doesn't conform with the pattern of a
         hidden service address
@@ -2035,6 +2040,7 @@ class Controller(BaseController):
 
     hs_desc_queue, hs_desc_listener = queue.Queue(), None
     hs_desc_content_queue, hs_desc_content_listener = queue.Queue(), None
+    start_time = time.time()
 
     if await_result:
       def hs_desc_listener(event):
@@ -2062,7 +2068,7 @@ class Controller(BaseController):
         return None  # not waiting, so nothing to provide back
       else:
         while True:
-          event = hs_desc_content_queue.get()
+          event = _get_with_timeout(hs_desc_content_queue, timeout, start_time)
 
           if event.address == address:
             if event.descriptor:
@@ -2071,7 +2077,7 @@ class Controller(BaseController):
               # no descriptor, looking through HS_DESC to figure out why
 
               while True:
-                event = hs_desc_queue.get()
+                event = _get_with_timeout(hs_desc_queue, timeout, start_time)
 
                 if event.address == address and event.action == stem.HSDescAction.FAILED:
                   if event.reason == stem.HSDescReason.NOT_FOUND:
@@ -2823,7 +2829,7 @@ class Controller(BaseController):
 
     return [r for r in result if r]  # drop any empty responses (GETINFO is blank if unset)
 
-  def create_ephemeral_hidden_service(self, ports, key_type = 'NEW', key_content = 'BEST', discard_key = False, detached = False, await_publication = False, basic_auth = None, max_streams = None):
+  def create_ephemeral_hidden_service(self, ports, key_type = 'NEW', key_content = 'BEST', discard_key = False, detached = False, await_publication = False, timeout = None, basic_auth = None, max_streams = None):
     """
     Creates a new hidden service. Unlike
     :func:`~stem.control.Controller.create_hidden_service` this style of
@@ -2899,7 +2905,7 @@ class Controller(BaseController):
        your torrc.
 
     .. versionchanged:: 1.7.0
-       Added the max_streams argument.
+       Added the timeout and max_streams arguments.
 
     :param int,list,dict ports: hidden service port(s) or mapping of hidden
       service ports to their targets
@@ -2913,19 +2919,23 @@ class Controller(BaseController):
       connection is closed if **True**
     :param bool await_publication: blocks until our descriptor is successfully
       published if **True**
+    :param float timeout: seconds to wait when **await_result** is **True**
     :param dict basic_auth: required user credentials to access this service
     :param int max_streams: maximum number of streams the hidden service will
       accept, unlimited if zero or not set
 
     :returns: :class:`~stem.response.add_onion.AddOnionResponse` with the response
 
-    :raises: :class:`stem.ControllerError` if the call fails
+    :raises:
+      * :class:`stem.ControllerError` if the call fails
+      * :class:`stem.Timeout` if **timeout** was reached
     """
 
     if self.get_version() < stem.version.Requirement.ADD_ONION:
       raise stem.UnsatisfiableRequest(message = 'Ephemeral hidden services were added in tor version %s' % stem.version.Requirement.ADD_ONION)
 
     hs_desc_queue, hs_desc_listener = queue.Queue(), None
+    start_time = time.time()
 
     if await_publication:
       def hs_desc_listener(event):
@@ -2997,7 +3007,7 @@ class Controller(BaseController):
 
       try:
         while True:
-          event = hs_desc_queue.get()
+          event = _get_with_timeout(hs_desc_queue, timeout, start_time)
 
           if event.action == stem.HSDescAction.UPLOAD and event.address == response.service_id:
             directories_uploaded_to.append(event.directory_fingerprint)
@@ -3389,23 +3399,29 @@ class Controller(BaseController):
 
     return circuits
 
-  def new_circuit(self, path = None, purpose = 'general', await_build = False):
+  def new_circuit(self, path = None, purpose = 'general', await_build = False, timeout = None):
     """
     Requests a new circuit. If the path isn't provided, one is automatically
     selected.
 
+    .. versionchanged:: 1.7.0
+       Added the timeout argument.
+
     :param list,str path: one or more relays to make a circuit through
     :param str purpose: 'general' or 'controller'
     :param bool await_build: blocks until the circuit is built if **True**
+    :param float timeout: seconds to wait when **await_build** is **True**
 
     :returns: str of the circuit id of the newly created circuit
 
-    :raises: :class:`stem.ControllerError` if the call fails
+    :raises:
+      * :class:`stem.ControllerError` if the call fails
+      * :class:`stem.Timeout` if **timeout** was reached
     """
 
-    return self.extend_circuit('0', path, purpose, await_build)
+    return self.extend_circuit('0', path, purpose, await_build, timeout)
 
-  def extend_circuit(self, circuit_id = '0', path = None, purpose = 'general', await_build = False):
+  def extend_circuit(self, circuit_id = '0', path = None, purpose = 'general', await_build = False, timeout = None):
     """
     Either requests the creation of a new circuit or extends an existing one.
 
@@ -3425,11 +3441,15 @@ class Controller(BaseController):
       20 EXTENDED $718BCEA286B531757ACAFF93AE04910EA73DE617=KsmoinOK,$649F2D0ACF418F7CFC6539AB2257EB2D5297BAFA=Eskimo BUILD_FLAGS=NEED_CAPACITY PURPOSE=GENERAL TIME_CREATED=2012-12-06T13:51:11.433755
       19 BUILT $718BCEA286B531757ACAFF93AE04910EA73DE617=KsmoinOK,$30BAB8EE7606CBD12F3CC269AE976E0153E7A58D=Pascal1,$2765D8A8C4BBA3F89585A9FFE0E8575615880BEB=Anthracite PURPOSE=GENERAL TIME_CREATED=2012-12-06T13:50:56.969938
 
+    .. versionchanged:: 1.7.0
+       Added the timeout argument.
+
     :param str circuit_id: id of a circuit to be extended
     :param list,str path: one or more relays to make a circuit through, this is
       required if the circuit id is non-zero
     :param str purpose: 'general' or 'controller'
     :param bool await_build: blocks until the circuit is built if **True**
+    :param float timeout: seconds to wait when **await_build** is **True**
 
     :returns: str of the circuit id of the created or extended circuit
 
@@ -3437,6 +3457,7 @@ class Controller(BaseController):
       * :class:`stem.InvalidRequest` if one of the parameters were invalid
       * :class:`stem.CircuitExtensionFailed` if we were waiting for the circuit
         to build but it failed
+      * :class:`stem.Timeout` if **timeout** was reached
       * :class:`stem.ControllerError` if the call fails
     """
 
@@ -3445,6 +3466,7 @@ class Controller(BaseController):
     # we then can't get the failure if it can't be created.
 
     circ_queue, circ_listener = queue.Queue(), None
+    start_time = time.time()
 
     if await_build:
       def circ_listener(event):
@@ -3488,7 +3510,7 @@ class Controller(BaseController):
 
       if await_build:
         while True:
-          circ = circ_queue.get()
+          circ = _get_with_timeout(circ_queue, timeout, start_time)
 
           if circ.id == new_circuit:
             if circ.status == CircStatus.BUILT:
@@ -3995,3 +4017,22 @@ def _case_insensitive_lookup(entries, key, default = UNDEFINED):
           return entry
 
   raise ValueError("key '%s' doesn't exist in dict: %s" % (key, entries))
+
+
+def _get_with_timeout(event_queue, timeout, start_time):
+  """
+  Pulls an item from a queue with a given timeout.
+  """
+
+  if timeout:
+    time_left = time.time() - start_time - timeout
+
+    if time_left <= 0:
+      raise stem.Timeout('Reached our %0.1f second timeout' % timeout)
+
+    try:
+      return event_queue.get(time_left)
+    except event_queue.Queue.Empty:
+      raise stem.Timeout('Reached our %0.1f second timeout' % timeout)
+  else:
+    return event_queue.get()
diff --git a/test/unit/control/controller.py b/test/unit/control/controller.py
index e7b014b9..d8e1cc9d 100644
--- a/test/unit/control/controller.py
+++ b/test/unit/control/controller.py
@@ -606,8 +606,21 @@ class TestControl(unittest.TestCase):
     self.controller.add_event_listener(Mock(), EventType.BW)
 
     # EventType.SIGNAL was added in tor version 0.2.3.1-alpha
+
     self.assertRaises(InvalidRequest, self.controller.add_event_listener, Mock(), EventType.SIGNAL)
 
+  @patch('stem.control.Controller.get_version', Mock(return_value = stem.version.Version('0.5.0.14')))
+  @patch('stem.control.Controller.msg', Mock(return_value = ControlMessage.from_str('250 OK\r\n')))
+  @patch('stem.control.Controller.add_event_listener', Mock())
+  @patch('stem.control.Controller.remove_event_listener', Mock())
+  def test_timeout(self):
+    """
+    Methods that have an 'await' argument also have an optional timeout. Check
+    that we raise a Timeout exception when it's elapsed.
+    """
+
+    self.assertRaisesRegexp(stem.Timeout, 'Reached our 0.1 second timeout', self.controller.get_hidden_service_descriptor, '5g2upl4pq6kufc4m', await_result = True, timeout = 0.1)
+
   def test_get_streams(self):
     """
     Exercises the get_streams() method.



More information about the tor-commits mailing list