commit ba280f94da9962e1a2b1bfaf22d5fbd17ffa3103 Author: Damian Johnson atagar@torproject.org Date: Sun Nov 4 15:53:44 2012 -0800
Adding event handling support to the Controller
My initial plan was to add event listener support similar to TorCtl but with automated handling of the SETEVENT calls. While working on this meejah suggested accepting functors instead (like txtorcon). On reflection this is both much nicer for our callers and easier for us to support.
Adding functions for adding and removing listeners, with a simple integ test. Next up will be the Event classes... --- stem/control.py | 97 ++++++++++++++++++++++++++++++++++++++ test/integ/control/controller.py | 45 +++++++++++++++++ 2 files changed, 142 insertions(+), 0 deletions(-)
diff --git a/stem/control.py b/stem/control.py index 1476598..ca3730b 100644 --- a/stem/control.py +++ b/stem/control.py @@ -13,6 +13,8 @@ providing its own for interacting at a higher level. | |- from_port - Provides a Controller based on a port connection. | +- from_socket_file - Provides a Controller based on a socket file connection. | + |- add_event_listener - attaches an event listener to be notified of tor events + |- remove_event_listener - removes a listener so it isn't notified of further events |- is_caching_enabled - true if the controller has enabled caching |- is_geoip_unavailable - true if we've discovered our geoip db to be unavailable |- clear_cache - clears any cached results @@ -73,6 +75,34 @@ import stem.util.log as log
State = stem.util.enum.Enum("INIT", "RESET", "CLOSED")
+EventType = stem.util.enum.UppercaseEnum( + "CIRC", + "STREAM", + "ORCONN", + "BW", + "DEBUG", + "INFO", + "NOTICE", + "WARN", + "ERR", + "NEWDESC", + "ADDRMAP", + "AUTHDIR_NEWDESCS", + "DESCCHANGED", + "STATUS_GENERAL", + "STATUS_CLIENT", + "STATUS_SERVER", + "GUARD", + "NS", + "STREAM_BW", + "CLIENTS_SEEN", + "NEWCONSENSUS", + "BUILDTIMEOUT_SET", + "SIGNAL", + "CONF_CHANGED", + "CIRC_MINOR", +) + # Constant to indicate an undefined argument default. Usually we'd use None for # this, but users will commonly provide None as the argument so need something # else fairly unique... @@ -508,6 +538,11 @@ class Controller(BaseController): self._is_caching_enabled = enable_caching self._request_cache = {}
+ # mapping of event types to their listeners + + self._event_listeners = {} + self._event_listeners_lock = threading.RLock() + # number of sequential 'GETINFO ip-to-country/*' lookups that have failed self._geoip_failure_count = 0 self.enabled_features = [] @@ -524,6 +559,57 @@ class Controller(BaseController):
super(Controller, self).close()
+ def add_event_listener(self, listener, *events): + """ + Directs further tor controller events to a given function. The function is + expected to take a single argument, which is a + :class:`~stem.response.events.Event` subclass. + + If a new control connection is initialized then this listener will be + reattached. + + :param functor listener: function to be called when an event is received + :param stem.control.EventType events: event types to be listened for + + :raises: :class:`stem.socket.ControllerError` if unable to set the events + """ + + with self._event_listeners_lock: + for event_type in events: + self._event_listeners.setdefault(event_type, []).append(listener) + + if self.is_alive(): + response = self.msg("SETEVENTS %s" % " ".join(self._event_listeners.keys())) + + if not response.is_ok(): + raise stem.socket.ProtocolError("SETEVENTS received unexpected response\n%s" % response) + + def remove_event_listener(self, listener): + """ + Stops a listener from being notified of further tor events. + + :param stem.control.EventListener listener: listener to be removed + + :raises: :class:`stem.socket.ControllerError` if unable to set the events + """ + + with self._event_listeners_lock: + event_types_changed = False + + for event_type, event_listeners in self._event_listeners.items(): + if listener in event_listeners: + event_listeners.remove(listener) + + if len(event_listeners) == 0: + event_types_changed = True + del self._event_listeners[event_type] + + if event_types_changed: + response = self.msg("SETEVENTS %s" % " ".join(self._event_listeners.keys())) + + if not response.is_ok(): + raise stem.socket.ProtocolError("SETEVENTS received unexpected response\n%s" % response) + def is_caching_enabled(self): """ **True** if caching has been enabled, **False** otherwise. @@ -1320,6 +1406,17 @@ class Controller(BaseController): stem.response.convert("MAPADDRESS", response)
return response.entries + + def _handle_event(self, event_message): + # TODO: parse the event_message into a stem.response.events.Event class + + event_message_type = str(event_message).split()[0] + + with self._event_listeners_lock: + for event_type, event_listeners in self._event_listeners.items(): + if event_type == event_message_type: + for listener in event_listeners: + listener(event_message)
def _case_insensitive_lookup(entries, key, default = UNDEFINED): """ diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py index 1ba06bb..e5d7782 100644 --- a/test/integ/control/controller.py +++ b/test/integ/control/controller.py @@ -6,6 +6,7 @@ from __future__ import with_statement
import os import re +import time import shutil import socket import unittest @@ -20,6 +21,8 @@ import test.util import stem.descriptor.router_status_entry import stem.descriptor.reader
+from stem.control import EventType + class TestController(unittest.TestCase): def test_from_port(self): """ @@ -47,6 +50,48 @@ class TestController(unittest.TestCase): else: self.assertRaises(stem.SocketError, stem.control.Controller.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
+ def test_event_handling(self): + """ + Add a couple listeners for various events and make sure that they receive + them. Then remove the listeners. + """ + + if test.runner.require_control(self): return + + event_buffer1, event_buffer2 = [], [] + + def listener1(event): + event_buffer1.append(event) + + def listener2(event): + event_buffer2.append(event) + + runner = test.runner.get_runner() + with runner.get_tor_controller() as controller: + controller.add_event_listener(listener1, EventType.BW) + controller.add_event_listener(listener2, EventType.BW, EventType.DEBUG) + + # BW events occure at the rate of one per second, so wait a bit to let + # some accumulate. + + # TODO: check that the type of events in event_buffer1 are BandwidthEvent + # instances when we have proper event types + + time.sleep(3) + + self.assertTrue(len(event_buffer1) >= 2) + self.assertTrue(len(event_buffer2) >= 2) + + # Checking that a listener's no longer called after being removed. + + controller.remove_event_listener(listener2) + + buffer2_size = len(event_buffer2) + time.sleep(2) + + self.assertTrue(len(event_buffer1) >= 4) + self.assertEqual(buffer2_size, len(event_buffer2)) + def test_getinfo(self): """ Exercises GETINFO with valid and invalid queries.