[or-cvs] r10748: Added improved code to record statistics on setup-durations (torflow/trunk)

renner at seul.org renner at seul.org
Fri Jul 6 11:43:12 UTC 2007


Author: renner
Date: 2007-07-06 07:43:12 -0400 (Fri, 06 Jul 2007)
New Revision: 10748

Modified:
   torflow/trunk/op-addon.py
Log:

  Added improved code to record statistics on the setup durations of created circuits,
  plus more detailed information (individual extend times and failures with reasons).



Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-07-06 11:19:28 UTC (rev 10747)
+++ torflow/trunk/op-addon.py	2007-07-06 11:43:12 UTC (rev 10748)
@@ -2,7 +2,7 @@
 """
   RWTH Aachen University, Informatik IV
   Copyright (C) 2007 Johannes Renner 
-  Contact: renner at i4.informatik.rwth-aachen.de
+  Contact: renner <AT> i4.informatik.rwth-aachen.de
 """
 # Addon for Onion Proxies (prototype-v0.0-alpha):
 # Shall eventually improve the performance of anonymous communications 
@@ -44,22 +44,23 @@
 
 # No of idle circuits to build preemptively
 # TODO: Also configure ports to use
-idle_circuits = 4
+idle_circuits = 6
 
 # Measure complete circuits
 measure_circs = True
 # Sleep interval between working loads in sec
 initial_interval = 8
-sleep_interval = 2
-# Close circ after n timeouts or avg measured slownesses
+sleep_interval = 1
+# Close a circ after n timeouts or avg measured slownesses
 timeout_limit = 1
-# Slow RTT := x seconds, close circs slower & create only circs faster than this
-slowness_limit = 2
+# Close a circ after n measured slownesses
+slowness_limit = 5
+# Close circs slower & create only circs faster than this
 slow = 1.
 
 # Set to True if we want to measure partial circuits
 # This also enables circuit creation from the model
-measure_partial_circs = False
+measure_partial_circs = True
 # Minimum number of proposals to choose from
 min_proposals = 1
 # Min ratio of traditionally created circs
@@ -72,7 +73,7 @@
 num_tests = 5
 
 # Do geoip-configuration here
