commit d5162f4e369f5d2226f0e5262f2b63025f9a68ec Author: Damian Johnson atagar@torproject.org Date: Sat Feb 18 18:15:07 2012 -0800
Fixing deadlock in BaseController
Found two concurrency bugs which were causing deadlock issues, and added a test that's more likely to trigger connect() and close() concurrency issues.
The issues were... * The recv() method calls close() if the socket is still flagged as being alive. Unfortunately this can cause deadlock if the closing thread joins on the recv thread.
* For some reason using a Condition rather than an Event caused the event loop to sometimes miss the notice that caused the event thread to close, causing its join() call to get stuck. --- stem/control.py | 15 +++++---------- stem/socket.py | 31 ++++++++++++++++++++++--------- test/integ/control/base_controller.py | 25 +++++++++++++------------ 3 files changed, 40 insertions(+), 31 deletions(-)
diff --git a/stem/control.py b/stem/control.py index d5596f6..55c6e7a 100644 --- a/stem/control.py +++ b/stem/control.py @@ -99,7 +99,7 @@ class BaseController: self._reader_thread = None
# thread to pull from the _event_queue and call handle_event - self._event_cond = threading.Condition() + self._event_notice = threading.Event() self._event_thread = None
# saves our socket's prior _connect() and _close() methods so they can be @@ -291,9 +291,7 @@ class BaseController: # awake from recv() raising a closure exception. Wake up the event thread # too so it can end.
- self._event_cond.acquire() - self._event_cond.notifyAll() - self._event_cond.release() + self._event_notice.set()
# joins on our threads if it's safe to do so
@@ -379,10 +377,8 @@ class BaseController:
if control_message.content()[-1][0] == "650": # asynchronous message, adds to the event queue and wakes up its handler - self._event_cond.acquire() self._event_queue.put(control_message) - self._event_cond.notifyAll() - self._event_cond.release() + self._event_notice.set() else: # response to a msg() call self._reply_queue.put(control_message) @@ -407,7 +403,6 @@ class BaseController: event_message = self._event_queue.get_nowait() self._handle_event(event_message) except Queue.Empty: - self._event_cond.acquire() - self._event_cond.wait() - self._event_cond.release() + self._event_notice.wait() + self._event_notice.clear()
diff --git a/stem/socket.py b/stem/socket.py index 166c348..afea436 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -89,6 +89,9 @@ class ControlSocket: self._socket, self._socket_file = None, None self._is_alive = False
+ # indicates that we're in the midst of calling close() + self._handling_close = False + # Tracks sending and receiving separately. This should be safe, and doing # so prevents deadlock where we block writes because we're waiting to read # a message that isn't coming. @@ -135,15 +138,22 @@ class ControlSocket: complete message """
- try: - with self._recv_lock: - if not self.is_alive(): raise SocketClosed() - return recv_message(self._socket_file) - except SocketClosed, exc: - # if recv_message raises a SocketClosed then we should properly shut - # everything down - if self.is_alive(): self.close() - raise exc + with self._recv_lock: + try: + # makes a temporary reference to the _socket_file because connect() + # and close() may set or unset it + + socket_file = self._socket_file + + if not socket_file: raise SocketClosed() + return recv_message(socket_file) + except SocketClosed, exc: + # If recv_message raises a SocketClosed then we should properly shut + # everything down. However, if this was caused *from* a close call + # and it's joining on our thread then this would risk deadlock. + + if self.is_alive() and not self._handling_close: self.close() + raise exc
def is_alive(self): """ @@ -197,6 +207,7 @@ class ControlSocket: # is causing our is_alive() state to change.
is_change = self.is_alive() + self._handling_close = True
if self._socket: # if we haven't yet established a connection then this raises an error @@ -223,6 +234,8 @@ class ControlSocket:
if is_change: self._close() + + self._handling_close = False
def _get_send_lock(self): """ diff --git a/test/integ/control/base_controller.py b/test/integ/control/base_controller.py index 2648496..4c6260d 100644 --- a/test/integ/control/base_controller.py +++ b/test/integ/control/base_controller.py @@ -38,8 +38,6 @@ class TestBaseController(unittest.TestCase): Basic sanity check for the from_port constructor. """
- self.skipTest("work in progress") - if test.runner.Torrc.PORT in test.runner.get_runner().get_options(): controller = stem.control.BaseController.from_port(control_port = test.runner.CONTROL_PORT) self.assertTrue(isinstance(controller, stem.control.BaseController)) @@ -51,21 +49,30 @@ class TestBaseController(unittest.TestCase): Basic sanity check for the from_socket_file constructor. """
- self.skipTest("work in progress") - if test.runner.Torrc.SOCKET in test.runner.get_runner().get_options(): controller = stem.control.BaseController.from_socket_file(test.runner.CONTROL_SOCKET_PATH) self.assertTrue(isinstance(controller, stem.control.BaseController)) else: self.assertRaises(stem.socket.SocketError, stem.control.BaseController.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
+ def test_connect_repeatedly(self): + """ + Connects and closes the socket repeatedly. This is a simple attempt to + trigger concurrency issues. + """ + + with test.runner.get_runner().get_tor_socket() as control_socket: + controller = stem.control.BaseController(control_socket) + + for i in xrange(250): + controller.connect() + controller.close() + def test_msg(self): """ Tests a basic query with the msg() method. """
- self.skipTest("work in progress") - runner = test.runner.get_runner() with runner.get_tor_socket() as control_socket: controller = stem.control.BaseController(control_socket) @@ -79,8 +86,6 @@ class TestBaseController(unittest.TestCase): Tests the msg() method against an invalid controller command. """
- self.skipTest("work in progress") - with test.runner.get_runner().get_tor_socket() as control_socket: controller = stem.control.BaseController(control_socket) response = controller.msg("invalid") @@ -91,8 +96,6 @@ class TestBaseController(unittest.TestCase): Tests the msg() method against a non-existant GETINFO option. """
- self.skipTest("work in progress") - with test.runner.get_runner().get_tor_socket() as control_socket: controller = stem.control.BaseController(control_socket) response = controller.msg("GETINFO blarg") @@ -104,8 +107,6 @@ class TestBaseController(unittest.TestCase): remove_status_listener() methods. """
- self.skipTest("work in progress") - state_observer = StateObserver()
with test.runner.get_runner().get_tor_socket(False) as control_socket: