[or-cvs] r10417: Added missing countries to GeoIPSupport.py and removed excep (in torflow/trunk: . TorCtl)

renner at seul.org renner at seul.org
Thu May 31 13:53:02 UTC 2007


Author: renner
Date: 2007-05-31 09:53:02 -0400 (Thu, 31 May 2007)
New Revision: 10417

Modified:
   torflow/trunk/TorCtl/GeoIPSupport.py
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/TorCtl/TorUtil.py
   torflow/trunk/op-addon.py
Log:

  Added missing countries to GeoIPSupport.py and removed the exception for US.
  Moved sort_list(list, key) to TorUtil because it is needed in several places.
  Introduced functionality to measure and compute RTTs of single links between
  routers, plus a NetworkModel to store them. Began refactoring the event
  handler code into a class hierarchy (CircuitHandler -> StreamHandler -> PingHandler).



Modified: torflow/trunk/TorCtl/GeoIPSupport.py
===================================================================
--- torflow/trunk/TorCtl/GeoIPSupport.py	2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/TorCtl/GeoIPSupport.py	2007-05-31 13:53:02 UTC (rev 10417)
@@ -4,6 +4,7 @@
 import socket
 import GeoIP
 import TorCtl
+
 from TorUtil import plog
 
 # GeoIP data object: choose database here
@@ -19,10 +20,13 @@
   def contains(self, country_code):
     return country_code in self.countries
 
-# The continents
+# Setup the continents
 africa = Continent("AF")
-# TODO: Add more countries
-africa.countries = ["CI"]
+africa.countries = ["AO","BF","BI","BJ","BV","BW","CD","CF","CG","CI","CM","CV","DJ","DZ",
+                    "EG","EH","ER","ET","GA","GH","GM","GN","GQ","GW","HM","KE","KM","LR",
+		    "LS","LY","MA","MG","ML","MR","MU","MW","MZ","NA","NE","NG","RE","RW",
+		    "SC","SD","SH","SL","SN","SO","ST","SZ","TD","TF","TG","TN","TZ","UG",
+		    "YT","ZA","ZM","ZR","ZW"]
 
 asia = Continent("AS")
 asia.countries = ["AP","AE","AF","AM","AZ","BD","BH","BN","BT","CC","CN","CX","CY","GE",

Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-05-31 13:53:02 UTC (rev 10417)
@@ -326,8 +326,6 @@
   def r_is_ok(self, path, router):
     for r in path:
       if router.country_code == r.country_code:
-        # Exceptionally allow US because of so many states
-        if router.country_code == "US": return True	  
         return False
     return True
 

Modified: torflow/trunk/TorCtl/TorUtil.py
===================================================================
--- torflow/trunk/TorCtl/TorUtil.py	2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/TorCtl/TorUtil.py	2007-05-31 13:53:02 UTC (rev 10417)
@@ -15,9 +15,9 @@
 import sha
 import math
 
-__all__ = ["Enum", "Enum2", "quote", "escape_dots", "unescape_dots",
-      "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check",
-      "plog", "ListenSocket", "zprob"]
+__all__ = ["Enum", "Enum2", "sort_list", "quote", "escape_dots", "unescape_dots",
+      "BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check", "plog", 
+      "ListenSocket", "zprob"]
 
 class Enum:
   # Helper: define an ordered dense name-to-number 1-1 mapping.
@@ -37,6 +37,11 @@
     for k,v in args.items():
       self.nameOf[v] = k
 
+def sort_list(list, key):
+  """ Sort a list by a specified key """
+  list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
+  return list
+
 def quote(s):
   return re.sub(r'([\r\n\\\"])', r'\\\1', s)
 

Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-05-31 07:20:44 UTC (rev 10416)
+++ torflow/trunk/op-addon.py	2007-05-31 13:53:02 UTC (rev 10417)
@@ -13,22 +13,26 @@
 # TODO: import 'with'-statement for Lock objects (Python 2.5: "with some_lock: do something")
 import re
 import sys
+import copy
 import math
 import time
-import sched
 import socket
 import threading
 import Queue
+
 # Non-standard packages
 import socks
-#import networkx
+import networkx
+
+# TorCtl
 import TorCtl.PathSupport
 import TorCtl.GeoIPSupport
 
+# From .. import ..
 from TorCtl import *
-from TorCtl.TorUtil import plog
+from TorCtl.TorUtil import plog, sort_list
 
-# Move these to config file
+# TODO: Move these to config-file
 control_host = "127.0.0.1"
 control_port = 9051
 socks_host = "127.0.0.1"
@@ -38,22 +42,34 @@
 ping_dummy_host = "127.0.0.1"
 ping_dummy_port = 100
 
-# Close circ after n timeouts or slownesses
-timeout_limit = 2
+# Close circ after n timeouts or avg measured slownesses
+timeout_limit = 1
+slowness_limit = 3
 # Slow RTT := x seconds 
-slow = 1
-# Set interval between work loads in sec
-sleep_interval = 30
+slow = 1.5
+# Note: Tor-internal lifetime of a circuit is 10 min --> 600/sleep_interval = max-age
+# Set interval between working loads in sec
+sleep_interval = 10
 # No of idle circuits to build preemptively
+# TODO: Also configure ports to use
 idle_circuits = 6
 
-# Lock object for regulating access to the circuit list
+# Set to True if we want to measure partial circuits
+measure_partial_circs = False
+
+# Still needed: Lock object for regulating access to the circuit list
 circs_lock = threading.Lock()
 
-# Infos about this proxy TODO: Save in some class
+# Infos about this proxy TODO: Save in some class: Router?
 my_ip = None
 my_country = None
 
+# Do configuration here TODO: use my_country for src
+path_config = GeoIPSupport.GeoIPConfig(unique_countries = False,
+                                       src_country = None,
+				       crossings = 1,
+				       excludes = ["FR"])
+
 # Configure Selection Manager here!!
 # Do NOT modify this object directly after it is handed to 
 # PathBuilder, Use PathBuilder.schedule_selmgr instead.
@@ -66,11 +82,12 @@
       use_all_exits=False,
       uniform=True,
       use_exit=None,
-      use_guards=False,
-      use_geoip=True)
+      use_guards=True,
+      geoip_config=path_config)
 
 ######################################### BEGIN: Connection         #####################
 
+# Circuit building code here
 class Connection(TorCtl.Connection):
   def build_circuit(self, pathlen, path_sel):
     circ = Circuit()
@@ -87,158 +104,226 @@
       circ.circ_id = self.extend_circuit(0, circ.id_path())
     return circ
 
+  def build_circuit_80(self, selmgr):
+    """ Set target port to 80 """
+    selmgr.set_target("255.255.255.255", 80)
+    return self.build_circuit(selmgr.pathlen, selmgr.path_selector)
+
+  def build_circuit_from_path(self, path):
+    """ Build circuit using a given path """
+    circ = Circuit()
+    if len(path) > 0:
+      circ.circ_id = self.extend_circuit(0, path)
+
 ######################################### END: Connection          #####################
-######################################### Router, Circuit, Stream  #####################
+######################################### Circuit, Stream          #####################
 
 # Circuit class extended to RTTs
 class Circuit(PathSupport.Circuit):  
   def __init__(self):
     PathSupport.Circuit.__init__(self)
-    self.total_rtt = None      # double (sec), substitute with..
-    self.rtts = {}             # dict of partial rtts, for pathlen 3: 1-2-None
-    self.timeout_counter = 0   # timeout limit
-    self.slowness_counter = 0  # slowness limit
-    self.closed = False        # Mark circuits closed
+    # RTT stuff
+    self.current_rtt = None	# double (sec): current value
+    self.part_rtts = {}		# dict of partial rtts, pathlen 3: 1-2-None
+    self.rtt_history = []	# rtt history for computing stats:
+    self.avg_rtt = 0		# avg rtt value		
+    self.dev_rtt = 0		# standard deviation
+    # Counters and flags
+    self.age = 0		# age in rounds
+    self.timeout_counter = 0	# timeout limit
+    self.slowness_counter = 0 	# slowness limit
+    self.closed = False		# mark circuit closed
 
-# Stream class extended to isPing and hop
+  def get_avg_rtt(self):
+    """ Compute average from history """
+    if len(self.rtt_history) > 0:
+      sum = reduce(lambda x, y: x+y, self.rtt_history, 0.0)
+      return sum/len(self.rtt_history)
+    else:
+      return 0.0
+
+  def get_dev_rtt(self):
+    """ Return the stddev of measured rtts """
+    if len(self.rtt_history) > 0:
+      avg = self.get_avg_rtt()
+      sum = reduce(lambda x, y: x + ((y-avg)**2.0), self.rtt_history, 0.0)
+      return math.sqrt(sum/len(self.rtt_history))
+    else:
+      return 0.0
+
+  def refresh_stats(self):
+    """ Refresh the stats """
+    self.avg_rtt = self.get_avg_rtt()
+    self.dev_rtt = self.get_dev_rtt()
+
 class Stream(PathSupport.Stream):
+  """ Stream class extended to isPing and hop """
   def __init__(self, sid, host, port, kind):
     PathSupport.Stream.__init__(self, sid, host, port, kind)
-    self.isPing = False
-    self.hop = None		# Save hop if this is a ping, None = complete circ
+    self.isPing = False		# set this to mark as a ping-stream
+    self.hop = None		# save hop if this is a ping, hop=None means complete circ
 
-######################################### Router, Circuit, Stream  #####################
-######################################### BEGIN: Pinger            #####################
+######################################### Circuit, Stream          #####################
+######################################### BEGIN: NetworkModel      #####################
 
-# A simple "Pinger": try to connect 
-# to somewhere via Tor using Socks4a
-class Pinger:
-  # Constructor
-  def __init__(self, host, port):
-    self.connect_host = host
-    self.connect_port = port
+class LinkInfo:
+  """ This class contains infos about a link: source, destination, RTT
+      plus: rtt_history, methods to compute stats, etc. """
+  def __init__(self, src, dest, rtt=0):
+    # Set src and dest
+    self.src = src
+    self.dest = dest
+    # Setup the history and record RTT
+    self.rtt_history = []
+    self.set_rtt(rtt)
 
-  # Hmm, there is no "try .. except .. finally .." in Python < 2.5 !!  
-  def ping(self):
-    s = None
-    try:
-      try:
-        s = socks.socksocket()
-        s.setproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
-        s.connect((self.connect_host, self.connect_port))
-      except socks.Socks4Error, e:
-	# Don't do nothing, this will actually happen
-	# print("Got Exception: " + str(e))
-	pass
-    finally:
-      # Close the socket if open
-      if s:
-        s.close()
+  def set_rtt(self, rtt):
+    self.rtt = rtt
+    self.rtt_history.append(rtt)
 
-######################################### END: Pinger              #####################
-######################################### BEGIN: NetworkModel      #####################
+class PathProposal:
+  """ Instances of this class are path-proposals """
+  def __init__(self, links):
+    # This is a list of LinkInfo objects
+    self.links = links
+    # Compute the expected RTT
+    self.rtt = reduce(lambda x,y: x + y.rtt, self.links, 0.0)
+  
+  def to_string(self):
+    """ Create a string for printing out information """
+    s = ""
+    for l in self.links:
+      # Get the single objects
+      s += l.src.nickname + "--" + l.dest.nickname + " (" + str(l.rtt) + ") " + ", "
+    return "Route proposal: " + s + "--> " + str(self.rtt) + " sec" 
 
-# This will be used to record measured RTTs
-# of single links and to find fast routes
 class NetworkModel:  
-  def __init__(self):
-    # TODO: Use XDiGraph()
-    self.graph = networkx.XGraph(selfloops=False, multiedges=False)
-    # Add this OP to the model
-    self.addRouter("ROOT")
-    plog("DEBUG", "NetworkModel initiated")
+  """ This class is used to record measured RTTs for single links in a model of the 
+      'currently explored subnet' (currently this is an undirected graph!) """  
+  def __init__(self, rooter):
+    """ Constructor: pass the root of all our circuits """
+    # Use XDiGraph() (= directed)?
+    self.graph = networkx.XGraph(name="Explored Tor Subnet", selfloops=False, multiedges=False)
+    # Initially add THIS proxy to the model
+    self.root = rooter
+    self.graph.add_node(self.root)
+    plog("DEBUG", "NetworkModel initiated: added " + self.root.nickname)
 
-  def addRouter(self, router):
-    self.graph.add_node(router)
+  def add_link(self, src, dest, rtt):
+    """ Add a link to the graph given src, dest & rtt """
+    self.graph.add_edge(src, dest, LinkInfo(src, dest, rtt))
 
-  def addLink(self, source, dest):
-    self.graph.add_edge(source, dest)
-    
-######################################### END: NetworkModel        #####################
-######################################### BEGIN: EventHandler      #####################
+  def get_link_info(self, path):
+    """ From a path given as list of routers, return link-infos """
+    links = []
+    for i in xrange(0, len(path)-1):
+      # TODO: Check if edge exists
+      links.append(self.graph.get_edge(path[i], path[i+1]))
+    return links
 
-# DRAFT for a new CircuitManager
-class NewCircuitManager:
-  def __init__(self, c):
-    self.conn = c		# connection to Tor
-    self.circuits = {}		# dict mapping id:circuit
-    self.circs_sorted = []	# list of circs sorted for rtt
+  def find_circuits(self):
+    # Reset list of proposals and prefixes for DFS
+    self.proposals = []
+    self.prefixes = {}
+    # Start the search
+    self.visit(self.root, [])
+    # Sort proposals for their RTTs
+    sort_list(self.proposals, lambda x: x.rtt)
+    # Print all of them for debugging/info
+    for p in self.proposals:
+      print(p.to_string())
 
-  # Sort a list by a specified key
-  def sort_list(self, list, key):
-    list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
-    return list
+  def visit(self, node, path, i=1):
+    """ Recursive Depth-First-Search: Maybe use some existing method? """
+    if node not in path:
+      path.append(node)
+      # Root -- Exit
+      if len(path) == 4:
+        self.proposals.append(PathProposal(self.get_link_info(path)))
+      else:
+        self.prefixes[i] = path
+	# G is also a dict
+        for n in self.graph[node]:
+	  if n not in self.prefixes[i]:
+	    self.visit(n, copy.copy(self.prefixes[i]), i+1)
 
-  def refresh_sorted_list(self):
-    self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.total_rtt)
+  def print_graph(self):
+    """ Print current info about the graph """
+    print(self.graph.info())
+    # Print edges
+    #for e in self.graph.edges():
+    #  src, dest, link = e
+    #  plog("INFO", "Edge: " + src.nickname + " -- " + dest.nickname + ", RTT = " + str(link.rtt) + " sec")
 