-# Set src_country below when setting up our location
+# Set entry_country when setting up our location?
 path_config = GeoIPSupport.GeoIPConfig(unique_countries = True,
                                        entry_country = None,
 				       exit_country = None,
@@ -128,7 +129,7 @@
 ######################################### Stats                    #####################
 
 class Stats:
-  """ Statistics class that can be used for recording stats about measured RTTs """
+  """ Statistics class that is used for recording stats about measured RTTs, circuit creations """
   def __init__(self):
     self.values = []
     self.min = 0.0
@@ -176,6 +177,7 @@
 
 ######################################## FileHandler              ######################
 
+# TODO: Move to TorCtl.TorUtil?
 class FileHandler:
   """ FileHandler class for writing/appending collected data to a file """
   def __init__(self, filename):
@@ -207,7 +209,6 @@
     self.slowness_counter = 0 	# slowness limit
     self.closed = False		# mark circuit closed
     self.rtt_created = False	# if this was created from the model
-    # List of extend-times
     self.extend_times = []      # list of all extend-times, sum up for setup duration
  
   def add_rtt(self, rtt):
@@ -244,18 +245,19 @@
 
 ######################################### BEGIN: CircuitStats      #####################
 
+# TODO: Move to TorCtl.TorUtil?
 class CircuitBuildingStats(Stats):
   """ Create an instance of this and gather overall circuit stats """
   def __init__(self):
     Stats.__init__(self)
-    self.timeouts = 0        # count occurrences of timeouts
+    self.failures = 0   # count failures
 
   def to_string(self):
     """ Create a string for writing to a file """
-    s = "Circuit buildups: "
+    s = "Successful circuit buildups: "
     s += str(len(self.values)) + " records, median=" + str(self.median) + " sec, avg=" + str(self.mean) + " sec" 
     s += ", dev=" + str(self.dev) + " sec (min=" + str(self.min) + " sec, max=" + str(self.max) + " sec)\n"
-    s += "Total number of timeouts: " + str(self.timeouts)
+    s += "Total number of failures during buildup: " + str(self.failures)
     return s
 
 ######################################### BEGIN: NetworkModel      #####################
@@ -289,11 +291,6 @@
     # Compute the expected RTT (from current value?)
     self.rtt = reduce(lambda x,y: x + y.current_rtt, self.links, 0.0)
  
-  # TODO: Call before actually creating a circuit
-  def will_exit_to(self, host, port):
-    """ Check for the last router in the path """
-    return self.path(len(self.path)-1).will_exit_to(host, port)
-
   def to_string(self):
     """ Create a string for printing out information """
     s = ""
@@ -319,6 +316,37 @@
     """ Add a link to the graph given src, dest & rtt """
     self.graph.add_edge(src, dest, LinkInfo(src, dest, rtt))
 
+  def add_circuit(self, c):
+    """ Check if we can compute RTTs of single links for circuit c and store these in the model """
+    # Get the length
+    path_len = len(c.path)
+    # Go through the path
+    for i in xrange(1,path_len):
+      if i in c.part_rtts:
+        # First hop --> add Link from Root to 1
+        if i == 1:
+	  link_rtt = c.part_rtts[i]
+	  self.add_link(self.root, c.path[i-1], link_rtt)
+	# Handle i -- (i+1)
+        if i+1 in c.part_rtts:
+          link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
+	  if link_rtt > 0:          
+	    plog("INFO", "Computed link-RTT " + str(i) + ": " + str(link_rtt))
+	    # Save to NetworkModel
+	    self.add_link(c.path[i-1], c.path[i], link_rtt)
+	  else:
+	    plog("WARN", "Negative link-RTT " + str(i) + ": " + str(link_rtt))
+	# Handle (n-1) -- n
+	elif None in c.part_rtts:
+          # We have a total value
+	  link_rtt = c.part_rtts[None] - c.part_rtts[i]
+	  if link_rtt > 0:          
+	    plog("INFO", "Computed link-RTT " + str(i) + ": " + str(link_rtt))
+	    # Save to NetworkModel
+	    self.add_link(c.path[i-1], c.path[i], link_rtt)
+	  else:
+	    plog("WARN", "Negative link-RTT " + str(i) + ": " + str(link_rtt))
+
   def get_link_info(self, path):
     """ From a path given as list of routers, return link-infos """
     links = []
@@ -343,7 +371,7 @@
     for p in self.proposals:
       print(p.to_string())
 
-  def check_proposals(self, n):
+  def get_proposals(self, n):
     """ Return all proposals with rtt <= n seconds """
     ret = []
     for p in self.proposals:
@@ -384,6 +412,9 @@
     self.num_circuits = num_circuits            # size of the circuit pool
     self.check_circuit_pool()	                # bring up the pool of circs
     self.circ_stats = CircuitBuildingStats()    # record buildup-times, no. of timeouts
+    # Filehandlers for saving general and more detailed stats about circuit building
+    self.stats_logger = FileHandler("data/circ_setup_stats")
+    self.setup_logger = FileHandler("data/circ_setup_durations")
 
   def check_circuit_pool(self):
     """ Init or check the status of our pool of circuits """
@@ -405,26 +436,20 @@
     except TorCtl.ErrorReply, e: 
       plog("ERROR", "Failed closing circuit " + str(id) + ": " + str(e))	    
 
-  def print_circuits(self):
-    """ Print out our circuits plus some info """
-    circs = self.circuits.values()
+  def print_circuits(self, list=None):
+    """ Print out our circuits plus some info, optionally pass a list """
+    if list: circs = list
+    else: circs = self.circuits.values()
     plog("INFO", "We have " + str(len(circs)) + " circuits:")
     for c in circs:
       print("+ " + c.to_string())
 
-  def check_path(self, path):
-    """ Check if we currently do not have (TODO: had?) a circuit with the given path """
-    for c in self.circuits.values():
-      if c.path == path:
-        return False
-    return True
-
   def build_idle_circuit(self):
     """ Build an idle circuit """
     circ = None
     while circ == None:
       try:
-        # Build the circuit, configure which ports to use
+        # Configure which port to use here
 	self.selmgr.set_target("255.255.255.255", 80)
         circ = self.c.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
 	self.circuits[circ.circ_id] = circ
@@ -460,6 +485,17 @@
     elif c.status == "FAILED" or c.status == "CLOSED":
       # XXX: Can still get a STREAM FAILED for this circ after this
       circ = self.circuits[c.circ_id]
+      
+      # Logging and statistics
+      if not circ.built:
+        message = ["FAILED"]
+        if c.reason: message.append("REASON=" + c.reason)
+        if c.remote_reason: message.append("REMOTE_REASON=" + c.remote_reason)
+        self.setup_logger.append(" ".join(message) + ": " + str(circ.extend_times))
+        # Increase counter and write circ_stats to file
+        self.circ_stats.failures += 1
+        self.stats_logger.write(self.circ_stats.to_string()) 
+      
       # Actual removal of the circ
       del self.circuits[c.circ_id]
       # Give away pending streams
@@ -473,19 +509,22 @@
     # BUILT
     elif c.status == "BUILT":
       self.circuits[c.circ_id].built = True
+      for stream in self.circuits[c.circ_id].pending_streams:
+        try:  
+          self.c.attach_stream(stream.strm_id, c.circ_id)
+        except TorCtl.ErrorReply, e:
+          # No need to retry here. We should get the failed
+          # event for either the circ or stream next
+          plog("WARN", "Error attaching stream: " + str(e.args))
+      
+      # Log setup durations to file
+      self.setup_logger.append(str(self.circuits[c.circ_id].extend_times))
       # Compute duration by summing up extend_times
       duration = reduce(lambda x, y: x+y, self.circuits[c.circ_id].extend_times, 0.0)
-      plog("DEBUG", "Circuit " + str(c.circ_id) + " needed " + str(duration) + " seconds to be built")
-      # Add duration to circ_stats
+      plog("DEBUG", "Circuit " + str(c.circ_id) + " needed " + str(duration) + " seconds to be built")      
+      # Add duration to circ_stats and write file
       self.circ_stats.add_value(duration)
-      try:
-        for stream in self.circuits[c.circ_id].pending_streams:
-          self.c.attach_stream(stream.strm_id, c.circ_id)
-      except TorCtl.ErrorReply, e:
-        # No need to retry here. We should get the failed
-        # event for either the circ or stream next
-        plog("WARN", "Error attaching stream: " + str(e.args))
-	return
+      self.stats_logger.write(self.circ_stats.to_string()) 
     
     # OTHER?
     else:
@@ -499,7 +538,8 @@
   def __init__(self, c, selmgr, num_circs):    
     # Call constructor of superclass
     CircuitHandler.__init__(self, c, selmgr, num_circs)
-    self.sorted_circs = None		# attribute to hold a sorted list of the circuits
+    self.sorted_circs = None    # sorted list of the circs for attaching streams, initially None
+    #self.new_nym = True
 
   def clear_dns_cache(self):
     """ Send signal CLEARDNSCACHE """
@@ -511,21 +551,37 @@
     """ Close a stream with given id and reason """
     self.c.close_stream(id, reason)
 
+  def create_and_attach(self, stream, unattached_streams):
+    """ Create a new circuit and attach stream + unattached_streams """
+    circ = None
+    self.selmgr.set_target(stream.host, stream.port)
+    while circ == None:
+      try:
+        circ = self.c.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
+      except TorCtl.ErrorReply, e:
+        plog("NOTICE", "Error building circ: " + str(e.args))
+    for u in unattached_streams:
+      plog("DEBUG", "Attaching " + str(u.strm_id) + " pending build of circuit " + str(circ.circ_id))
+      u.pending_circ = circ      
+    circ.pending_streams.extend(unattached_streams)
+    self.circuits[circ.circ_id] = circ
+    self.last_exit = circ.exit
+ 
   def attach_stream_any(self, stream, badcircs):
     """ Attach a regular user stream """
-    # Newnym, and warn if not built plus pending
     unattached_streams = [stream]
     if self.new_nym:
       self.new_nym = False
       plog("DEBUG", "Obeying new nym")
       for key in self.circuits.keys():
-        if (not self.circuits[key].dirty and len(self.circuits[key].pending_streams)):
+        if (not self.circuits[key].dirty
+            and len(self.circuits[key].pending_streams)):
           plog("WARN", "New nym called, destroying circuit "+str(key)
              +" with "+str(len(self.circuits[key].pending_streams))
              +" pending streams")
           unattached_streams.extend(self.circuits[key].pending_streams)
           self.circuits[key].pending_streams.clear()
-        # FIXME: Consider actually closing circ if no streams.
+        # FIXME: Consider actually closing circs if no streams
         self.circuits[key].dirty = True
 
     # Check if there is a sorted list of circs
@@ -534,14 +590,15 @@
     # Choose a circuit
     for circ in list:
       # Check each circuit
-      if circ.built and not circ.closed and circ.circ_id not in badcircs:
+      if circ.built and not circ.closed and circ.circ_id not in badcircs and not circ.dirty:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.strm_id, circ.circ_id)
             stream.pending_circ = circ # Only one possible here
-            circ.pending_streams.append(stream)
-	    # Clear cache after the attach?
-	    self.clear_dns_cache()
+            circ.pending_streams.append(stream)    
+            # Clear cache after the attach?
+	    #self.clear_dns_cache()
+            self.last_exit = circ.exit
           except TorCtl.ErrorReply, e:
             # No need to retry here. We should get the failed
             # event for either the circ or stream next
@@ -551,20 +608,7 @@
 	else:
 	  plog("DEBUG", "Circuit " + str(circ.circ_id) + " won't exit")
     else:
-      circ = None
-      self.selmgr.set_target(stream.host, stream.port)
-      while circ == None:
-        try:
-          circ = self.c.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
-        except TorCtl.ErrorReply, e:
-          plog("NOTICE", "Error building circ: " + str(e.args))
-      for u in unattached_streams:
-        plog("DEBUG", "Attaching " + str(u.strm_id) + " pending build of circuit " + str(circ.circ_id))
-        u.pending_circ = circ      
-      circ.pending_streams.extend(unattached_streams)
-      # Problem here??
-      self.circuits[circ.circ_id] = circ
-    self.last_exit = circ.exit
+      self.create_and_attach(stream, unattached_streams)
 
   def stream_status_event(self, s):
     """ Catch user stream events """
