[or-cvs] r20512: {torctl} Add the ability to filter out streams from other sources. (torctl/trunk/python/TorCtl)

mikeperry at seul.org mikeperry at seul.org
Wed Sep 9 08:15:08 UTC 2009


Author: mikeperry
Date: 2009-09-09 04:15:07 -0400 (Wed, 09 Sep 2009)
New Revision: 20512

Modified:
   torctl/trunk/python/TorCtl/PathSupport.py
   torctl/trunk/python/TorCtl/ScanSupport.py
   torctl/trunk/python/TorCtl/StatsSupport.py
Log:

Add the ability to filter out streams from other
sources.



Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py	2009-09-09 08:00:16 UTC (rev 20511)
+++ torctl/trunk/python/TorCtl/PathSupport.py	2009-09-09 08:15:07 UTC (rev 20512)
@@ -57,6 +57,8 @@
 import time
 import TorUtil
 import traceback
+import sets
+import threading
 from TorUtil import *
 
 __all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -73,7 +75,8 @@
 "CountryCodeRestriction", "CountryRestriction",
 "UniqueCountryRestriction", "SingleCountryRestriction",
 "ContinentRestriction", "ContinentJumperRestriction",
-"UniqueContinentRestriction", "MetaPathRestriction", "RateLimitedRestriction"]
+"UniqueContinentRestriction", "MetaPathRestriction", "RateLimitedRestriction",
+"SmartSocket"]
 
 #################### Path Support Interfaces #####################
 
@@ -1278,6 +1281,98 @@
     "Returns the age of the stream"
     return now-self.attached_at
 
+_origsocket = socket.socket
+class _SocketWrapper(socket.socket):
+  """ Ghetto wrapper to workaround python same_slots_added() and 
+      socket __base__ braindamage """
+  pass
+
+class SmartSocket(_SocketWrapper):
+  """ A SmartSocket is a socket that tracks global socket creation
+      for local ports. It has a member StreamSelector that can
+      be used as a PathBuilder stream StreamSelector (see below).
+
+      Most users will want to reset the base class of SocksiPy to
+      use this class:
+      __oldsocket = socket.socket
+      socket.socket = PathSupport.SmartSocket
+      import SocksiPy
+      socket.socket = __oldsocket
+   """
+  port_table = sets.Set()
+  _table_lock = threading.Lock()
+
+  def connect(self, args):
+    ret = super(SmartSocket, self).connect(args)
+    myaddr = self.getsockname()
+    self.__local_addr = myaddr[0]+":"+str(myaddr[1])
+    SmartSocket._table_lock.acquire()
+    assert(self.__local_addr not in SmartSocket.port_table)
+    SmartSocket.port_table.add(myaddr[0]+":"+str(myaddr[1]))
+    SmartSocket._table_lock.release()
+    plog("DEBUG", "Added "+self.__local_addr+" to our local port list")
+    return ret
+
+  def connect_ex(self, args):
+    ret = super(SmartSocket, self).connect_ex(args)
+    myaddr = ret.getsockname()
+    self.__local_addr = myaddr[0]+":"+str(myaddr[1])
+    SmartSocket._table_lock.acquire()
+    assert(self.__local_addr not in SmartSocket.port_table)
+    SmartSocket.port_table.add(myaddr[0]+":"+str(myaddr[1]))
+    SmartSocket._table_lock.release()
+    plog("DEBUG", "Added "+self.__local_addr+" to our local port list")
+    return ret
+
+  def __del__(self):
+    SmartSocket._table_lock.acquire()
+    SmartSocket.port_table.remove(self.__local_addr)
+    SmartSocket._table_lock.release()
+    plog("DEBUG", "Removed "+self.__local_addr+" from our local port list")
+
+  def table_size():
+    SmartSocket._table_lock.acquire()
+    ret = len(SmartSocket.port_table)
+    SmartSocket._table_lock.release()
+    return ret
+  table_size = Callable(table_size)
+
+  def clear_port_table():
+    """ WARNING: Calling this periodically is a *really good idea*.
+        Relying on __del__ can expose you to race conditions on garbage
+        collection between your processes. """
+    SmartSocket._table_lock.acquire()
+    for i in list(SmartSocket.port_table):
+      plog("DEBUG", "Cleared "+i+" from our local port list")
+      SmartSocket.port_table.remove(i)
+    SmartSocket._table_lock.release()
+  clear_port_table = Callable(clear_port_table)
+
+  def StreamSelector(host, port):
+    to_test = host+":"+str(port)
+    SmartSocket._table_lock.acquire()
+    ret = (to_test in SmartSocket.port_table)
+    SmartSocket._table_lock.release()
+    return ret
+  StreamSelector = Callable(StreamSelector)
+
+
+def StreamSelector(host, port):
+  """ A StreamSelector is a function that takes a host and a port as
+      arguments (parsed from Tor's SOURCE_ADDR field in STREAM NEW
+      events) and decides if it is a stream from this process or not.
+
+      This StreamSelector is just a placeholder that always returns True.
+      When you define your own, be aware that you MUST DO YOUR OWN
+      LOCKING inside this function, as it is called from the Eventhandler
+      thread.
+
+      See PathSupport.SmartSocket.StreamSelctor for an actual
+      implementation.
+
+  """
+  return True
+
 # TODO: Make passive "PathWatcher" so people can get aggregate 
 # node reliability stats for normal usage without us attaching streams
 # Can use __metaclass__ and type