-  def add_circuit(self, circ):
-    self.circuits[circ.circ_id] = circ
+######################################### END: NetworkModel        #####################
+######################################### BEGIN: EventHandlers     #####################
 
-  def del_circuit(self, circ_id):
-    # TODO: Test
-    del self.circuits[circ_id]
+# CircuitHandler extending PathBuilder
+class CircuitHandler(PathSupport.PathBuilder):
+  def __init__(self, c, selmgr):
+    # Init the PathBuilder
+    PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)    
+    # Maintain a sorted list of circs that gets regenerated regularly
+    self.circs_sorted = []    
+    # Bring up the circuits
+    self.check_circuit_pool()
+ 
+  def check_circuit_pool(self):
+    """ Init or check the status of our pool of circuits """
+    # Get number of circuits
+    circs_lock.acquire()
+    n = len(self.circuits.values())
+    circs_lock.release() 
+    # Some Logging
+    plog("INFO", "Checked circuit-pool: building " + str(idle_circuits-n) + " circuits")
+    # Schedule (idle_circuits-n) circuit-buildups
+    while (n < idle_circuits):      
+      self.build_idle_circuit()
+      plog("DEBUG", "Scheduled circuit No. " + str(n+1))
+      n += 1
+    self.print_circuits()
 
-  def new_circuit(self):
+  def build_idle_circuit(self):
+    """ Build an idle circuit """
     circ = None
     while circ == None:
       try:
         # Build the circuit
-        circ = self.conn.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
-	self.add_circuit(circ)
+        circ = self.c.build_circuit_80(self.selmgr)
+        # Using lock:
+	circs_lock.acquire()
+	self.circuits[circ.circ_id] = circ
+        circs_lock.release()
       except TorCtl.ErrorReply, e:
         # FIXME: How come some routers are non-existant? Shouldn't
         # we have gotten an NS event to notify us they disappeared?
-        plog("NOTICE", "Error building circ: " + str(e.args))
-  
-  def close_circuit(self, circ_id):
-    # try .. except
-    self.conn.close_circuit(circ_id)
-  
-  def attach_stream(self, stream):
-   pass 
+        plog("NOTICE", "Error building circuit: " + str(e.args))
 
-###########################################
+  def print_circuits(self):
+    """ Print out our circuits plus some info """
+    circs_lock.acquire()
+    #circs = self.circs_sorted
+    circs = self.circuits.values()
+    plog("INFO", "We have " + str(len(circs)) + " circuits")
+    for c in circs:
+      out = "+ Circuit " + str(c.circ_id) + ": "
+      for r in c.path: out += " " + r.nickname + "(" + str(r.country_code) + ")"
+      if not c.built: out += " (not yet built)"
+      else: out += " (age=" + str(c.age) + ")"
+      if c.current_rtt: out += ": " "RTT last/avg/dev: " + str(c.current_rtt) + "/" + str(c.avg_rtt) + "/"+ str(c.dev_rtt) + ""
+      print(out)
+    circs_lock.release()
 
-# We need an EventHandler, this one extends PathBuilder
-class EventHandler(PathSupport.PathBuilder):  
-  def __init__(self, c, selmgr):    
-    # Call constructor of superclass
-    PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)
-    # Additional stuff
-    self.ping_circs = Queue.Queue()  # (circ_id, hop)-pairs
-    self.start_times = {}            # dict mapping (circ_id, hop):start_time TODO: cleanup
-    self.circs_sorted = []           # sorted list of circs, generated regularly
-    # Set up the CircuitManager, only pass self.circuits instead of self?
-    self.circ_manager = CircuitManager(selmgr, c, self)
-    self.circ_manager.setDaemon(True)
-    self.circ_manager.start()
- 
-  # Add a circuit to ping, ping_info is (circ_id, hop)
-  def queue_ping_circ(self, ping_info):
-    self.ping_circs.put(ping_info)
-
-  # Send signal "CLEARDNSCACHE"
-  def clear_dns_cache(self):
-    lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
-    for _, msg, more in lines:
-      plog("DEBUG", "CLEARDNSCACHE: " + msg)
-  
-  # Sort a list by a specified key
-  def sort_list(self, list, key):
-    list.sort(lambda x,y: cmp(key(x), key(y))) # Python < 2.4 hack
-    return list
-
   # Call after each measuring
   def refresh_sorted_list(self):
-    # Sort the list for RTTs
+    # Sort the list for average RTTs
     circs_lock.acquire()
-    self.circs_sorted = self.sort_list(self.circuits.values(), lambda x: x.total_rtt)
+    self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.avg_rtt)
     circs_lock.release()
     plog("DEBUG", "Refreshed sorted list of circuits")
-  
-  # Do something when circuit-events occur
+ 
   def circ_status_event(self, c):
+    """ Handle circuit status events """
     # Construct output for logging
     output = [c.event_name, str(c.circ_id), c.status]
     if c.path: output.append(",".join(c.path))
