[tor-commits] [stem/master] Adding event handling support to the Controller

atagar at torproject.org atagar at torproject.org
Mon Dec 3 02:35:43 UTC 2012


commit ba280f94da9962e1a2b1bfaf22d5fbd17ffa3103
Author: Damian Johnson <atagar at torproject.org>
Date:   Sun Nov 4 15:53:44 2012 -0800

    Adding event handling support to the Controller
    
    My initial plan was to add event listener support similar to TorCtl but with
    automated handling of the SETEVENT calls. While working on this meejah
    suggested accepting functors instead (like txtorcon). On reflection this is
    both much nicer for our callers and easier for us to support.
    
    Adding functions for adding and removing listeners, with a simple integ test.
    Next up will be the Event classes...
---
 stem/control.py                  |   97 ++++++++++++++++++++++++++++++++++++++
 test/integ/control/controller.py |   45 +++++++++++++++++
 2 files changed, 142 insertions(+), 0 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index 1476598..ca3730b 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -13,6 +13,8 @@ providing its own for interacting at a higher level.
     | |- from_port - Provides a Controller based on a port connection.
     | +- from_socket_file - Provides a Controller based on a socket file connection.
     |
+    |- add_event_listener - attaches an event listener to be notified of tor events
+    |- remove_event_listener - removes a listener so it isn't notified of further events
     |- is_caching_enabled - true if the controller has enabled caching
     |- is_geoip_unavailable - true if we've discovered our geoip db to be unavailable
     |- clear_cache - clears any cached results
@@ -73,6 +75,34 @@ import stem.util.log as log
 
 State = stem.util.enum.Enum("INIT", "RESET", "CLOSED")
 
+EventType = stem.util.enum.UppercaseEnum(
+  "CIRC",
+  "STREAM",
+  "ORCONN",
+  "BW",
+  "DEBUG",
+  "INFO",
+  "NOTICE",
+  "WARN",
+  "ERR",
+  "NEWDESC",
+  "ADDRMAP",
+  "AUTHDIR_NEWDESCS",
+  "DESCCHANGED",
+  "STATUS_GENERAL",
+  "STATUS_CLIENT",
+  "STATUS_SERVER",
+  "GUARD",
+  "NS",
+  "STREAM_BW",
+  "CLIENTS_SEEN",
+  "NEWCONSENSUS",
+  "BUILDTIMEOUT_SET",
+  "SIGNAL",
+  "CONF_CHANGED",
+  "CIRC_MINOR",
+)
+
 # Constant to indicate an undefined argument default. Usually we'd use None for
 # this, but users will commonly provide None as the argument so need something
 # else fairly unique...
@@ -508,6 +538,11 @@ class Controller(BaseController):
     self._is_caching_enabled = enable_caching
     self._request_cache = {}
     
+    # mapping of event types to their listeners
+    
+    self._event_listeners = {}
+    self._event_listeners_lock = threading.RLock()
+    
     # number of sequential 'GETINFO ip-to-country/*' lookups that have failed
     self._geoip_failure_count = 0
     self.enabled_features = []
@@ -524,6 +559,57 @@ class Controller(BaseController):
     
     super(Controller, self).close()
   