@@ -670,13 +714,11 @@
     # Additional stuff for partial measurings
     self.partial_circs = partial
     if self.partial_circs:
-      self.router = router			# this object represents this OR
+      self.router = router			# object that represents this OR
       self.model = NetworkModel(self.router)	# model for recording link-RTTs
-    # For saving stats about circuit building
-    self.circ_filehandler = FileHandler("data/circ_stats")
     # Handle testing_mode
     if testing_mode:
-      self.latency_filehandler = FileHandler("data/mean_latencies")
+      self.latency_logger= FileHandler("data/mean_latencies")
     # Init the StreamHandler
     StreamHandler.__init__(self, c, selmgr, num_circs)    
     # Start the Pinger that triggers the connections
@@ -713,66 +755,22 @@
         self.ping_queue.put((id, None))
         plog("DEBUG", "Enqueued circuit " + str(id) + " hop None")
 
-  # XXX: Not used
-  def compute_all_RTTs(self):
-    """ Get the circs and compute everything """    
-    circs = self.circuits.values()
-    # Measure also the duration
-    start = time.time()
-    for c in circs:
-      self.compute_link_RTTs(c)
-    plog("DEBUG", "Computation of link-RTTs took us " + str(time.time()-start) + " seconds")
-    # Print out the model
-    self.model.print_graph()
-    self.model.find_circuits()
-
-  def compute_link_RTTs(self, c):
-    """ Check if we can compute RTTs of single links for circuit c and store these in the model """
-    # Get the length
-    path_len = len(c.path)
-    # Go through the path
-    for i in xrange(1,path_len):
-      if i in c.part_rtts:
-        # First hop --> add Link from Root to 1
-        if i == 1:
-	  link_rtt = c.part_rtts[i]
-	  self.model.add_link(self.router, c.path[i-1], link_rtt)
-	# Handle i -- (i+1)
-        if i+1 in c.part_rtts:
-          link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
-	  if link_rtt > 0:          
-	    plog("INFO", "Computed link-RTT " + str(i) + ": " + str(link_rtt))
-	    # Save to NetworkModel
-	    self.model.add_link(c.path[i-1], c.path[i], link_rtt)
-	  else:
-	    plog("WARN", "Negative link-RTT " + str(i) + ": " + str(link_rtt))
-	# Handle (n-1) -- n
-	elif None in c.part_rtts:
-          # We have a total value
-	  link_rtt = c.part_rtts[None] - c.part_rtts[i]
-	  if link_rtt > 0:          
-	    plog("INFO", "Computed link-RTT " + str(i) + ": " + str(link_rtt))
-	    # Save to NetworkModel
-	    self.model.add_link(c.path[i-1], c.path[i], link_rtt)
-	  else:
-	    plog("WARN", "Negative link-RTT " + str(i) + ": " + str(link_rtt))
-
   def attach_ping(self, stream):
     """ Attach a ping stream to its circuit """
     if self.ping_queue.empty():
       # This round has finished
       plog("INFO", "Queue is empty --> round has finished, closing stream " + str(stream.strm_id))
       self.close_stream(stream.strm_id, 5)
