commit 4ff7efe297f2a076e1b4dc06a3686fdcbf260f8a Author: Damian Johnson atagar@torproject.org Date: Mon Feb 13 08:59:11 2012 -0800
Separating BaseController into standalone class
The BaseController was previously a ControlSocket subclass because it was easier to use when several of its methods were accessible. However, from an implementation perspective the BaseController was a wrapper class, not a proper subclass.
After experimenting a bit more I realized that I don't want the BaseController to provide *all* of the ControlSocket methods. In particular, it doesn't make sense for callers to use the send() and recv() methods when there will be a msg() method similar to TorCtl's sendAndRecv(). The wrapper/subclass mix was also just plain old confusing as an object-oriented design.
I'm moving the notifications up to the controller so the ControlSocket is very similar to how it was a couple weeks ago. Also dropping the passthrough integ tests since most of them will break without send/recv - I'll need to add some more targeted tests for passthrough methods later. --- stem/control.py | 172 +++++++++++++++++++++++++++++---- stem/socket.py | 146 +++------------------------- test/integ/control/base_controller.py | 94 ++++++++++++++---- test/integ/socket/control_socket.py | 78 +--------------- 4 files changed, 242 insertions(+), 248 deletions(-)
diff --git a/stem/control.py b/stem/control.py index ba18361..9dfc56d 100644 --- a/stem/control.py +++ b/stem/control.py @@ -1,19 +1,36 @@ """ Classes for interacting with the tor control socket.
-Controllers are a wrapper around a ControlSocket, retaining its low-level -connection methods (send, recv, is_alive, etc) in addition to providing methods -for interacting at a higher level. +Controllers are a wrapper around a ControlSocket, retaining many of its methods +(send, recv, is_alive, etc) in addition to providing its own for interacting at +a higher level.
from_port - Provides a Controller based on a port connection. from_socket_file - Provides a Controller based on a socket file connection.
-BaseController - ControlSocket subclass providing asynchronous message handling. +BaseController - Base controller class asynchronous message handling. + |- is_alive - reports if the socket is known to be closed + |- connect - connects a new socket + |- close - shuts down the socket + |- get_socket - provides socket providing base control communication + |- add_status_listener - notifies a callback of changes in the socket status + +- remove_status_listener - prevents further notification of status changes """
+import time +import thread +import threading + import stem.socket
-class BaseController(stem.socket.ControlSocket): +# state changes a control socket can have +# INIT - new control connection +# RESET - received a reset/sighup signal +# CLOSED - control connection closed + +State = stem.util.enum.Enum("INIT", "RESET", "CLOSED") + +class BaseController: """ Controller for the tor process. This is a minimal base class for other controllers, providing basic process communication and event listing. Don't @@ -61,40 +78,159 @@ class BaseController(stem.socket.ControlSocket):
control_socket = stem.socket.ControlSocketFile(socket_path) return BaseController(control_socket) - + from_port = staticmethod(from_port) from_socket_file = staticmethod(from_socket_file)
def __init__(self, control_socket): self._socket = control_socket
- def provide_self(): return self - self._socket._get_self = provide_self - - def send(self, message, raw = False): - self._socket.send(message, raw) - - def recv(self): - return self._socket.recv() + self._status_listeners = [] # tuples of the form (callback, spawn_thread) + self._status_listeners_lock = threading.RLock() + + # saves our socket's prior _connect() and _close() methods so they can be + # called along with ours + + self._socket_connect = self._socket._connect + self._socket_close = self._socket._close + + self._socket._connect = self._connect + self._socket._close = self._close
def is_alive(self): + """ + Checks if our socket is currently connected. This is a passthrough for our + socket's is_alive() method. + + Returns: + bool that's True if we're shut down and False otherwise + """ + return self._socket.is_alive()
def connect(self): + """ + Reconnects our control socket. This is a passthrough for our socket's + connect() method. + + Raises: + stem.socket.SocketError if unable to make a socket + """ + self._socket.connect()
def close(self): + """ + Closes our socket connection. This is a passthrough for our socket's + close() method. + """ + self._socket.close()
def get_socket(self): + """ + Provides the socket used to speak with the tor process. Communicating with + the socket directly isn't advised since it may confuse the controller. + + Returns: + ControlSocket for process communications + """ + return self._socket
def add_status_listener(self, callback, spawn = True): - self._socket.add_status_listener(callback, spawn) + """ + Notifies a given function when the state of our socket changes. Functions + are expected to be of the form... + + my_function(controller, state, timestamp) + + The state is a value from stem.socket.State, functions *must* allow for + new values in this field. The timestamp is a float for the unix time when + the change occured. + + This class only provides State.INIT and State.CLOSED notifications. + Subclasses may provide others. + + If spawn is True then the callback is notified via a new daemon thread. If + false then the notice is under our locks, within the thread where the + change occured. In general this isn't advised, especially if your callback + could block for a while. + + Arguments: + callback (function) - function to be notified when our state changes + spawn (bool) - calls function via a new thread if True, otherwise + it's part of the connect/close method call + """ + + with self._status_listeners_lock: + self._status_listeners.append((callback, spawn))
def remove_status_listener(self, callback): - self._socket.remove_status_listener(callback) + """ + Stops listener from being notified of further events. + + Arguments: + callback (function) - function to be removed from our listeners + + Returns: + bool that's True if we removed one or more occurances of the callback, + False otherwise + """ + + with self._status_listeners_lock: + new_listeners, is_changed = [], False + + for listener, spawn in self._status_listeners: + if listener != callback: + new_listeners.append((listener, spawn)) + else: is_changed = True + + self._status_listeners = new_listeners + return is_changed + + def _connect(self): + self._notify_status_listeners(State.INIT, True) + self._socket_connect()
- def _make_socket(self): - self._control_socket._make_socket() + def _close(self): + self._notify_status_listeners(State.CLOSED, False) + self._socket_close() + + def _notify_status_listeners(self, state, expect_alive = None): + """ + Informs our status listeners that a state change occured. + + States imply that our socket is either alive or not, which may not hold + true when multiple events occure in quick succession. For instance, a + sighup could cause two events (State.RESET for the sighup and State.CLOSE + if it causes tor to crash). However, there's no guarentee of the order in + which they occure, and it would be bad if listeners got the State.RESET + last, implying that we were alive. + + If set, the expect_alive flag will discard our event if it conflicts with + our current is_alive() state. + + Arguments: + state (stem.socket.State) - state change that has occured + expect_alive (bool) - discard event if it conflicts with our + is_alive() state + """ + + # Our socket's calles (the connect() and close() methods) already acquire + # these locks. However, our subclasses that use this method probably won't + # have them, so locking to prevent those from conflicting with each other + # and connect() / close(). + + with self._socket._send_lock, self._socket._recv_lock, self._status_listeners_lock: + change_timestamp = time.time() + + if expect_alive != None and expect_alive != self.is_alive(): + return + + for listener, spawn in self._status_listeners: + if spawn: + thread.start_new_thread(listener, (self, state, change_timestamp)) + else: + listener(self, state, change_timestamp)
diff --git a/stem/socket.py b/stem/socket.py index da46db2..7e8a560 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -15,10 +15,7 @@ ControlSocket - Socket wrapper that speaks the tor control protocol. |- recv - receives a ControlMessage from the socket |- is_alive - reports if the socket is known to be closed |- connect - connects a new socket - |- close - shuts down the socket - |- get_socket - provides socket providing base control communication - |- add_status_listener - notifies a callback of changes in the socket status - +- remove_status_listener - prevents further notification of status changes + +- close - shuts down the socket
ControlMessage - Message that's read from the control socket. |- content - provides the parsed message content @@ -47,21 +44,12 @@ ControllerError - Base exception raised when using the controller.
from __future__ import absolute_import import re -import time import socket -import thread import threading
import stem.util.enum import stem.util.log as log
-# state changes a control socket can have -# INIT - new control connection -# RESET - received a reset/sighup signal -# CLOSED - control connection closed - -State = stem.util.enum.Enum("INIT", "RESET", "CLOSED") - KEY_ARG = re.compile("^(\S+)=")
# Escape sequences from the 'esc_for_log' function of tor's 'common/util.c'. @@ -101,9 +89,6 @@ class ControlSocket: self._socket, self._socket_file = None, None self._is_alive = False
- self._status_listeners = [] # tuples of the form (callback, spawn_thread) - self._status_listeners_lock = threading.RLock() - # Tracks sending and receiving separately. This should be safe, and doing # so prevents deadlock where we block writes because we're waiting to read # a message that isn't coming. @@ -196,7 +181,7 @@ class ControlSocket: self._socket_file = self._socket.makefile() self._is_alive = True
- self._notify_status_listeners(State.INIT, True) + self._connect()
def close(self): """ @@ -204,10 +189,10 @@ class ControlSocket: """
with self._send_lock, self._recv_lock: - # Function is idempotent with one exception: we notify listeners if this + # Function is idempotent with one exception: we notify _close() if this # is causing our is_alive() state to change.
- notify_listeners = self.is_alive() + is_change = self.is_alive()
if self._socket: # if we haven't yet established a connection then this raises an error @@ -232,125 +217,28 @@ class ControlSocket: self._socket_file = None self._is_alive = False
- if notify_listeners: - self._notify_status_listeners(State.CLOSED, False) + if is_change: + self._close()
- def get_socket(self): - """ - Provides our base ControlSocket that speaks with the socket. For standard - subclasses this is simply ourselves, but for wrapper instances this is the - ContorlSocket they wrap. - - Use of this, rather than going through its wrapper instance, isn't - generally a good idea. - - Returns: - ControlSocket for base controller communications - """ - + def __enter__(self): return self
- def add_status_listener(self, callback, spawn = True): - """ - Notifies a given function when the state of our socket changes. Functions - are expected to be of the form... - - my_function(control_socket, state, timestamp) - - The state is a value from stem.socket.State, functions *must* allow for - new values in this field. The timestamp is a float for the unix time when - the change occured. - - This class only provides State.INIT and State.CLOSED notifications. - Subclasses may provide others. - - If spawn is True then the callback is notified via a new daemon thread. If - false then the notice is under our locks, within the thread where the - change occured. In general this isn't advised, especially if your callback - could block for a while. - - Arguments: - callback (function) - function to be notified when our state changes - spawn (bool) - calls function via a new thread if True, otherwise - it's part of the connect/close method call - """ - - with self._status_listeners_lock: - self._status_listeners.append((callback, spawn)) - - def remove_status_listener(self, callback): - """ - Stops listener from being notified of further events. - - Arguments: - callback (function) - function to be removed from our listeners - - Returns: - bool that's True if we removed one or more occurances of the callback, - False otherwise - """ - - with self._status_listeners_lock: - new_listeners, is_changed = [], False - - for listener, spawn in self._status_listeners: - if listener != callback: - new_listeners.append((listener, spawn)) - else: is_changed = True - - self._status_listeners = new_listeners - return is_changed + def __exit__(self, type, value, traceback): + self.close()
- def _notify_status_listeners(self, state, expect_alive = None): + def _connect(self): """ - Informs our status listeners that a state change occured. - - States imply that our socket is either alive or not, which may not hold - true when multiple events occure in quick succession. For instance, a - sighup could cause two events (State.RESET for the sighup and State.CLOSE - if it causes tor to crash). However, there's no guarentee of the order in - which they occure, and it would be bad if listeners got the State.RESET - last, implying that we were alive. - - If set, the expect_alive flag will discard our event if it conflicts with - our current is_alive() state. - - Arguments: - state (stem.socket.State) - state change that has occured - expect_alive (bool) - discard event if it conflicts with our - is_alive() state + Connection callback that can be overwritten by subclasses and wrappers. """
- # Our own calles (the connect() and close() methods) already acquire these - # locks. However, our subclasses that use this method probably won't have - # them, so locking to prevent those from conflicting with each other and - # connect() / close(). - - with self._send_lock, self._recv_lock: - change_timestamp = time.time() - - if expect_alive != None and expect_alive != self.is_alive(): - return - - for listener, spawn in self._status_listeners: - if spawn: - thread.start_new_thread(listener, (self._get_self(), state, change_timestamp)) - else: - listener(self._get_self(), state, change_timestamp) + pass
- def _get_self(self): + def _close(self): """ - Provides our self reference. For basic subclasses this is ourselves, but - for wrappers this is the instance wrapping us. + Disconnection callback that can be overwritten by subclasses and wrappers. """
- return self - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.close() + pass
def _make_socket(self): """ @@ -801,10 +689,6 @@ def send_message(control_file, message, raw = False):
if not raw: message = send_formatting(message)
- # uses a newline divider if this is a multi-line message (more readable) - log_message = message.replace("\r\n", "\n").rstrip() - div = "\n" if "\n" in log_message else " " - try: control_file.write(message) control_file.flush() diff --git a/test/integ/control/base_controller.py b/test/integ/control/base_controller.py index e30964c..7fb6723 100644 --- a/test/integ/control/base_controller.py +++ b/test/integ/control/base_controller.py @@ -2,13 +2,33 @@ Integration tests for the stem.control.BaseController class. """
+import time import unittest
import stem.control import stem.socket import test.runner import test.mocking as mocking -import test.integ.socket.control_socket + +class StateObserver: + """ + Simple container for listening to ControlSocket state changes and + rembembering them for the test. + """ + + controller = None + state = None + timestamp = None + + def reset(self): + self.controller = None + self.state = None + self.timestamp = None + + def listener(self, controller, state, timestamp): + self.controller = controller + self.state = state + self.timestamp = timestamp
class TestBaseController(unittest.TestCase): def setUp(self): @@ -39,29 +59,59 @@ class TestBaseController(unittest.TestCase): else: self.assertRaises(stem.socket.SocketError, stem.control.BaseController.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
- def test_socket_passthrough(self): + def test_status_notifications(self): """ - The BaseController is a passthrough for the socket it is built from, so - runs the ControlSocket integ tests again against it. + Checks basic functionality of the add_status_listener() and + remove_status_listener() methods. """
- # overwrites the Runner's get_tor_socket() to provide back a ControlSocket - # wrapped by a BaseContorller - - def mock_get_tor_socket(self, authenticate = True): - real_get_tor_socket = mocking.get_real_function(test.runner.Runner.get_tor_socket) - control_socket = real_get_tor_socket(self, authenticate) - return stem.control.BaseController(control_socket) - - mocking.mock_method(test.runner.Runner, "get_tor_socket", mock_get_tor_socket) - - # sanity check that the mocking is working - example_socket = test.runner.get_runner().get_tor_socket() - self.assertTrue(isinstance(example_socket, stem.control.BaseController)) + state_observer = StateObserver()
- # re-runs all of the control_socket tests - socket_test_class = test.integ.socket.control_socket.TestControlSocket - for method in socket_test_class.__dict__: - if method.startswith("test_"): - socket_test_class.__dict__[method](self) + with test.runner.get_runner().get_tor_socket(False) as control_socket: + controller = stem.control.BaseController(control_socket) + controller.add_status_listener(state_observer.listener, False) + + control_socket.close() + self.assertEquals(controller, state_observer.controller) + self.assertEquals(stem.control.State.CLOSED, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset() + + control_socket.connect() + self.assertEquals(controller, state_observer.controller) + self.assertEquals(stem.control.State.INIT, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset() + + # cause the socket to shut down without calling close() + control_socket.send("Blarg!") + control_socket.recv() + self.assertRaises(stem.socket.SocketClosed, control_socket.recv) + self.assertEquals(controller, state_observer.controller) + self.assertEquals(stem.control.State.CLOSED, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset() + + # remove listener and make sure we don't get further notices + controller.remove_status_listener(state_observer.listener) + control_socket.connect() + self.assertEquals(None, state_observer.controller) + self.assertEquals(None, state_observer.state) + self.assertEquals(None, state_observer.timestamp) + state_observer.reset() + + # add with spawn as true, we need a little delay on this since we then + # get the notice asynchronously + + controller.add_status_listener(state_observer.listener, 
True) + control_socket.close() + time.sleep(0.1) # not much work going on so this doesn't need to be much + self.assertEquals(controller, state_observer.controller) + self.assertEquals(stem.control.State.CLOSED, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset()
diff --git a/test/integ/socket/control_socket.py b/test/integ/socket/control_socket.py index ec19d82..366501f 100644 --- a/test/integ/socket/control_socket.py +++ b/test/integ/socket/control_socket.py @@ -8,7 +8,6 @@ those focus on parsing and correctness of the content these are more concerned with the behavior of the socket itself. """
-import time import unittest
import stem.connection @@ -16,26 +15,6 @@ import stem.control import stem.socket import test.runner
-class StateObserver: - """ - Simple container for listening to ControlSocket state changes and - rembembering them for the test. - """ - - control_socket = None - state = None - timestamp = None - - def reset(self): - self.control_socket = None - self.state = None - self.timestamp = None - - def listener(self, control_socket, state, timestamp): - self.control_socket = control_socket - self.state = state - self.timestamp = timestamp - class TestControlSocket(unittest.TestCase): def setUp(self): test.runner.require_control(self) @@ -86,7 +65,7 @@ class TestControlSocket(unittest.TestCase): # If we send another message to a port based socket then it will seem to # succeed. However, a file based socket should report a failure.
- if control_socket.get_socket().__class__ == stem.socket.ControlPort: + if isinstance(control_socket, stem.socket.ControlPort): control_socket.send("blarg") self.assertTrue(control_socket.is_alive()) else: @@ -136,59 +115,4 @@ class TestControlSocket(unittest.TestCase): control_socket.close() self.assertRaises(stem.socket.SocketClosed, control_socket.send, "PROTOCOLINFO 1") control_socket.connect() - - def test_status_notifications(self): - """ - Checks basic functionality of the add_status_listener() and - remove_status_listener() methods. - """ - - state_observer = StateObserver() - - with test.runner.get_runner().get_tor_socket(False) as control_socket: - control_socket.add_status_listener(state_observer.listener, False) - - control_socket.close() - self.assertEquals(control_socket, state_observer.control_socket) - self.assertEquals(stem.socket.State.CLOSED, state_observer.state) - self.assertTrue(state_observer.timestamp < time.time()) - self.assertTrue(state_observer.timestamp > time.time() - 1.0) - state_observer.reset() - - control_socket.connect() - self.assertEquals(control_socket, state_observer.control_socket) - self.assertEquals(stem.socket.State.INIT, state_observer.state) - self.assertTrue(state_observer.timestamp < time.time()) - self.assertTrue(state_observer.timestamp > time.time() - 1.0) - state_observer.reset() - - # cause the socket to shut down without calling close() - control_socket.send("Blarg!") - control_socket.recv() - self.assertRaises(stem.socket.SocketClosed, control_socket.recv) - self.assertEquals(control_socket, state_observer.control_socket) - self.assertEquals(stem.socket.State.CLOSED, state_observer.state) - self.assertTrue(state_observer.timestamp < time.time()) - self.assertTrue(state_observer.timestamp > time.time() - 1.0) - state_observer.reset() - - # remove listener and make sure we don't get further notices - control_socket.remove_status_listener(state_observer.listener) - control_socket.connect() - 
self.assertEquals(None, state_observer.control_socket) - self.assertEquals(None, state_observer.state) - self.assertEquals(None, state_observer.timestamp) - state_observer.reset() - - # add with spawn as true, we need a little delay on this since we then - # get the notice asynchronously - - control_socket.add_status_listener(state_observer.listener, True) - control_socket.close() - time.sleep(0.1) # not much work going on so this doesn't need to be much - self.assertEquals(control_socket, state_observer.control_socket) - self.assertEquals(stem.socket.State.CLOSED, state_observer.state) - self.assertTrue(state_observer.timestamp < time.time()) - self.assertTrue(state_observer.timestamp > time.time() - 1.0) - state_observer.reset()