@@ -252,9 +337,13 @@
       plog("DEBUG", "Ignoring circuit " + str(c.circ_id) + " (controlled by Tor or not yet in the list)")
       circs_lock.release()
       return
+    
+    # EXTENDED
     if c.status == "EXTENDED":
       self.circuits[c.circ_id].last_extended_at = c.arrived_at
       circs_lock.release()
+    
+    # FAILED & CLOSED
     elif c.status == "FAILED" or c.status == "CLOSED":
       # XXX: Can still get a STREAM FAILED for this circ after this
       circ = self.circuits[c.circ_id]
@@ -265,16 +354,19 @@
       for stream in circ.pending_streams:
         if stream.isPing:
 	  #plog("DEBUG", "Finding new circ for ping stream " + str(stream.strm_id))
+	  # Close the stream?
 	  pass
 	if not stream.isPing:
 	  plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
           self.attach_stream_any(stream, stream.detached_from)
       # Refresh the list 
       self.refresh_sorted_list()
+      # Check if there are enough circs
+      self.check_circuit_pool()
       return
-      # TODO: Check if there are enough circs?
+    
+    # BUILT
     elif c.status == "BUILT":
-      # TODO: Perform a measuring directly?
       self.circuits[c.circ_id].built = True
       try:
         for stream in self.circuits[c.circ_id].pending_streams:
@@ -286,15 +378,34 @@
         circs_lock.release()
 	return
       circs_lock.release()
+    
+    # OTHER?
     else:
       # If this was e.g. a LAUNCHED
       circs_lock.release()
 
+######################################### END: CircuitHandler       #####################
+######################################### BEGIN: StreamHandler      #####################
+
+# StreamHandler that extends CircuitHandler
+class StreamHandler(CircuitHandler):  
+  def __init__(self, c, selmgr):    
+    # Call constructor of superclass
+    CircuitHandler.__init__(self, c, selmgr)
+ 
+  # Send signal "CLEARDNSCACHE"
+  def clear_dns_cache(self):
+    lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+    for _, msg, more in lines:
+      plog("DEBUG", "CLEARDNSCACHE: " + msg)
+
   # Attach a regular user stream
   def attach_stream_any(self, stream, badcircs):
+    
     # To be able to always choose the fastest:
     # slows down attaching?
     self.clear_dns_cache()
+    
     # Newnym, and warn if not built plus pending
     unattached_streams = [stream]
     if self.new_nym:
@@ -314,7 +425,8 @@
 
     # Choose from the sorted list!  
     for circ in self.circs_sorted:
-      if circ.built and circ.total_rtt and circ.circ_id not in badcircs and not circ.closed:
+      # Only attach if we already measured
+      if circ.built and circ.current_rtt and circ.circ_id not in badcircs and not circ.closed:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -348,11 +460,83 @@
       circs_lock.release()
     self.last_exit = circ.exit
 
+  # Catch user stream events
+  def handle_other_events(self, s):
+    # SUCCEEDED
+    if s.status == "SUCCEEDED":
+      if s.strm_id not in self.streams:
+        plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
+        return
+      if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
+        # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
+        # in because I'm still not sure this is correct
+        plog("WARN", "Mismatch of pending: "
+          + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
+          + str(s.circ_id))
+        circs_lock.acquire()
+	self.streams[s.strm_id].circ = self.circuits[s.circ_id]
+        circs_lock.release()
+      else:
+        self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
+      self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+      self.streams[s.strm_id].pending_circ = None
+      self.streams[s.strm_id].attached_at = s.arrived_at
+
+    # FAILED or CLOSED
+    elif s.status == "FAILED" or s.status == "CLOSED":
+      if s.strm_id not in self.streams:
+        plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
+        return
+      #if not s.circ_id: plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
+      # We get failed and closed for each stream. OK to return 
+      # and let the CLOSED do the cleanup
+      if s.status == "FAILED":
+        # Avoid busted circuits that will not resolve or carry traffic
+        self.streams[s.strm_id].failed = True
+        circs_lock.acquire()
+	if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
+        elif self.streams[s.strm_id].attached_at != 0: 
+	  plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
+        circs_lock.release()
+	return
+      # CLOSED
+      if self.streams[s.strm_id].pending_circ:
+        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+      # Actual removal of the stream
+      del self.streams[s.strm_id]
+
+    # REMAP
+    elif s.status == "REMAP":
+      if s.strm_id not in self.streams:
+        plog("WARN", "Remap id "+str(s.strm_id)+" not found")
+      else:
+        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+          s.target_host = "255.255.255.255"
+          plog("NOTICE", "Non-IP remap for "+str(s.strm_id) + " to " + s.target_host)		   
+        self.streams[s.strm_id].host = s.target_host
+        self.streams[s.strm_id].port = s.target_port
+
+######################################### END: StreamHandler       #####################
+######################################### BEGIN: PingHandler       #####################
+
+# This class extends the StreamHandler
+class PingHandler(StreamHandler):
+  def __init__(self, c, selmgr, router, partial):
+    # Init the StreamHandler
+    StreamHandler.__init__(self, c, selmgr)    
+    # Anything ping-related
+    self.ping_queue = Queue.Queue()	# (circ_id, hop)-pairs
+    self.start_times = {}		# dict mapping (circ_id, hop):start_time TODO: cleanup
+    # Start the Pinger that schedules the ping-connections
+    self.ping_manager = Pinger(self, router, partial)
+    self.ping_manager.setDaemon(True)
+    self.ping_manager.start()
+
   # Attach a ping stream to its circuit
