commit 4ff7efe297f2a076e1b4dc06a3686fdcbf260f8a
Author: Damian Johnson <atagar(a)torproject.org>
Date: Mon Feb 13 08:59:11 2012 -0800
Separating BaseController into standalone class
The BaseController was previously a ControlSocket subclass because it was
easier to use when several of its methods were accessable. However, from an
implementation perspective the BaseController was a wrapper class, not a proper
subclass.
After experimenting a bit more I realized that I don't want the BaseController
to provide *all* of the ControlSocket methods. In particular, it doesn't make
sense for callers to use the send() and recv() when there will be a msg()
method similar to TorCtl's sendAndRecv(). The wrapper/subclass mix was also
just plain old confusing as an object-oriented design.
I'm moving the notifications up to the controller so the ControlSocket is very
similar to how it was a couple weeks ago. Also dropping the passthrough integ
tests since most of them will break without send/recv - I'll need to add some
more targeted tests for passthrough methods later.
---
stem/control.py | 172 +++++++++++++++++++++++++++++----
stem/socket.py | 146 +++-------------------------
test/integ/control/base_controller.py | 94 ++++++++++++++----
test/integ/socket/control_socket.py | 78 +---------------
4 files changed, 242 insertions(+), 248 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index ba18361..9dfc56d 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -1,19 +1,36 @@
"""
Classes for interacting with the tor control socket.
-Controllers are a wrapper around a ControlSocket, retaining its low-level
-connection methods (send, recv, is_alive, etc) in addition to providing methods
-for interacting at a higher level.
+Controllers are a wrapper around a ControlSocket, retaining many of its methods
+(send, recv, is_alive, etc) in addition to providing its own for interacting at
+a higher level.
from_port - Provides a Controller based on a port connection.
from_socket_file - Provides a Controller based on a socket file connection.
-BaseController - ControlSocket subclass providing asynchronous message handling.
+BaseController - Base controller class asynchronous message handling.
+ |- is_alive - reports if the socket is known to be closed
+ |- connect - connects a new socket
+ |- close - shuts down the socket
+ |- get_socket - provides socket providing base control communication
+ |- add_status_listener - notifies a callback of changes in the socket status
+ +- remove_status_listener - prevents further notification of status changes
"""
+import time
+import thread
+import threading
+
import stem.socket
-class BaseController(stem.socket.ControlSocket):
+# state changes a control socket can have
+# INIT - new control connection
+# RESET - received a reset/sighup signal
+# CLOSED - control connection closed
+
+State = stem.util.enum.Enum("INIT", "RESET", "CLOSED")
+
+class BaseController:
"""
Controller for the tor process. This is a minimal base class for other
controllers, providing basic process communication and event listing. Don't
@@ -61,40 +78,159 @@ class BaseController(stem.socket.ControlSocket):
control_socket = stem.socket.ControlSocketFile(socket_path)
return BaseController(control_socket)
-
+
from_port = staticmethod(from_port)
from_socket_file = staticmethod(from_socket_file)
def __init__(self, control_socket):
self._socket = control_socket
- def provide_self(): return self
- self._socket._get_self = provide_self
-
- def send(self, message, raw = False):
- self._socket.send(message, raw)
-
- def recv(self):
- return self._socket.recv()
+ self._status_listeners = [] # tuples of the form (callback, spawn_thread)
+ self._status_listeners_lock = threading.RLock()
+
+ # saves our socket's prior _connect() and _close() methods so they can be
+ # called along with ours
+
+ self._socket_connect = self._socket._connect
+ self._socket_close = self._socket._close
+
+ self._socket._connect = self._connect
+ self._socket._close = self._close
def is_alive(self):
+ """
+ Checks if our socket is currently connected. This is a passthrough for our
+ socket's is_alive() method.
+
+ Returns:
+ bool that's True if we're shut down and False otherwise
+ """
+
return self._socket.is_alive()
def connect(self):
+ """
+ Reconnects our control socket. This is a passthrough for our socket's
+ connect() method.
+
+ Raises:
+ stem.socket.SocketError if unable to make a socket
+ """
+
self._socket.connect()
def close(self):
+ """
+ Closes our socket connection. This is a passthrough for our socket's
+ close() method.
+ """
+
self._socket.close()
def get_socket(self):
+ """
+ Provides the socket used to speak with the tor process. Communicating with
+ the socket directly isn't advised since it may confuse the controller.
+
+ Returns:
+ ControlSocket for process communications
+ """
+
return self._socket
def add_status_listener(self, callback, spawn = True):
- self._socket.add_status_listener(callback, spawn)
+ """
+ Notifies a given function when the state of our socket changes. Functions
+ are expected to be of the form...
+
+ my_function(controller, state, timestamp)
+
+ The state is a value from stem.socket.State, functions *must* allow for
+ new values in this field. The timestamp is a float for the unix time when
+ the change occured.
+
+ This class only provides State.INIT and State.CLOSED notifications.
+ Subclasses may provide others.
+
+ If spawn is True then the callback is notified via a new daemon thread. If
+ false then the notice is under our locks, within the thread where the
+ change occured. In general this isn't advised, especially if your callback
+ could block for a while.
+
+ Arguments:
+ callback (function) - function to be notified when our state changes
+ spawn (bool) - calls function via a new thread if True, otherwise
+ it's part of the connect/close method call
+ """
+
+ with self._status_listeners_lock:
+ self._status_listeners.append((callback, spawn))
def remove_status_listener(self, callback):
- self._socket.remove_status_listener(callback)
+ """
+ Stops listener from being notified of further events.
+
+ Arguments:
+ callback (function) - function to be removed from our listeners
+
+ Returns:
+ bool that's True if we removed one or more occurances of the callback,
+ False otherwise
+ """
+
+ with self._status_listeners_lock:
+ new_listeners, is_changed = [], False
+
+ for listener, spawn in self._status_listeners:
+ if listener != callback:
+ new_listeners.append((listener, spawn))
+ else: is_changed = True
+
+ self._status_listeners = new_listeners
+ return is_changed
+
+ def _connect(self):
+ self._notify_status_listeners(State.INIT, True)
+ self._socket_connect()
- def _make_socket(self):
- self._control_socket._make_socket()
+ def _close(self):
+ self._notify_status_listeners(State.CLOSED, False)
+ self._socket_close()
+
+ def _notify_status_listeners(self, state, expect_alive = None):
+ """
+ Informs our status listeners that a state change occured.
+
+ States imply that our socket is either alive or not, which may not hold
+ true when multiple events occure in quick succession. For instance, a
+ sighup could cause two events (State.RESET for the sighup and State.CLOSE
+ if it causes tor to crash). However, there's no guarentee of the order in
+ which they occure, and it would be bad if listeners got the State.RESET
+ last, implying that we were alive.
+
+ If set, the expect_alive flag will discard our event if it conflicts with
+ our current is_alive() state.
+
+ Arguments:
+ state (stem.socket.State) - state change that has occured
+ expect_alive (bool) - discard event if it conflicts with our
+ is_alive() state
+ """
+
+ # Our socket's calles (the connect() and close() methods) already acquire
+ # these locks. However, our subclasses that use this method probably won't
+ # have them, so locking to prevent those from conflicting with each other
+ # and connect() / close().
+
+ with self._socket._send_lock, self._socket._recv_lock, self._status_listeners_lock:
+ change_timestamp = time.time()
+
+ if expect_alive != None and expect_alive != self.is_alive():
+ return
+
+ for listener, spawn in self._status_listeners:
+ if spawn:
+ thread.start_new_thread(listener, (self, state, change_timestamp))
+ else:
+ listener(self, state, change_timestamp)
diff --git a/stem/socket.py b/stem/socket.py
index da46db2..7e8a560 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -15,10 +15,7 @@ ControlSocket - Socket wrapper that speaks the tor control protocol.
|- recv - receives a ControlMessage from the socket
|- is_alive - reports if the socket is known to be closed
|- connect - connects a new socket
- |- close - shuts down the socket
- |- get_socket - provides socket providing base control communication
- |- add_status_listener - notifies a callback of changes in the socket status
- +- remove_status_listener - prevents further notification of status changes
+ +- close - shuts down the socket
ControlMessage - Message that's read from the control socket.
|- content - provides the parsed message content
@@ -47,21 +44,12 @@ ControllerError - Base exception raised when using the controller.
from __future__ import absolute_import
import re
-import time
import socket
-import thread
import threading
import stem.util.enum
import stem.util.log as log
-# state changes a control socket can have
-# INIT - new control connection
-# RESET - received a reset/sighup signal
-# CLOSED - control connection closed
-
-State = stem.util.enum.Enum("INIT", "RESET", "CLOSED")
-
KEY_ARG = re.compile("^(\S+)=")
# Escape sequences from the 'esc_for_log' function of tor's 'common/util.c'.
@@ -101,9 +89,6 @@ class ControlSocket:
self._socket, self._socket_file = None, None
self._is_alive = False
- self._status_listeners = [] # tuples of the form (callback, spawn_thread)
- self._status_listeners_lock = threading.RLock()
-
# Tracks sending and receiving separately. This should be safe, and doing
# so prevents deadlock where we block writes because we're waiting to read
# a message that isn't coming.
@@ -196,7 +181,7 @@ class ControlSocket:
self._socket_file = self._socket.makefile()
self._is_alive = True
- self._notify_status_listeners(State.INIT, True)
+ self._connect()
def close(self):
"""
@@ -204,10 +189,10 @@ class ControlSocket:
"""
with self._send_lock, self._recv_lock:
- # Function is idempotent with one exception: we notify listeners if this
+ # Function is idempotent with one exception: we notify _close() if this
# is causing our is_alive() state to change.
- notify_listeners = self.is_alive()
+ is_change = self.is_alive()
if self._socket:
# if we haven't yet established a connection then this raises an error
@@ -232,125 +217,28 @@ class ControlSocket:
self._socket_file = None
self._is_alive = False
- if notify_listeners:
- self._notify_status_listeners(State.CLOSED, False)
+ if is_change:
+ self._close()
- def get_socket(self):
- """
- Provides our base ControlSocket that speaks with the socket. For standard
- subclasses this is simply ourselves, but for wrapper instances this is the
- ContorlSocket they wrap.
-
- Use of this, rather than going through its wrapper instance, isn't
- generally a good idea.
-
- Returns:
- ControlSocket for base controller communications
- """
-
+ def __enter__(self):
return self
- def add_status_listener(self, callback, spawn = True):
- """
- Notifies a given function when the state of our socket changes. Functions
- are expected to be of the form...
-
- my_function(control_socket, state, timestamp)
-
- The state is a value from stem.socket.State, functions *must* allow for
- new values in this field. The timestamp is a float for the unix time when
- the change occured.
-
- This class only provides State.INIT and State.CLOSED notifications.
- Subclasses may provide others.
-
- If spawn is True then the callback is notified via a new daemon thread. If
- false then the notice is under our locks, within the thread where the
- change occured. In general this isn't advised, especially if your callback
- could block for a while.
-
- Arguments:
- callback (function) - function to be notified when our state changes
- spawn (bool) - calls function via a new thread if True, otherwise
- it's part of the connect/close method call
- """
-
- with self._status_listeners_lock:
- self._status_listeners.append((callback, spawn))
-
- def remove_status_listener(self, callback):
- """
- Stops listener from being notified of further events.
-
- Arguments:
- callback (function) - function to be removed from our listeners
-
- Returns:
- bool that's True if we removed one or more occurances of the callback,
- False otherwise
- """
-
- with self._status_listeners_lock:
- new_listeners, is_changed = [], False
-
- for listener, spawn in self._status_listeners:
- if listener != callback:
- new_listeners.append((listener, spawn))
- else: is_changed = True
-
- self._status_listeners = new_listeners
- return is_changed
+ def __exit__(self, type, value, traceback):
+ self.close()
- def _notify_status_listeners(self, state, expect_alive = None):
+ def _connect(self):
"""
- Informs our status listeners that a state change occured.
-
- States imply that our socket is either alive or not, which may not hold
- true when multiple events occure in quick succession. For instance, a
- sighup could cause two events (State.RESET for the sighup and State.CLOSE
- if it causes tor to crash). However, there's no guarentee of the order in
- which they occure, and it would be bad if listeners got the State.RESET
- last, implying that we were alive.
-
- If set, the expect_alive flag will discard our event if it conflicts with
- our current is_alive() state.
-
- Arguments:
- state (stem.socket.State) - state change that has occured
- expect_alive (bool) - discard event if it conflicts with our
- is_alive() state
+ Connection callback that can be overwritten by subclasses and wrappers.
"""
- # Our own calles (the connect() and close() methods) already acquire these
- # locks. However, our subclasses that use this method probably won't have
- # them, so locking to prevent those from conflicting with each other and
- # connect() / close().
-
- with self._send_lock, self._recv_lock:
- change_timestamp = time.time()
-
- if expect_alive != None and expect_alive != self.is_alive():
- return
-
- for listener, spawn in self._status_listeners:
- if spawn:
- thread.start_new_thread(listener, (self._get_self(), state, change_timestamp))
- else:
- listener(self._get_self(), state, change_timestamp)
+ pass
- def _get_self(self):
+ def _close(self):
"""
- Provides our self reference. For basic subclasses this is ourselves, but
- for wrappers this is the instance wrapping us.
+ Disconnection callback that can be overwritten by subclasses and wrappers.
"""
- return self
-
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, traceback):
- self.close()
+ pass
def _make_socket(self):
"""
@@ -801,10 +689,6 @@ def send_message(control_file, message, raw = False):
if not raw: message = send_formatting(message)
- # uses a newline divider if this is a multi-line message (more readable)
- log_message = message.replace("\r\n", "\n").rstrip()
- div = "\n" if "\n" in log_message else " "
-
try:
control_file.write(message)
control_file.flush()
diff --git a/test/integ/control/base_controller.py b/test/integ/control/base_controller.py
index e30964c..7fb6723 100644
--- a/test/integ/control/base_controller.py
+++ b/test/integ/control/base_controller.py
@@ -2,13 +2,33 @@
Integration tests for the stem.control.BaseController class.
"""
+import time
import unittest
import stem.control
import stem.socket
import test.runner
import test.mocking as mocking
-import test.integ.socket.control_socket
+
+class StateObserver:
+ """
+ Simple container for listening to ControlSocket state changes and
+ rembembering them for the test.
+ """
+
+ controller = None
+ state = None
+ timestamp = None
+
+ def reset(self):
+ self.controller = None
+ self.state = None
+ self.timestamp = None
+
+ def listener(self, controller, state, timestamp):
+ self.controller = controller
+ self.state = state
+ self.timestamp = timestamp
class TestBaseController(unittest.TestCase):
def setUp(self):
@@ -39,29 +59,59 @@ class TestBaseController(unittest.TestCase):
else:
self.assertRaises(stem.socket.SocketError, stem.control.BaseController.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
- def test_socket_passthrough(self):
+ def test_status_notifications(self):
"""
- The BaseController is a passthrough for the socket it is built from, so
- runs the ControlSocket integ tests again against it.
+ Checks basic functionality of the add_status_listener() and
+ remove_status_listener() methods.
"""
- # overwrites the Runner's get_tor_socket() to provide back a ControlSocket
- # wrapped by a BaseContorller
-
- def mock_get_tor_socket(self, authenticate = True):
- real_get_tor_socket = mocking.get_real_function(test.runner.Runner.get_tor_socket)
- control_socket = real_get_tor_socket(self, authenticate)
- return stem.control.BaseController(control_socket)
-
- mocking.mock_method(test.runner.Runner, "get_tor_socket", mock_get_tor_socket)
-
- # sanity check that the mocking is working
- example_socket = test.runner.get_runner().get_tor_socket()
- self.assertTrue(isinstance(example_socket, stem.control.BaseController))
+ state_observer = StateObserver()
- # re-runs all of the control_socket tests
- socket_test_class = test.integ.socket.control_socket.TestControlSocket
- for method in socket_test_class.__dict__:
- if method.startswith("test_"):
- socket_test_class.__dict__[method](self)
+ with test.runner.get_runner().get_tor_socket(False) as control_socket:
+ controller = stem.control.BaseController(control_socket)
+ controller.add_status_listener(state_observer.listener, False)
+
+ control_socket.close()
+ self.assertEquals(controller, state_observer.controller)
+ self.assertEquals(stem.control.State.CLOSED, state_observer.state)
+ self.assertTrue(state_observer.timestamp < time.time())
+ self.assertTrue(state_observer.timestamp > time.time() - 1.0)
+ state_observer.reset()
+
+ control_socket.connect()
+ self.assertEquals(controller, state_observer.controller)
+ self.assertEquals(stem.control.State.INIT, state_observer.state)
+ self.assertTrue(state_observer.timestamp < time.time())
+ self.assertTrue(state_observer.timestamp > time.time() - 1.0)
+ state_observer.reset()
+
+ # cause the socket to shut down without calling close()
+ control_socket.send("Blarg!")
+ control_socket.recv()
+ self.assertRaises(stem.socket.SocketClosed, control_socket.recv)
+ self.assertEquals(controller, state_observer.controller)
+ self.assertEquals(stem.control.State.CLOSED, state_observer.state)
+ self.assertTrue(state_observer.timestamp < time.time())
+ self.assertTrue(state_observer.timestamp > time.time() - 1.0)
+ state_observer.reset()
+
+ # remove listener and make sure we don't get further notices
+ controller.remove_status_listener(state_observer.listener)
+ control_socket.connect()
+ self.assertEquals(None, state_observer.controller)
+ self.assertEquals(None, state_observer.state)
+ self.assertEquals(None, state_observer.timestamp)
+ state_observer.reset()
+
+ # add with spawn as true, we need a little delay on this since we then
+ # get the notice asynchronously
+
+ controller.add_status_listener(state_observer.listener, True)
+ control_socket.close()
+ time.sleep(0.1) # not much work going on so this doesn't need to be much
+ self.assertEquals(controller, state_observer.controller)
+ self.assertEquals(stem.control.State.CLOSED, state_observer.state)
+ self.assertTrue(state_observer.timestamp < time.time())
+ self.assertTrue(state_observer.timestamp > time.time() - 1.0)
+ state_observer.reset()
diff --git a/test/integ/socket/control_socket.py b/test/integ/socket/control_socket.py
index ec19d82..366501f 100644
--- a/test/integ/socket/control_socket.py
+++ b/test/integ/socket/control_socket.py
@@ -8,7 +8,6 @@ those focus on parsing and correctness of the content these are more concerned
with the behavior of the socket itself.
"""
-import time
import unittest
import stem.connection
@@ -16,26 +15,6 @@ import stem.control
import stem.socket
import test.runner
-class StateObserver:
- """
- Simple container for listening to ControlSocket state changes and
- rembembering them for the test.
- """
-
- control_socket = None
- state = None
- timestamp = None
-
- def reset(self):
- self.control_socket = None
- self.state = None
- self.timestamp = None
-
- def listener(self, control_socket, state, timestamp):
- self.control_socket = control_socket
- self.state = state
- self.timestamp = timestamp
-
class TestControlSocket(unittest.TestCase):
def setUp(self):
test.runner.require_control(self)
@@ -86,7 +65,7 @@ class TestControlSocket(unittest.TestCase):
# If we send another message to a port based socket then it will seem to
# succeed. However, a file based socket should report a failure.
- if control_socket.get_socket().__class__ == stem.socket.ControlPort:
+ if isinstance(control_socket, stem.socket.ControlPort):
control_socket.send("blarg")
self.assertTrue(control_socket.is_alive())
else:
@@ -136,59 +115,4 @@ class TestControlSocket(unittest.TestCase):
control_socket.close()
self.assertRaises(stem.socket.SocketClosed, control_socket.send, "PROTOCOLINFO 1")
control_socket.connect()
-
- def test_status_notifications(self):
- """
- Checks basic functionality of the add_status_listener() and
- remove_status_listener() methods.
- """
-
- state_observer = StateObserver()
-
- with test.runner.get_runner().get_tor_socket(False) as control_socket:
- control_socket.add_status_listener(state_observer.listener, False)
-
- control_socket.close()
- self.assertEquals(control_socket, state_observer.control_socket)
- self.assertEquals(stem.socket.State.CLOSED, state_observer.state)
- self.assertTrue(state_observer.timestamp < time.time())
- self.assertTrue(state_observer.timestamp > time.time() - 1.0)
- state_observer.reset()
-
- control_socket.connect()
- self.assertEquals(control_socket, state_observer.control_socket)
- self.assertEquals(stem.socket.State.INIT, state_observer.state)
- self.assertTrue(state_observer.timestamp < time.time())
- self.assertTrue(state_observer.timestamp > time.time() - 1.0)
- state_observer.reset()
-
- # cause the socket to shut down without calling close()
- control_socket.send("Blarg!")
- control_socket.recv()
- self.assertRaises(stem.socket.SocketClosed, control_socket.recv)
- self.assertEquals(control_socket, state_observer.control_socket)
- self.assertEquals(stem.socket.State.CLOSED, state_observer.state)
- self.assertTrue(state_observer.timestamp < time.time())
- self.assertTrue(state_observer.timestamp > time.time() - 1.0)
- state_observer.reset()
-
- # remove listener and make sure we don't get further notices
- control_socket.remove_status_listener(state_observer.listener)
- control_socket.connect()
- self.assertEquals(None, state_observer.control_socket)
- self.assertEquals(None, state_observer.state)
- self.assertEquals(None, state_observer.timestamp)
- state_observer.reset()
-
- # add with spawn as true, we need a little delay on this since we then
- # get the notice asynchronously
-
- control_socket.add_status_listener(state_observer.listener, True)
- control_socket.close()
- time.sleep(0.1) # not much work going on so this doesn't need to be much
- self.assertEquals(control_socket, state_observer.control_socket)
- self.assertEquals(stem.socket.State.CLOSED, state_observer.state)
- self.assertTrue(state_observer.timestamp < time.time())
- self.assertTrue(state_observer.timestamp > time.time() - 1.0)
- state_observer.reset()