@@ -1291,7 +1386,8 @@
   schedule_* functions to schedule work to be done in the thread
   of the EventHandler.
   """
-  def __init__(self, c, selmgr, RouterClass=TorCtl.Router):
+  def __init__(self, c, selmgr, RouterClass=TorCtl.Router,
+               strm_selector=StreamSelector):
     """Constructor. 'c' is a Connection, 'selmgr' is a SelectionManager,
     and 'RouterClass' is a class that inherits from Router and is used
     to create annotated Routers."""
@@ -1308,6 +1404,7 @@
     self.low_prio_jobs = Queue.Queue()
     self.run_all_jobs = False
     self.do_reconfigure = False
+    self.strm_selector = strm_selector
     plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(self.ns_map))+" routers")
 
   def schedule_immediate(self, job):
@@ -1542,11 +1639,12 @@
     if s.reason: output.append("REASON=" + s.reason)
     if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
     if s.purpose: output.append("PURPOSE=" + s.purpose)
+    if s.source_addr: output.append("SOURCE_ADDR="+s.source_addr)
     plog("DEBUG", " ".join(output))
     if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
       s.target_host = "255.255.255.255" # ignore DNS for exit policy check
 
-    # Hack to ignore Tor-handled streams (Currently only directory streams)
+    # Hack to ignore Tor-handled streams
     if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
       if s.status == "CLOSED":
         plog("DEBUG", "Deleting ignored stream: " + str(s.strm_id))
@@ -1563,11 +1661,19 @@
       if s.circ_id == 0:
         self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, s.status)
       # Remember Tor-handled streams (Currently only directory streams)
+
       if s.purpose and s.purpose.find("DIR_") == 0:
         self.streams[s.strm_id].ignored = True
         plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
         return
-      elif s.circ_id == 0:
+      elif s.source_addr:
+        src_addr = s.source_addr.split(":")
+        src_addr[1] = int(src_addr[1])
+        if not self.strm_selector(*src_addr):
+          self.streams[s.strm_id].ignored = True
+          plog("INFO", "Ignoring foreign stream: " + str(s.strm_id))
+          return
+      if s.circ_id == 0:
         self.attach_stream_any(self.streams[s.strm_id],
                    self.streams[s.strm_id].detached_from)
     elif s.status == "DETACHED":
@@ -1583,9 +1689,12 @@
           plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit with reason: "+str(s.reason))
       else:
         self.streams[s.strm_id].detached_from.append(s.circ_id)
-      
-      if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
-        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+
+      if self.streams[s.strm_id].pending_circ and \
+           self.streams[s.strm_id] in \
+                  self.streams[s.strm_id].pending_circ.pending_streams:
+        self.streams[s.strm_id].pending_circ.pending_streams.remove(
+                                                self.streams[s.strm_id])
       self.streams[s.strm_id].pending_circ = None
       self.attach_stream_any(self.streams[s.strm_id],
                    self.streams[s.strm_id].detached_from)

Modified: torctl/trunk/python/TorCtl/ScanSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/ScanSupport.py	2009-09-09 08:00:16 UTC (rev 20511)
+++ torctl/trunk/python/TorCtl/ScanSupport.py	2009-09-09 08:15:07 UTC (rev 20512)
@@ -245,4 +245,3 @@
     plog("INFO", "Consensus OK")
 
 
-

Modified: torctl/trunk/python/TorCtl/StatsSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/StatsSupport.py	2009-09-09 08:00:16 UTC (rev 20511)
+++ torctl/trunk/python/TorCtl/StatsSupport.py	2009-09-09 08:15:07 UTC (rev 20512)
@@ -192,14 +192,14 @@
   def __init__(self, router): # Promotion constructor :)
     """'Promotion Constructor' that converts a Router directly into a 
     StatsRouter without a copy."""
-    # TODO: Use __metaclass__ and type to do this instead?
+    # TODO: Use __bases__ to do this instead?
     self.__dict__ = router.__dict__
     self.reset()
     # StatsRouters should not be destroyed when Tor forgets about them
     # Give them an extra refcount:
     self.refcount += 1
     plog("DEBUG", "Stats refcount "+str(self.refcount)+" for "+self.idhex)
-  
+
   def reset(self):
     "Reset all stats on this Router"
     self.circ_uncounted = 0



More information about the tor-commits mailing list