-  def attach_ping(self, stream):
+  def attach_ping(self, stream, arrived_at):
     plog("DEBUG", "New ping request")
     # Get info from the Queue TODO: check if empty
-    ping_info = self.ping_circs.get()
+    ping_info = self.ping_queue.get()
     # Extract ping-info
     circ_id = ping_info[0]
     hop = ping_info[1]
@@ -363,10 +547,11 @@
       # Get the circuit 
       if circ_id in self.circuits:
         circ = self.circuits[circ_id]
+	# TODO: and not circ.busy
         if circ.built and not circ.closed:        
           self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
           # Measure here or move to before attaching?
-          self.start_times[(circ_id, hop)] = time.time()
+          self.start_times[(circ_id, hop)] = arrived_at
 	  stream.hop = hop
           stream.pending_circ = circ # Only one possible here
           circ.pending_streams.append(stream)
@@ -380,14 +565,15 @@
     except TorCtl.ErrorReply, e:
       plog("WARN", "Error attaching stream: " + str(e.args))
 
-  # Catch stream status events
   def stream_status_event(self, s):
+    """ Catch stream status events: Handle NEW and DETACHED here, 
+        pass other events to StreamHandler """
     # Construct debugging output
     output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host, str(s.target_port)]
     if s.reason: output.append("REASON=" + s.reason)
     if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
     plog("DEBUG", " ".join(output))
-    
+     
     # If target_host is not an IP-address
     if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
       s.target_host = "255.255.255.255" # ignore DNS for exit policy check
@@ -403,7 +589,7 @@
       if (stream.host == ping_dummy_host) & (stream.port == ping_dummy_port):
         # Set isPing
 	stream.isPing = True
-        self.attach_ping(stream)
+        self.attach_ping(stream, s.arrived_at)
       else:
         self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
     
@@ -430,29 +616,37 @@
 	    # Close the circuit
 	    plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
 	    self.circuits[s.circ_id].closed = True
-	    self.c.close_circuit(s.circ_id)
+	    try: self.c.close_circuit(s.circ_id)
+	    except TorCtl.ErrorReply, e: 
+	      plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))	    
 	  # Set RTT for circ to None
-	  self.circuits[s.circ_id].total_rtt = None
+	  self.circuits[s.circ_id].current_rtt = None
 	  circs_lock.release()
 	  # Only close the stream
           self.c.close_stream(s.strm_id, 7)
 	  return
         # This is a successful ping: measure here
-	now = time.time()
 	hop = self.streams[s.strm_id].hop
-	rtt = now - self.start_times[(s.circ_id, hop)]
+	# Compute RTT using arrived_at 
+	rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
         plog("INFO", "Measured RTT: " + str(rtt) + " sec")
 	# Save RTT to circuit
-	self.circuits[s.circ_id].rtts[hop] = rtt
-	# Additionally save total_rtt ?
+	self.circuits[s.circ_id].part_rtts[hop] = rtt
 	if hop == None:
-	  self.circuits[s.circ_id].total_rtt = rtt
+	  # This is a total circuit measuring	  
+	  self.circuits[s.circ_id].current_rtt = rtt
+	  self.circuits[s.circ_id].rtt_history.append(rtt)
+	  plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].rtt_history))
+	  # Refresh the stats
+	  self.circuits[s.circ_id].refresh_stats()
+	  self.circuits[s.circ_id].age += 1
 	
-	# Close if slow-max is reached
-        if rtt >= slow:
+	# Close if slow-max is reached on avg_rtt
+        if self.circuits[s.circ_id].avg_rtt >= slow:
 	  self.circuits[s.circ_id].slowness_counter += 1
-	  if self.circuits[s.circ_id].slowness_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
-	    plog("DEBUG", "Slow-max is reached --> closing circuit " + str(s.circ_id))
+	  plog("DEBUG", "Slow circuit " + str(s.circ_id))
+	  if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
+	    plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
 	    self.circuits[s.circ_id].closed = True
 	    self.c.close_circuit(s.circ_id)
 
@@ -465,15 +659,9 @@
       
       # Detect timeouts on user streams
       if s.reason == "TIMEOUT":
