commit 1b1d48c37ae9092fdb93a76a3bc6201dad3113e4 Author: Damian Johnson atagar@torproject.org Date: Thu Feb 9 10:32:43 2012 -0800
ControlSocket listeners for state changes
Adding functions to let users subscribe for state change notifications. This is something that I've found highly useful in the past, and will also be needed for the BaseController implementation. --- stem/control.py | 28 +++++-- stem/socket.py | 140 ++++++++++++++++++++++++++++++++++- test/integ/socket/control_socket.py | 83 +++++++++++++++++++-- 3 files changed, 236 insertions(+), 15 deletions(-)
diff --git a/stem/control.py b/stem/control.py index c485cb9..ba18361 100644 --- a/stem/control.py +++ b/stem/control.py @@ -19,8 +19,8 @@ class BaseController(stem.socket.ControlSocket): controllers, providing basic process communication and event listing. Don't use this directly - subclasses provide higher level functionality.
- Attributes: - socket - ControlSocket from which this was constructed + Do not continue to directly interact with the ControlSocket we're + constructed from - use our wrapper methods instead. """
# TODO: Convenience methods for the BaseController are pointless since @@ -66,22 +66,34 @@ class BaseController(stem.socket.ControlSocket): from_socket_file = staticmethod(from_socket_file)
def __init__(self, control_socket): - self.socket = control_socket + self._socket = control_socket + + def provide_self(): return self + self._socket._get_self = provide_self
def send(self, message, raw = False): - self.socket.send(message, raw) + self._socket.send(message, raw)
def recv(self): - return self.socket.recv() + return self._socket.recv()
def is_alive(self): - return self.socket.is_alive() + return self._socket.is_alive()
def connect(self): - self.socket.connect() + self._socket.connect()
def close(self): - self.socket.close() + self._socket.close() + + def get_socket(self): + return self._socket + + def add_status_listener(self, callback, spawn = True): + self._socket.add_status_listener(callback, spawn) + + def remove_status_listener(self, callback): + self._socket.remove_status_listener(callback)
def _make_socket(self): self._control_socket._make_socket() diff --git a/stem/socket.py b/stem/socket.py index 9379884..da46db2 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -15,7 +15,10 @@ ControlSocket - Socket wrapper that speaks the tor control protocol. |- recv - receives a ControlMessage from the socket |- is_alive - reports if the socket is known to be closed |- connect - connects a new socket - +- close - shuts down the socket + |- close - shuts down the socket + |- get_socket - provides socket providing base control communication + |- add_status_listener - notifies a callback of changes in the socket status + +- remove_status_listener - prevents further notification of status changes
ControlMessage - Message that's read from the control socket. |- content - provides the parsed message content @@ -44,11 +47,21 @@ ControllerError - Base exception raised when using the controller.
from __future__ import absolute_import import re +import time import socket +import thread import threading
+import stem.util.enum import stem.util.log as log
+# state changes a control socket can have +# INIT - new control connection +# RESET - received a reset/sighup signal +# CLOSED - control connection closed + +State = stem.util.enum.Enum("INIT", "RESET", "CLOSED") + KEY_ARG = re.compile("^(\S+)=")
# Escape sequences from the 'esc_for_log' function of tor's 'common/util.c'. @@ -88,6 +101,9 @@ class ControlSocket: self._socket, self._socket_file = None, None self._is_alive = False
+ self._status_listeners = [] # tuples of the form (callback, spawn_thread) + self._status_listeners_lock = threading.RLock() + # Tracks sending and receiving separately. This should be safe, and doing # so prevents deadlock where we block writes because we're waiting to read # a message that isn't coming. @@ -179,6 +195,8 @@ class ControlSocket: self._socket = self._make_socket() self._socket_file = self._socket.makefile() self._is_alive = True + + self._notify_status_listeners(State.INIT, True)
def close(self): """ @@ -186,9 +204,15 @@ class ControlSocket: """
with self._send_lock, self._recv_lock: + # Function is idempotent with one exception: we notify listeners if this + # is causing our is_alive() state to change. + + notify_listeners = self.is_alive() + if self._socket: # if we haven't yet established a connection then this raises an error # socket.error: [Errno 107] Transport endpoint is not connected + try: self._socket.shutdown(socket.SHUT_RDWR) except socket.error: pass
@@ -207,6 +231,120 @@ class ControlSocket: self._socket = None self._socket_file = None self._is_alive = False + + if notify_listeners: + self._notify_status_listeners(State.CLOSED, False) + + def get_socket(self): + """ + Provides our base ControlSocket that speaks with the socket. For standard + subclasses this is simply ourselves, but for wrapper instances this is the + ControlSocket they wrap. + + Use of this, rather than going through its wrapper instance, isn't + generally a good idea. + + Returns: + ControlSocket for base controller communications + """ + + return self + + def add_status_listener(self, callback, spawn = True): + """ + Notifies a given function when the state of our socket changes. Functions + are expected to be of the form... + + my_function(control_socket, state, timestamp) + + The state is a value from stem.socket.State, functions *must* allow for + new values in this field. The timestamp is a float for the unix time when + the change occurred. + + This class only provides State.INIT and State.CLOSED notifications. + Subclasses may provide others. + + If spawn is True then the callback is notified via a new daemon thread. If + false then the notice is under our locks, within the thread where the + change occurred. In general this isn't advised, especially if your callback + could block for a while. + + Arguments: + callback (function) - function to be notified when our state changes + spawn (bool) - calls function via a new thread if True, otherwise + it's part of the connect/close method call + """ + + with self._status_listeners_lock: + self._status_listeners.append((callback, spawn)) + + def remove_status_listener(self, callback): + """ + Stops listener from being notified of further events.
+ + Arguments: + callback (function) - function to be removed from our listeners + + Returns: + bool that's True if we removed one or more occurrences of the callback, + False otherwise + """ + + with self._status_listeners_lock: + new_listeners, is_changed = [], False + + for listener, spawn in self._status_listeners: + if listener != callback: + new_listeners.append((listener, spawn)) + else: is_changed = True + + self._status_listeners = new_listeners + return is_changed + + def _notify_status_listeners(self, state, expect_alive = None): + """ + Informs our status listeners that a state change occurred. + + States imply that our socket is either alive or not, which may not hold + true when multiple events occur in quick succession. For instance, a + sighup could cause two events (State.RESET for the sighup and State.CLOSED + if it causes tor to crash). However, there's no guarantee of the order in + which they occur, and it would be bad if listeners got the State.RESET + last, implying that we were alive. + + If set, the expect_alive flag will discard our event if it conflicts with + our current is_alive() state. + + Arguments: + state (stem.socket.State) - state change that has occurred + expect_alive (bool) - discard event if it conflicts with our + is_alive() state + """ + + # Our own callers (the connect() and close() methods) already acquire these + # locks. However, our subclasses that use this method probably won't have + # them, so locking to prevent those from conflicting with each other and + # connect() / close(). + + with self._send_lock, self._recv_lock: + change_timestamp = time.time() + + if expect_alive != None and expect_alive != self.is_alive(): + return + + for listener, spawn in self._status_listeners: + if spawn: + thread.start_new_thread(listener, (self._get_self(), state, change_timestamp)) + else: + listener(self._get_self(), state, change_timestamp) + + def _get_self(self): + """ + Provides our self reference.
For basic subclasses this is ourselves, but + for wrappers this is the instance wrapping us. + """ + + return self
def __enter__(self): return self diff --git a/test/integ/socket/control_socket.py b/test/integ/socket/control_socket.py index 3a4aaeb..ec19d82 100644 --- a/test/integ/socket/control_socket.py +++ b/test/integ/socket/control_socket.py @@ -8,6 +8,7 @@ those focus on parsing and correctness of the content these are more concerned with the behavior of the socket itself. """
+import time import unittest
import stem.connection @@ -15,6 +16,26 @@ import stem.control import stem.socket import test.runner
+class StateObserver: + """ + Simple container for listening to ControlSocket state changes and + remembering them for the test. + """ + + control_socket = None + state = None + timestamp = None + + def reset(self): + self.control_socket = None + self.state = None + self.timestamp = None + + def listener(self, control_socket, state, timestamp): + self.control_socket = control_socket + self.state = state + self.timestamp = timestamp + class TestControlSocket(unittest.TestCase): def setUp(self): test.runner.require_control(self) @@ -65,12 +86,7 @@ class TestControlSocket(unittest.TestCase): # If we send another message to a port based socket then it will seem to # succeed. However, a file based socket should report a failure.
- if isinstance(control_socket, stem.control.BaseController): - base_socket = control_socket.socket - else: - base_socket = control_socket - - if isinstance(base_socket, stem.socket.ControlPort): + if control_socket.get_socket().__class__ == stem.socket.ControlPort: control_socket.send("blarg") self.assertTrue(control_socket.is_alive()) else: @@ -120,4 +136,59 @@ class TestControlSocket(unittest.TestCase): control_socket.close() self.assertRaises(stem.socket.SocketClosed, control_socket.send, "PROTOCOLINFO 1") control_socket.connect() + + def test_status_notifications(self): + """ + Checks basic functionality of the add_status_listener() and + remove_status_listener() methods. + """ + + state_observer = StateObserver() + + with test.runner.get_runner().get_tor_socket(False) as control_socket: + control_socket.add_status_listener(state_observer.listener, False) + + control_socket.close() + self.assertEquals(control_socket, state_observer.control_socket) + self.assertEquals(stem.socket.State.CLOSED, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset() + + control_socket.connect() + self.assertEquals(control_socket, state_observer.control_socket) + self.assertEquals(stem.socket.State.INIT, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset() + + # cause the socket to shut down without calling close() + control_socket.send("Blarg!") + control_socket.recv() + self.assertRaises(stem.socket.SocketClosed, control_socket.recv) + self.assertEquals(control_socket, state_observer.control_socket) + self.assertEquals(stem.socket.State.CLOSED, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset() + + # remove listener and make sure we 
don't get further notices + control_socket.remove_status_listener(state_observer.listener) + control_socket.connect() + self.assertEquals(None, state_observer.control_socket) + self.assertEquals(None, state_observer.state) + self.assertEquals(None, state_observer.timestamp) + state_observer.reset() + + # add with spawn as true, we need a little delay on this since we then + # get the notice asynchronously + + control_socket.add_status_listener(state_observer.listener, True) + control_socket.close() + time.sleep(0.1) # not much work going on so this doesn't need to be much + self.assertEquals(control_socket, state_observer.control_socket) + self.assertEquals(stem.socket.State.CLOSED, state_observer.state) + self.assertTrue(state_observer.timestamp < time.time()) + self.assertTrue(state_observer.timestamp > time.time() - 1.0) + state_observer.reset()