+      # Empty start_times
+      self.start_times = {}
       # Call the rest from here?
-      self.print_circuits()
+      self.print_circuits(self.sorted_circs)
       if self.partial_circs:
         # Print out the model
         self.model.print_graph()
         self.model.find_circuits()
       # Enqueue again all circs
       self.enqueue_pings()
-      # Write circ_stats to a file every round
-      self.circ_filehandler.write(self.circ_stats.to_string()) 
 
     else:
       # Get the info and extract
@@ -823,9 +821,9 @@
           if self.partial_circs:
 	    if self.circuits[s.circ_id].rtt_created:
 	      # TODO: Do we want to check if this circuit is _really_ new?
-              self.latency_filehandler.append(str(self.circuits[s.circ_id].stats.mean))
+              self.latency_logger.append(str(self.circuits[s.circ_id].stats.mean))
           else:
-	    self.latency_filehandler.append(str(self.circuits[s.circ_id].stats.mean))
+	    self.latency_logger.append(str(self.circuits[s.circ_id].stats.mean))
 	  # Close the circuit
           self.close_circuit(s.circ_id)
       
@@ -837,9 +835,9 @@
           self.close_circuit(s.circ_id)
       # Resort only if this is for the complete circ
       self.refresh_sorted_list()
-
       if self.partial_circs == True:
-        self.compute_link_RTTs(self.circuits[s.circ_id])
+        # Add the links of this circuit to the model
+        self.model.add_circuit(self.circuits[s.circ_id])
 
   def stream_status_event(self, s):
     """ Separate pings from regular streams directly """