-        circs_lock.acquire()
-	self.circuits[s.circ_id].timeout_counter += 1
-	plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
-	if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
-	  # Close the circuit
-	  plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
-	  self.circuits[s.circ_id].closed = True
-	  self.c.close_circuit(s.circ_id)
-	circs_lock.release()
+	# Don't increase counter, cause could be a machine that's not responding
+	#self.circuits[s.circ_id].timeout_counter += 1
+	plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
 
       # Stream was pending
       if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
@@ -482,155 +670,121 @@
       self.streams[s.strm_id].pending_circ = None
       self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
     
-    # SUCCEEDED
-    elif s.status == "SUCCEEDED":
-      if s.strm_id not in self.streams:
-        plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
-        return
-      if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
-        # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
-        # in because I'm still not sure this is correct
-        plog("WARN", "Mismatch of pending: "
-          + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
-          + str(s.circ_id))
-        circs_lock.acquire()
-	self.streams[s.strm_id].circ = self.circuits[s.circ_id]
-        circs_lock.release()
-      else:
-        self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
-      self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
-      self.streams[s.strm_id].pending_circ = None
-      self.streams[s.strm_id].attached_at = s.arrived_at
-    
-    # FAILED or CLOSED
-    elif s.status == "FAILED" or s.status == "CLOSED":
-      if s.strm_id not in self.streams:
-        plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
-        return
-      #if not s.circ_id: plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
-      # We get failed and closed for each stream. OK to return 
-      # and let the CLOSED do the cleanup
-      if s.status == "FAILED":
-        # Avoid busted circuits that will not resolve or carry traffic
-        self.streams[s.strm_id].failed = True
-        circs_lock.acquire()
-	if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
-        elif self.streams[s.strm_id].attached_at != 0: 
-	  plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
-        circs_lock.release()
-	return
-      # CLOSED
-      if self.streams[s.strm_id].pending_circ:
-        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
-      # Actual removal of the stream
-      del self.streams[s.strm_id]
+    else: 
+      self.handle_other_events(s)
 
-    # REMAP
-    elif s.status == "REMAP":
-      if s.strm_id not in self.streams:
-        plog("WARN", "Remap id "+str(s.strm_id)+" not found")
-      else:
-        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
-          s.target_host = "255.255.255.255"
-          plog("NOTICE", "Non-IP remap for "+str(s.strm_id) + " to " + s.target_host)		   
-        self.streams[s.strm_id].host = s.target_host
-        self.streams[s.strm_id].port = s.target_port
+######################################### END: PingHandler         #####################
+######################################### BEGIN: Pinger            #####################
 
-######################################### END: EventHandler        #####################
-######################################### BEGIN: CircuitManager    #####################
-
-# This is the main class that keeps track of: 
-# -- Connection to Tor
-# -- EventHandler
-#
-# Does work regularly
-# TODO: Switch circuit-managing off to get circuits created from Tor
-# TODO: Add a NetworkModel to this!
-# TODO: Make this to contain the circuit-list and use a pinger-thread
-
-class CircuitManager(threading.Thread):
-
-  def __init__(self, selmgr, conn, event_handler):
-    # Set everything
-    self.selmgr = selmgr
-    self.conn = conn
-    self.handler = event_handler
-    # Create the Pinger
-    self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
+class Pinger(threading.Thread):
+  """ Separate thread that triggers the Socks4-connections for pings """
+  def __init__(self, ping_handler, router=None, partial=False):
+    self.handler = ping_handler					# The EventHandler
+    self.router = router					# This router is us
+    self.measure_partial = partial				# Flag to switch off partial measurings
+    if self.measure_partial:
+      # Create the model for recording link-RTTs
+      self.model = NetworkModel(self.router)
+    # The Pinger-object
+    #self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
     # Call constructor of superclass
     threading.Thread.__init__(self)
   
-  # The run()-method
   def run(self):
+    """ The run()-method """
     while self.isAlive():
+      time.sleep(sleep_interval)
       self.do_work()
-      time.sleep(sleep_interval)
- 
-  # Do the work
+
   def do_work(self):
-    # Get number of circuits
-    circs_lock.acquire()
-    n = len(self.handler.circuits.values())
-    circs_lock.release() 
-    # Schedule (idle_circuits-n) circuit-buildups
-    while (n < idle_circuits):      
-      self.build_idle_circuit()
-      plog("DEBUG", "Scheduled circuit No. " + str(n+1))
-      n += 1
+    """ Do the work """
     # Measure RTTs of circuits
     self.measure()
-    self.print_circuits()
+    # self.handler.schedule_low_prio(lambda x: x.measure())
+    # Compute link-RTTs
+    if self.measure_partial:
+      self.compute_link_RTTs()
+    # Print circuits for info
+    self.handler.schedule_low_prio(lambda x: x.print_circuits())
 
-  # Build an idle circuit
-  # Better here than in EventHandler's thread
-  def build_idle_circuit(self):
-    circ = None
-    while circ == None:
-      try:
-        # Build the circuit
-        circ = self.conn.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
-        # Using lock:
-	circs_lock.acquire()
-	self.handler.circuits[circ.circ_id] = circ
-        circs_lock.release()
-      except TorCtl.ErrorReply, e:
-        # FIXME: How come some routers are non-existant? Shouldn't
-        # we have gotten an NS event to notify us they disappeared?
-        plog("NOTICE", "Error building circ: " + str(e.args))
-
-  # Measure RTTs of all circuits
   def measure(self):
