commit 8a065f42952c9dc9762eaebe4d3fa48210783998 Author: Damian Johnson atagar@torproject.org Date: Thu May 10 12:39:29 2018 -0700
Timeout support in controller methods that await something
Adding a timeout argument to all controller methods with an 'await' option.
https://trac.torproject.org/projects/tor/ticket/26056 --- docs/change_log.rst | 1 + stem/__init__.py | 26 +++++++++++++++-- stem/control.py | 65 +++++++++++++++++++++++++++++++++-------- test/unit/control/controller.py | 13 +++++++++ 4 files changed, 90 insertions(+), 15 deletions(-)
diff --git a/docs/change_log.rst b/docs/change_log.rst index 3836b8ad..720c636d 100644 --- a/docs/change_log.rst +++ b/docs/change_log.rst @@ -48,6 +48,7 @@ The following are only available within Stem's `git repository
* Documented v3 hidden service support (:trac:`25124`, :spec:`6bd0a69`) * Added support for limiting the maximum number of streams to :func:`~stem.control.Controller.create_ephemeral_hidden_service` (:spec:`2fcb1c2`) + * Added a timeout argument to :class:`~stem.control.Controller` methods that could await a response (:trac:`26056`) * Stacktrace if :func:`stem.connection.connect` had a string port argument * More reliable ExitPolicy resolution (:trac:`25739`) * Replaced socket's :func:`~stem.socket.ControlPort.get_address`, :func:`~stem.socket.ControlPort.get_port`, and :func:`~stem.socket.ControlSocketFile.get_socket_path` with attributes diff --git a/stem/__init__.py b/stem/__init__.py index 0104d3b2..e9ad8430 100644 --- a/stem/__init__.py +++ b/stem/__init__.py @@ -10,12 +10,17 @@ Library for working with the tor process.
ControllerError - Base exception raised when using the controller. |- ProtocolError - Malformed socket data. + | |- OperationFailed - Tor was unable to successfully complete the operation. | |- UnsatisfiableRequest - Tor was unable to satisfy a valid request. - | | +- CircuitExtensionFailed - Attempt to make or extend a circuit failed. - | |- DescriptorUnavailable - The given relay descriptor is unavailable. + | | |- CircuitExtensionFailed - Attempt to make or extend a circuit failed. + | | |- DescriptorUnavailable - The given relay descriptor is unavailable. + | | +- Timeout - Caller requested timeout was reached. + | | + | | | +- InvalidRequest - Invalid request. | +- InvalidArguments - Invalid request parameters. + | +- SocketError - Communication with the socket failed. +- SocketClosed - Socket has been shut down.
@@ -504,6 +509,7 @@ __all__ = [ 'UnsatisfiableRequest', 'CircuitExtensionFailed', 'DescriptorUnavailable', + 'Timeout', 'InvalidRequest', 'InvalidArguments', 'SocketError', @@ -582,15 +588,29 @@ class CircuitExtensionFailed(UnsatisfiableRequest): self.circ = circ
-class DescriptorUnavailable(OperationFailed): +class DescriptorUnavailable(UnsatisfiableRequest): """ Tor was unable to provide a descriptor for the given relay. + + .. versionchanged:: 1.7.0 + Subclassed under UnsatisfiableRequest rather than OperationFailed. """
def __init__(self, message): super(DescriptorUnavailable, self).__init__(message = message)
+class Timeout(UnsatisfiableRequest): + """ + Timeout requested by the caller was reached. + + .. versionadded:: 1.7.0 + """ + + def __init__(self, message): + super(Timeout, self).__init__(message = message) + + class InvalidRequest(OperationFailed): """ Exception raised when the request was invalid or malformed. diff --git a/stem/control.py b/stem/control.py index fd9aa8dc..244aed0b 100644 --- a/stem/control.py +++ b/stem/control.py @@ -1992,7 +1992,7 @@ class Controller(BaseController): yield desc
@with_default() - def get_hidden_service_descriptor(self, address, default = UNDEFINED, servers = None, await_result = True): + def get_hidden_service_descriptor(self, address, default = UNDEFINED, servers = None, await_result = True, timeout = None): """ get_hidden_service_descriptor(address, default = UNDEFINED, servers = None, await_result = True)
@@ -2007,9 +2007,13 @@ class Controller(BaseController):
.. versionadded:: 1.4.0
+ .. versionchanged:: 1.7.0 + Added the timeout argument. + :param str address: address of the hidden service descriptor, the '.onion' suffix is optional :param object default: response if the query fails :param list servers: requrest the descriptor from these specific servers + :param float timeout: seconds to wait when **await_result** is **True**
:returns: :class:`~stem.descriptor.hidden_service_descriptor.HiddenServiceDescriptor` for the given service if **await_result** is **True**, or **None** otherwise @@ -2017,6 +2021,7 @@ class Controller(BaseController): :raises: * :class:`stem.DescriptorUnavailable` if **await_result** is **True** and unable to provide a descriptor for the given service + * :class:`stem.Timeout` if **timeout** was reached * :class:`stem.ControllerError` if unable to query the descriptor * **ValueError** if **address** doesn't conform with the pattern of a hidden service address @@ -2035,6 +2040,7 @@ class Controller(BaseController):
hs_desc_queue, hs_desc_listener = queue.Queue(), None hs_desc_content_queue, hs_desc_content_listener = queue.Queue(), None + start_time = time.time()
if await_result: def hs_desc_listener(event): @@ -2062,7 +2068,7 @@ class Controller(BaseController): return None # not waiting, so nothing to provide back else: while True: - event = hs_desc_content_queue.get() + event = _get_with_timeout(hs_desc_content_queue, timeout, start_time)
if event.address == address: if event.descriptor: @@ -2071,7 +2077,7 @@ class Controller(BaseController): # no descriptor, looking through HS_DESC to figure out why
while True: - event = hs_desc_queue.get() + event = _get_with_timeout(hs_desc_queue, timeout, start_time)
if event.address == address and event.action == stem.HSDescAction.FAILED: if event.reason == stem.HSDescReason.NOT_FOUND: @@ -2823,7 +2829,7 @@ class Controller(BaseController):
return [r for r in result if r] # drop any empty responses (GETINFO is blank if unset)
- def create_ephemeral_hidden_service(self, ports, key_type = 'NEW', key_content = 'BEST', discard_key = False, detached = False, await_publication = False, basic_auth = None, max_streams = None): + def create_ephemeral_hidden_service(self, ports, key_type = 'NEW', key_content = 'BEST', discard_key = False, detached = False, await_publication = False, timeout = None, basic_auth = None, max_streams = None): """ Creates a new hidden service. Unlike :func:`~stem.control.Controller.create_hidden_service` this style of @@ -2899,7 +2905,7 @@ class Controller(BaseController): your torrc.
.. versionchanged:: 1.7.0 - Added the max_streams argument. + Added the timeout and max_streams arguments.
:param int,list,dict ports: hidden service port(s) or mapping of hidden service ports to their targets @@ -2913,19 +2919,23 @@ class Controller(BaseController): connection is closed if **True** :param bool await_publication: blocks until our descriptor is successfully published if **True** + :param float timeout: seconds to wait when **await_result** is **True** :param dict basic_auth: required user credentials to access this service :param int max_streams: maximum number of streams the hidden service will accept, unlimited if zero or not set
:returns: :class:`~stem.response.add_onion.AddOnionResponse` with the response
- :raises: :class:`stem.ControllerError` if the call fails + :raises: + * :class:`stem.ControllerError` if the call fails + * :class:`stem.Timeout` if **timeout** was reached """
if self.get_version() < stem.version.Requirement.ADD_ONION: raise stem.UnsatisfiableRequest(message = 'Ephemeral hidden services were added in tor version %s' % stem.version.Requirement.ADD_ONION)
hs_desc_queue, hs_desc_listener = queue.Queue(), None + start_time = time.time()
if await_publication: def hs_desc_listener(event): @@ -2997,7 +3007,7 @@ class Controller(BaseController):
try: while True: - event = hs_desc_queue.get() + event = _get_with_timeout(hs_desc_queue, timeout, start_time)
if event.action == stem.HSDescAction.UPLOAD and event.address == response.service_id: directories_uploaded_to.append(event.directory_fingerprint) @@ -3389,23 +3399,29 @@ class Controller(BaseController):
return circuits
- def new_circuit(self, path = None, purpose = 'general', await_build = False): + def new_circuit(self, path = None, purpose = 'general', await_build = False, timeout = None): """ Requests a new circuit. If the path isn't provided, one is automatically selected.
+ .. versionchanged:: 1.7.0 + Added the timeout argument. + :param list,str path: one or more relays to make a circuit through :param str purpose: 'general' or 'controller' :param bool await_build: blocks until the circuit is built if **True** + :param float timeout: seconds to wait when **await_build** is **True**
:returns: str of the circuit id of the newly created circuit
- :raises: :class:`stem.ControllerError` if the call fails + :raises: + * :class:`stem.ControllerError` if the call fails + * :class:`stem.Timeout` if **timeout** was reached """
- return self.extend_circuit('0', path, purpose, await_build) + return self.extend_circuit('0', path, purpose, await_build, timeout)
- def extend_circuit(self, circuit_id = '0', path = None, purpose = 'general', await_build = False): + def extend_circuit(self, circuit_id = '0', path = None, purpose = 'general', await_build = False, timeout = None): """ Either requests the creation of a new circuit or extends an existing one.
@@ -3425,11 +3441,15 @@ class Controller(BaseController): 20 EXTENDED $718BCEA286B531757ACAFF93AE04910EA73DE617=KsmoinOK,$649F2D0ACF418F7CFC6539AB2257EB2D5297BAFA=Eskimo BUILD_FLAGS=NEED_CAPACITY PURPOSE=GENERAL TIME_CREATED=2012-12-06T13:51:11.433755 19 BUILT $718BCEA286B531757ACAFF93AE04910EA73DE617=KsmoinOK,$30BAB8EE7606CBD12F3CC269AE976E0153E7A58D=Pascal1,$2765D8A8C4BBA3F89585A9FFE0E8575615880BEB=Anthracite PURPOSE=GENERAL TIME_CREATED=2012-12-06T13:50:56.969938
+ .. versionchanged:: 1.7.0 + Added the timeout argument. + :param str circuit_id: id of a circuit to be extended :param list,str path: one or more relays to make a circuit through, this is required if the circuit id is non-zero :param str purpose: 'general' or 'controller' :param bool await_build: blocks until the circuit is built if **True** + :param float timeout: seconds to wait when **await_build** is **True**
:returns: str of the circuit id of the created or extended circuit
@@ -3437,6 +3457,7 @@ class Controller(BaseController): * :class:`stem.InvalidRequest` if one of the parameters were invalid * :class:`stem.CircuitExtensionFailed` if we were waiting for the circuit to build but it failed + * :class:`stem.Timeout` if **timeout** was reached * :class:`stem.ControllerError` if the call fails """
@@ -3445,6 +3466,7 @@ class Controller(BaseController): # we then can't get the failure if it can't be created.
circ_queue, circ_listener = queue.Queue(), None + start_time = time.time()
if await_build: def circ_listener(event): @@ -3488,7 +3510,7 @@ class Controller(BaseController):
if await_build: while True: - circ = circ_queue.get() + circ = _get_with_timeout(circ_queue, timeout, start_time)
if circ.id == new_circuit: if circ.status == CircStatus.BUILT: @@ -3995,3 +4017,22 @@ def _case_insensitive_lookup(entries, key, default = UNDEFINED): return entry
raise ValueError("key '%s' doesn't exist in dict: %s" % (key, entries)) + + +def _get_with_timeout(event_queue, timeout, start_time): + """ + Pulls an item from a queue with a given timeout. + """ + + if timeout: + time_left = time.time() - start_time - timeout + + if time_left <= 0: + raise stem.Timeout('Reached our %0.1f second timeout' % timeout) + + try: + return event_queue.get(time_left) + except event_queue.Queue.Empty: + raise stem.Timeout('Reached our %0.1f second timeout' % timeout) + else: + return event_queue.get() diff --git a/test/unit/control/controller.py b/test/unit/control/controller.py index e7b014b9..d8e1cc9d 100644 --- a/test/unit/control/controller.py +++ b/test/unit/control/controller.py @@ -606,8 +606,21 @@ class TestControl(unittest.TestCase): self.controller.add_event_listener(Mock(), EventType.BW)
# EventType.SIGNAL was added in tor version 0.2.3.1-alpha + self.assertRaises(InvalidRequest, self.controller.add_event_listener, Mock(), EventType.SIGNAL)
+ @patch('stem.control.Controller.get_version', Mock(return_value = stem.version.Version('0.5.0.14'))) + @patch('stem.control.Controller.msg', Mock(return_value = ControlMessage.from_str('250 OK\r\n'))) + @patch('stem.control.Controller.add_event_listener', Mock()) + @patch('stem.control.Controller.remove_event_listener', Mock()) + def test_timeout(self): + """ + Methods that have an 'await' argument also have an optional timeout. Check + that we raise a Timeout exception when it's elapsed. + """ + + self.assertRaisesRegexp(stem.Timeout, 'Reached our 0.1 second timeout', self.controller.get_hidden_service_descriptor, '5g2upl4pq6kufc4m', await_result = True, timeout = 0.1) + def test_get_streams(self): """ Exercises the get_streams() method.
tor-commits@lists.torproject.org