+  def add_event_listener(self, listener, *events):
+    """
+    Directs further tor controller events to a given function. The function is
+    expected to take a single argument, which is a
+    :class:`~stem.response.events.Event` subclass.
+    
+    If a new control connection is initialized then this listener will be
+    reattached.
+    
+    :param functor listener: function to be called when an event is received
+    :param stem.control.EventType events: event types to be listened for
+    
+    :raises: :class:`stem.socket.ControllerError` if unable to set the events
+    """
+    
+    with self._event_listeners_lock:
+      for event_type in events:
+        self._event_listeners.setdefault(event_type, []).append(listener)
+      
+      if self.is_alive():
+        response = self.msg("SETEVENTS %s" % " ".join(self._event_listeners.keys()))
+        
+        if not response.is_ok():
+          raise stem.socket.ProtocolError("SETEVENTS received unexpected response\n%s" % response)
+  
+  def remove_event_listener(self, listener):
+    """
+    Stops a listener from being notified of further tor events.
+    
+    :param stem.control.EventListener listener: listener to be removed
+    
+    :raises: :class:`stem.socket.ControllerError` if unable to set the events
+    """
+    
+    with self._event_listeners_lock:
+      event_types_changed = False
+      
+      for event_type, event_listeners in self._event_listeners.items():
+        if listener in event_listeners:
+          event_listeners.remove(listener)
+          
+          if len(event_listeners) == 0:
+            event_types_changed = True
+            del self._event_listeners[event_type]
+      
+      if event_types_changed:
+        response = self.msg("SETEVENTS %s" % " ".join(self._event_listeners.keys()))
+        
+        if not response.is_ok():
+          raise stem.socket.ProtocolError("SETEVENTS received unexpected response\n%s" % response)
+  
   def is_caching_enabled(self):
     """
     **True** if caching has been enabled, **False** otherwise.
@@ -1320,6 +1406,17 @@ class Controller(BaseController):
     stem.response.convert("MAPADDRESS", response)
     
     return response.entries
+  
+  def _handle_event(self, event_message):
+    # TODO: parse the event_message into a stem.response.events.Event class
+    
+    event_message_type = str(event_message).split()[0]
+    
+    with self._event_listeners_lock:
+      for event_type, event_listeners in self._event_listeners.items():
+        if event_type == event_message_type:
+          for listener in event_listeners:
+            listener(event_message)
 
 def _case_insensitive_lookup(entries, key, default = UNDEFINED):
   """
diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py
index 1ba06bb..e5d7782 100644
--- a/test/integ/control/controller.py
+++ b/test/integ/control/controller.py
@@ -6,6 +6,7 @@ from __future__ import with_statement
 
 import os
 import re
+import time
 import shutil
 import socket
 import unittest
@@ -20,6 +21,8 @@ import test.util
 import stem.descriptor.router_status_entry
 import stem.descriptor.reader
 
+from stem.control import EventType
+
 class TestController(unittest.TestCase):
   def test_from_port(self):
     """
@@ -47,6 +50,48 @@ class TestController(unittest.TestCase):
     else:
       self.assertRaises(stem.SocketError, stem.control.Controller.from_socket_file, test.runner.CONTROL_SOCKET_PATH)
   
+  def test_event_handling(self):
+    """
+    Add a couple listeners for various events and make sure that they receive
+    them. Then remove the listeners.
+    """
+    
+    if test.runner.require_control(self): return
+    
+    event_buffer1, event_buffer2 = [], []
+    
+    def listener1(event):
+      event_buffer1.append(event)
+    
+    def listener2(event):
+      event_buffer2.append(event)
+    
+    runner = test.runner.get_runner()
+    with runner.get_tor_controller() as controller:
+      controller.add_event_listener(listener1, EventType.BW)
+      controller.add_event_listener(listener2, EventType.BW, EventType.DEBUG)
+      
+      # BW events occure at the rate of one per second, so wait a bit to let
+      # some accumulate.
+      
+      # TODO: check that the type of events in event_buffer1 are BandwidthEvent
+      # instances when we have proper event types
+      
+      time.sleep(3)
+      
+      self.assertTrue(len(event_buffer1) >= 2)
+      self.assertTrue(len(event_buffer2) >= 2)
+      
+      # Checking that a listener's no longer called after being removed.
+      
+      controller.remove_event_listener(listener2)
+      
+      buffer2_size = len(event_buffer2)
+      time.sleep(2)
+      
+      self.assertTrue(len(event_buffer1) >= 4)
+      self.assertEqual(buffer2_size, len(event_buffer2))
+  
   def test_getinfo(self):
     """
     Exercises GETINFO with valid and invalid queries.





More information about the tor-commits mailing list