+    """ Measure RTTs of all circuits """
+    # XXX: Schedule in the EventHandler or leave out queueing!
     circs_lock.acquire()
     circs = self.handler.circuits.values()
     circs_lock.release()
     for c in circs:
       if c.built:
-        # Get length of c ...
-	id = c.circ_id
-	# TODO: Measure for all hops, test if result is 
-	# bigger each time, else start again
-	#self.handler.queue_ping_circ((id, 2))
-        # Trigger ping
-	#self.pinger.ping()
-	# Put in the queue (circ, hop), XXX: synchronize!
-	self.handler.queue_ping_circ((id, None))
-        # Trigger ping
-	self.pinger.ping()
+        # Get id & length of c
+      	id = c.circ_id
+	if self.measure_partial:
+	  path_len = len(c.path)
+	  for i in xrange(1, path_len):
+	    # Put in the queue: (circ, hop)
+	    self.handler.ping_queue.put((id, i))
+	    self.ping()
+	# And for the whole circuit ...
+	self.handler.ping_queue.put((id, None))
+	self.ping()
 
-  # Print circuits
-  def print_circuits(self):
+  # Python < 2.5 has no combined "try .. except .. finally ..", hence the nested try blocks
+  def ping(self):
+    """ This creates a connection to dummy_host/_port using Socks4 """
+    s = None
+    try:
+      try:
+        s = socks.socksocket()
+        s.setproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
+        s.connect((ping_dummy_host, ping_dummy_port))
+      except socks.Socks4Error, e:
+	# Do nothing: this error is expected to happen
+	# print("Got Exception: " + str(e))
+	pass
+    finally:
+      # Close the socket if open
+      if s:
+        s.close()
+
+  def compute_link_RTTs(self):
+    """ Get a copy(?) of the circs and check if we can compute links """
+    # XXX: Get the circuits
     circs_lock.acquire()
     circs = self.handler.circuits.values()
-    plog("INFO", "We have " + str(len(circs)) + " circuits")
+    circs_lock.release()
     for c in circs:
-      out = "+ Circuit " + str(c.circ_id) + ": "
-      for r in c.path: out = out + " " + r.nickname + "(" + str(r.country_code) + ")"
-      if c.total_rtt: out = out + " (RTT=" + str(c.total_rtt) + ")"
-      if not c.built: out = out + " (not yet built)"
-      print(out)
-    circs_lock.release()
+      # Get the length
+      path_len = len(c.path)
+      # Go through the path
+      for i in xrange(1,path_len):
+        if i in c.part_rtts:
+          # First hop --> add Link from Root to 1
+          if i == 1:
+	    link_rtt = c.part_rtts[i]
+	    self.model.add_link(self.router, c.path[i-1], link_rtt)
+	  # Handle i -- (i+1)
+          if i+1 in c.part_rtts:
+            link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
+	    if link_rtt > 0:          
+	      plog("INFO", "Computed link RTT = " + str(link_rtt))
+	      # Save to NetworkModel
+	      self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+	    else:
+	      plog("WARN", "Negative RTT: " + str(link_rtt))
+	  # Handle (n-1) -- n
+	  elif None in c.part_rtts:
+            # We have a total value
+	    link_rtt = c.part_rtts[None] - c.part_rtts[i]
+	    if link_rtt > 0:          
+	      plog("INFO", "Computed link RTT = " + str(link_rtt))
+	      # Save to NetworkModel
+	      self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+	    else:
+	      plog("WARN", "Negative RTT: " + str(link_rtt))
+    # Print out the model
+    self.model.print_graph()
+    self.model.find_circuits()
 
-######################################### END: CircuitManager      #####################
+######################################### END: Pinger              #####################
 
 # Return a connection to Tor's control port
 def connect(control_host, control_port):
@@ -641,7 +795,8 @@
   
 # Do the configuration
 def configure(conn):
-  # Get our own IP and country here, TODO: use try .. except?
+  global my_ip, my_country
+  # Get our own IP and country
   try:
     info = conn.get_info("address")
     my_ip = info["address"]
@@ -664,11 +819,13 @@
   conn = connect(control_host, control_port)
   #conn.debug(file("control.log", "w"))
   conn.authenticate()
+  # Configure myself  
+  configure(conn)
+  # Setup a router instance here
+  router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,my_ip,None,None))	
   # Set Handler to the connection
-  handler = EventHandler(conn, __selmgr)
+  handler = PingHandler(conn, __selmgr, router, measure_partial_circs)
   conn.set_event_handler(handler)
-  # Configure myself
-  configure(conn)
   # Go to sleep to be able to get killed from the commandline
   try:
     while True:



More information about the tor-commits mailing list