@@ -874,8 +872,6 @@
         if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
           # Close the circuit
           plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
-          # Increase total counter for timeouts
-          self.circ_stats.timeouts += 1
           self.close_circuit(s.circ_id)
         # Set RTT for this circ to None
         self.circuits[s.circ_id].current_rtt = None
@@ -900,6 +896,17 @@
         trad_circs += 1
     return trad_circs
 
+  def path_is_ok(self, path):
+    """ Check if we currently do not have (TODO: had?) a circuit 
+        with the given path (= Routers) """
+    for c in self.circuits.values():
+      if c.path == path: return False
+    # XXX: Check if this path can exit?
+    if not path[len(path)-1].will_exit_to("255.255.255.255", 80): 
+      plog("ERROR", "This circuit would not exit")
+      return False
+    return True
+
   def build_idle_circuit(self):
     """ Override from CircuitHandler to support circuit-creation from the NetworkModel """
     if self.partial_circs:
@@ -911,7 +918,7 @@
       plog("DEBUG","Expected Ratio = " + str(ratio) + " >= " + str(min_ratio) + " ?")
       if ratio >= min_ratio:
         # Get the proposals RTT <= slow
-	proposals = self.model.check_proposals(slow)
+	proposals = self.model.get_proposals(slow)
 	# Check if we have >= min_proposals
         if len(proposals) >= min_proposals:
 	  proposals = sort_list(proposals, lambda x: x.rtt)
@@ -921,9 +928,9 @@
 	      
 	    choice = random.choice(proposals)
             #choice = proposals[0]
-	    
+            	    
             # Check if we already have a circ with this path
-            if self.check_path(choice.path):
+            if self.path_is_ok(choice.path):
               plog("INFO", "Chosen proposal: " + choice.to_string())
               try:
                 circ = self.c.build_circuit_from_path(choice.path)
@@ -973,7 +980,7 @@
       # Close the socket if open
       if s: s.close()
 
-######################################### END: Pinger              #####################
+######################################### End: Classes             #####################
 
 def connect(control_host, control_port):
   """ Return a connection to Tor's control port """
@@ -984,10 +991,10 @@
 def setup_location(conn):
   """ Setup a router object representing this proxy """
   global path_config
-  plog("INFO","Setting up our location")
+  plog("INFO", "Setting up our location")
   ip = None
-  # Try to get our IP from Tor
   try:
+    # Try to get our IP
     info = conn.get_info("address")
     ip = info["address"]
   except: 
@@ -995,9 +1002,9 @@
     ip = "127.0.0.1"
   # Set up a router object
   router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,ip,None,None))
-  # TODO: Check if ip == None
+  # TODO: Check if ip == None?
   plog("INFO", "Our IP address is " + router.get_ip_dotted() + " [" + str(router.country_code) + "]")
-  # Set entry_country from here?
+  # Set entry_country here?
   # path_config.entry_country = router.country_code
   return router
  
@@ -1012,16 +1019,21 @@
   conn.set_option("__DisablePredictedCircuits", "1")
 
 def startup(argv):
-  # Connect to Tor process
-  conn = connect(control_host, control_port)
-  #conn.debug(file("control.log", "w"))
-  conn.authenticate()
+  try:
+    # Connect to Tor process
+    conn = connect(control_host, control_port)
+    conn.authenticate()
+    #conn.debug(file("control.log", "w"))
+  except socket.error, e:
+    plog("ERROR", "Could not connect to Tor process .. running?")
+    return
   # Setup a router instance here
   router = setup_location(conn)
   # Configure myself  
   configure(conn)
-  # Set Handler to the connection
+  # Set Handler to the connection  
   if measure_circs:
+    # We measure latencies
     if measure_partial_circs:
       handler = PingHandler(conn, __selmgr, idle_circuits, router, True)
     else:



More information about the tor-commits mailing list