[or-cvs] r10531: Introduced a new method of measuring all circuits in a pool (in torflow/trunk: . TorCtl)

renner at seul.org renner at seul.org
Fri Jun 8 10:48:14 UTC 2007


Author: renner
Date: 2007-06-08 06:48:14 -0400 (Fri, 08 Jun 2007)
New Revision: 10531

Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/op-addon.py
Log:

  Introduced a new method of measuring all circuits in a pool by simply using one connection
  and passing it on to the next circuit on a DETACHED event, plus more refactorings.



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-06-08 09:30:58 UTC (rev 10530)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-06-08 10:48:14 UTC (rev 10531)
@@ -18,7 +18,8 @@
 "UniqueRestriction", "UniformGenerator", "OrderedExitGenerator",
 "PathSelector", "Connection", "NickRestriction", "IdHexRestriction",
 "PathBuilder", "SelectionManager", "CountryCodeRestriction", 
-"CountryRestriction", "UniqueCountryRestriction", "ContinentRestriction" ]
+"CountryRestriction", "UniqueCountryRestriction", "ContinentRestriction",
+"ContinentJumperRestriction"]
 
 #################### Path Support Interfaces #####################
 

Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-06-08 09:30:58 UTC (rev 10530)
+++ torflow/trunk/op-addon.py	2007-06-08 10:48:14 UTC (rev 10531)
@@ -4,7 +4,6 @@
   Copyright (C) 2007 Johannes Renner 
   Contact: renner at i4.informatik.rwth-aachen.de
 """
-
 # Addon for Onion Proxies (prototype-v0.0-alpha):
 # Shall eventually improve the performance of anonymous communications 
 # and browsing by measuring RTTs of circuits/links, receiving infos
@@ -17,6 +16,7 @@
 import copy
 import math
 import time
+import random
 import socket
 import threading
 import Queue
@@ -45,25 +45,27 @@
 timeout_limit = 1
 slowness_limit = 3
 # Slow RTT := x seconds 
-slow = 1.5
+slow = 0.7
 # Note: Tor-internal lifetime of a circuit is 10 min --> 600/sleep_interval = max-age
 # Sleep interval between working loads in sec
 sleep_interval = 5
 # No of idle circuits to build preemptively
 # TODO: Also configure ports to use
-idle_circuits = 6
+idle_circuits = 5
 
 # Measure complete circuits
 measure_circs = True
 # Set to True if we want to measure partial circuits
 measure_partial_circs = False
 
-# Testing mode: Close circuits after num_tests measures
+# Testing mode: Close circuits after num_tests measures + 
+# involves a FileHandler to write collected data to a file
 testing_mode = False
 # Number of tests per circuit
 num_tests = 5
 
 # Do configuration here TODO: use my_country for src
+# Set src_country below when setting up our location
 path_config = GeoIPSupport.GeoIPConfig(unique_countries = True,
                                        src_country = None,
 				       crossings = 1,
@@ -84,6 +86,9 @@
       use_guards=True,
       geoip_config=path_config)
 
+# Signalize that a round has finished
+finished_event = threading.Event()
+
 ######################################### BEGIN: Connection         #####################
 
 class Connection(TorCtl.Connection):
@@ -106,9 +111,73 @@
   def build_circuit_from_path(self, path):
     """ Build circuit using a given path shall be used to build circs from NetworkModel """
     circ = Circuit()
+    circ.rtt_created = True
+    # Set path to circuit
+    circ.path = path
+    # Set exit
+    circ.exit = path[len(path)-1]
     if len(path) > 0:
-      circ.circ_id = self.extend_circuit(0, path)
+      circ.circ_id = self.extend_circuit(0, circ.id_path())
+    return circ
 
+######################################### Stats                    #####################
+
+class Stats:
+  def __init__(self):
+    self.values = []
+    self.min = 0.0
+    self.max = 0.0
+    self.mean = 0.0
+    self.dev = 0.0
+    self.median = 0.0
+
+  def add_value(self, value):
+    # Append value
+    self.values.append(value)
+    # Set min & max
+    if self.min == 0: self.min = value
+    elif self.min > value: self.min = value
+    if self.max < value: self.max = value
+    # Refresh everything
+    self.mean = self.get_mean()
+    self.dev = self.get_dev()
+    self.median = self.get_median()
+
+  def get_mean(self):
+    """ Compute mean from the values """
+    if len(self.values) > 0:
+      sum = reduce(lambda x, y: x+y, self.values, 0.0)
+      return sum/len(self.values)
+    else:
+      return 0.0
+
+  def get_dev(self):
+    """ Return the stddev of the values """
+    if len(self.values) > 1:
+      mean = self.get_mean()
+      sum = reduce(lambda x, y: x + ((y-mean)**2.0), self.values, 0.0)
+      s = math.sqrt(sum/(len(self.values)-1))
+      return s
+    else:
+      return 0.0
+
+  def get_median(self):
+    """ Return the median of the values """
+    if len(self.values) > 0:
+      self.values.sort()
+      return self.values[(len(self.values)-1)/2]
+    else: return 0.0
+
+class FileHandler:
+  """ FileHandler for appending collected data to a file """
+  def __init__(self, filename):
+    self.filename = filename
+
+  def write(self, line):
+    self.filehandle = open(self.filename, 'a')
+    self.filehandle.write(line + "\n")
+    self.filehandle.close() 
+
 ######################################### Circuit, Stream          #####################
 
 class Circuit(PathSupport.Circuit): 
@@ -118,38 +187,21 @@
     # RTT stuff
     self.part_rtts = {}		# dict of partial rtts, pathlen 3: 1-2-None
     self.current_rtt = None	# double (sec): current value
-    self.rtt_history = []	# rtt history for computing stats:
-    self.avg_rtt = 0		# avg rtt value		
-    self.dev_rtt = 0		# standard deviation
+    self.stats = Stats()	# stats about total RTT contains history
     # Counters and flags
     self.age = 0		# age in rounds
     self.timeout_counter = 0	# timeout limit
     self.slowness_counter = 0 	# slowness limit
     self.closed = False		# mark circuit closed
-
-  def get_avg_rtt(self):
-    """ Compute average from history """
-    if len(self.rtt_history) > 0:
-      sum = reduce(lambda x, y: x+y, self.rtt_history, 0.0)
-      return sum/len(self.rtt_history)
-    else:
-      return 0.0
-
-  def get_dev_rtt(self):
-    """ Return the stddev of measured rtts """
-    if len(self.rtt_history) > 0:
-      avg = self.get_avg_rtt()
-      sum = reduce(lambda x, y: x + ((y-avg)**2.0), self.rtt_history, 0.0)
-      return math.sqrt(sum/len(self.rtt_history))
-    else:
-      return 0.0
-  
+    self.rtt_created = False	# if this was created from the model
+ 
   def add_rtt(self, rtt):
     """ Add a new value and refresh the stats """
+    # Set current
     self.current_rtt = rtt
-    self.rtt_history.append(rtt)
-    self.avg_rtt = self.get_avg_rtt()
-    self.dev_rtt = self.get_dev_rtt()
+    # Add to the stats
+    self.stats.add_value(rtt)
+    # Increase age
     self.age += 1
 
   def to_string(self):
@@ -158,14 +210,17 @@
     for r in self.path: s += " " + r.nickname + "(" + str(r.country_code) + ")"
     if not self.built: s += " (not yet built)"
     else: s += " (age=" + str(self.age) + ")"
-    if self.current_rtt: s += ": " "RTT last/avg/dev: " + str(self.current_rtt) + "/" + str(self.avg_rtt) + "/"+ str(self.dev_rtt)
+    if self.current_rtt: 
+      s += ": " "RTT current/median/mean/dev: "
+      s += str(self.current_rtt) + "/" + str(self.stats.median) + "/"
+      s += str(self.stats.mean) + "/" + str(self.stats.dev)
+    if self.rtt_created: s += "*"
     return s
 
 class Stream(PathSupport.Stream):
-  """ Stream class extended to isPing and hop """
+  """ Stream class extended to hop """
   def __init__(self, sid, host, port, kind):
     PathSupport.Stream.__init__(self, sid, host, port, kind)
-    self.isPing = False		# set this to mark as a ping-stream
     self.hop = None		# save hop if this is a ping, hop=None means complete circ
 
 ######################################### BEGIN: NetworkModel      #####################
@@ -177,30 +232,32 @@
     # Set src and dest
     self.src = src
     self.dest = dest
-    # Setup the history and record RTT
-    self.rtt_history = []
-    self.set_rtt(rtt)
+    # The current value
+    self.current_rtt = 0.0
+    # Setup the stats and record the first RTT
+    self.stats = Stats()
+    self.add_rtt(rtt)
 
-  def set_rtt(self, rtt):
-    self.rtt = rtt
-    self.rtt_history.append(rtt)
+  def add_rtt(self, rtt):
+    self.current_rtt = rtt
+    self.stats.add_value(rtt)
 
 class PathProposal:
   """ Instances of this class are path-proposals """
   def __init__(self, links, path):
     # This is a list of LinkInfo objects
     self.links = links
-    # Also save the path for passing to build_circuit
-    self.path = path
-    # Compute the expected RTT
-    self.rtt = reduce(lambda x,y: x + y.rtt, self.links, 0.0)
+    # Also save the path for passing to build_circuit, cut off ROOT
+    self.path = path[1:len(path)]
+    # Compute the expected RTT (from current value?)
+    self.rtt = reduce(lambda x,y: x + y.current_rtt, self.links, 0.0)
   
   def to_string(self):
     """ Create a string for printing out information """
     s = ""
     for l in self.links:
       # Get the single objects
-      s += l.src.nickname + "--" + l.dest.nickname + " (" + str(l.rtt) + ") " + ", "
+      s += l.src.nickname + "--" + l.dest.nickname + " (" + str(l.current_rtt) + ") " + ", "
     return "Route proposal: " + s + "--> " + str(self.rtt) + " sec" 
 
 class NetworkModel:  
@@ -213,6 +270,7 @@
     # Initially add THIS proxy to the model
     self.root = rooter
     self.graph.add_node(self.root)
+    self.proposals = []
     plog("DEBUG", "NetworkModel initiated: added " + self.root.nickname)
 
   def add_link(self, src, dest, rtt):
@@ -239,16 +297,14 @@
     for p in self.proposals:
       print(p.to_string())
 
-  def check_proposals(self, m, n):
-    """ Check if we have at least m proposals with rtt <= n seconds """
-    i = 0
+  def check_proposals(self, n):
+    """ Return all proposals with rtt <= n seconds """
+    ret = []
     for p in self.proposals:
       if p.rtt <= n:
-        i += 1
-      if p.rtt > n:
-        return False
-      if i == m:
-        return True
+	ret.append(p) 
+    plog("DEBUG", "Found " + str(len(ret)) + " path proposals having RTT <= " + str(n) + " sec")
+    return ret
 
   def visit(self, node, path, i=1):
     """ Recursive Depth-First-Search: Maybe use some existing method? """
@@ -280,7 +336,7 @@
   def __init__(self, c, selmgr):
     # Init the PathBuilder
     PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)    
-    self.circs_sorted = []	# list of circs sorted by avg_rtt
+    self.circs_sorted = []	# list of circs sorted by mean RTT
     self.check_circuit_pool()	# bring up the pool of circs
  
   def check_circuit_pool(self):
@@ -295,13 +351,38 @@
       self.build_idle_circuit()
       plog("DEBUG", "Scheduled circuit No. " + str(n+1))
       n += 1
-    self.print_circuits()
 
+  def check_path(self, path):
+    """ Check if we already have a circuit with this path """
+    for c in self.circuits.values():
+      if c.path == path:
+        return False
+    return True
+
   def build_idle_circuit(self):
     """ Build an idle circuit """
     circ = None
     while circ == None:
       try:
+        if measure_partial_circs:
+	  # Get the proposals with RTT <= slow
+	  proposals = self.model.check_proposals(slow)
+	  # TODO: Ensure we also create new paths (check number of circs with rtt_created)
+	  # TODO: Check if we have > m proposals
+	  while len(proposals) > 0:
+	    choice = random.choice(proposals)
+	    # Check if we already have a circ with this path
+	    if self.check_path(choice.path):
+	      plog("INFO", "Chosen proposal: " + choice.to_string())
+	      circ = self.c.build_circuit_from_path(choice.path)
+	      self.circuits[circ.circ_id] = circ
+	      return
+	    else:
+	      plog("DEBUG", "Proposed circuit already exists")
+	      # Remove from the proposals
+	      proposals.remove(choice)
+	  plog("DEBUG", "Falling back to normal path selection") 
+
         # Build the circuit
 	self.selmgr.set_target("255.255.255.255", 80)
         circ = self.c.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
@@ -313,15 +394,14 @@
 
   def print_circuits(self):
     """ Print out our circuits plus some info """
-    #circs = self.circs_sorted
     circs = self.circuits.values()
     plog("INFO", "We have " + str(len(circs)) + " circuits:")
     for c in circs:
       print("+ " + c.to_string())
 
   def refresh_sorted_list(self):
-    """ Sort the list for average RTTs """
-    self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.avg_rtt)
+    """ Sort the list for their mean RTTs """
+    self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.stats.mean)
     plog("DEBUG", "Refreshed sorted list of circuits")
  
   def circ_status_event(self, c):
@@ -350,13 +430,8 @@
       del self.circuits[c.circ_id]
       # Give away pending streams
       for stream in circ.pending_streams:
-        if stream.isPing:
-	  #plog("DEBUG", "Finding new circ for ping stream " + str(stream.strm_id))
-	  # Close the stream?
-	  pass
-	if not stream.isPing:
-	  plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
-          self.attach_stream_any(stream, stream.detached_from)
+	plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
+        self.attach_stream_any(stream, stream.detached_from)
       # Refresh the list 
       self.refresh_sorted_list()
       # Check if there are enough circs
@@ -415,15 +490,18 @@
         # FIXME: Consider actually closing circ if no streams.
         self.circuits[key].dirty = True
 
-    # Choose from the sorted list!  
+    # Choose from the sorted list
+    # TODO: We don't have a sorted list if we don't measure!
     for circ in self.circs_sorted:
       # Only attach if we already measured
-      if circ.built and circ.circ_id not in badcircs and not circ.closed and circ.avg_rtt:
+      if circ.built and not circ.closed and circ.circ_id not in badcircs and circ.current_rtt:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.strm_id, circ.circ_id)
             stream.pending_circ = circ # Only one possible here
             circ.pending_streams.append(stream)
+	    # Clear cache after the attach?
+	    self.clear_dns_cache()
           except TorCtl.ErrorReply, e:
             # No need to retry here. We should get the failed
             # event for either the circ or stream next
@@ -450,8 +528,50 @@
       self.circuits[circ.circ_id] = circ
     self.last_exit = circ.exit
 
-  # Catch user stream events
-  def handle_other_events(self, s):
+  def stream_status_event(self, s):
+    """ Catch user stream events """
+    # Construct debugging output
+    output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host, str(s.target_port)]
+    if s.reason: output.append("REASON=" + s.reason)
+    if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+    plog("DEBUG", " ".join(output))
+     
+    # If target_host is not an IP-address
+    if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
+      s.target_host = "255.255.255.255" # ignore DNS for exit policy check
+    
+    # NEW or NEWRESOLVE
+    if s.status == "NEW" or s.status == "NEWRESOLVE":
+      if s.status == "NEWRESOLVE" and not s.target_port:
+        s.target_port = self.resolve_port      
+      # Set up the new stream
+      stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+      self.streams[s.strm_id] = stream        
+      self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+    
+    # DETACHED
+    elif s.status == "DETACHED":
+      # Stream not found
+      if s.strm_id not in self.streams:
+        plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
+        self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
+      # Circuit not found
+      if not s.circ_id:
+        plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
+      else:
+        self.streams[s.strm_id].detached_from.append(s.circ_id)      
+      # Detect timeouts on user streams
+      if s.reason == "TIMEOUT":
+	# Increase a timeout counter on the stream?
+	#self.circuits[s.circ_id].timeout_counter += 1
+	plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
+      # Stream was pending
+      if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
+        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
+      # Attach to another circ
+      self.streams[s.strm_id].pending_circ = None
+      self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
+
     # SUCCEEDED
     if s.status == "SUCCEEDED":
       if s.strm_id not in self.streams:
@@ -483,7 +603,7 @@
         self.streams[s.strm_id].failed = True
 	if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
         elif self.streams[s.strm_id].attached_at != 0: 
-	  plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
+	  plog("WARN", "Failed stream on unknown circuit " + str(s.circ_id))
 	return
       # CLOSED
       if self.streams[s.strm_id].pending_circ:
@@ -504,22 +624,29 @@
 
 ######################################### BEGIN: PingHandler       #####################
 
-# This class extends the StreamHandler
 class PingHandler(StreamHandler):
+  """ This class extends the general StreamHandler to handle ping-requests """
   def __init__(self, c, selmgr, router):
-    # Init the StreamHandler
-    StreamHandler.__init__(self, c, selmgr)    
     # Anything ping-related
     self.ping_queue = Queue.Queue()	# (circ_id, hop)-pairs
     self.start_times = {}		# dict mapping (circ_id, hop):start_time TODO: cleanup
-    # Start the Pinger that schedules the measurings
-    self.ping_manager = Pinger(self, router)
-    self.ping_manager.setDaemon(True)
-    self.ping_manager.start()
+    # Start the Pinger that triggers the connections
+    self.pinger = Pinger(self)
+    self.pinger.setDaemon(True)
+    self.pinger.start()
+    # Additional stuff for partial measurings
+    if measure_partial_circs:
+      self.router = router			# this object represents this OR
+      self.model = NetworkModel(self.router)	# model for recording link-RTTs
+    # Handle testing_mode
+    if testing_mode:
+      self.filehandler = FileHandler("data/circuits")
+    # Init the StreamHandler
+    StreamHandler.__init__(self, c, selmgr)    
 
   def enqueue_pings(self):
-    """ To be schedule_immediated by ping_manager before the first connection is triggered """
-    # TODO: Empty the queue?
+    """ To be schedule_immediated by pinger before the first connection is triggered """
+    print("\n")
     circs = self.circuits.values()
     for c in circs:
       if c.built:
@@ -534,14 +661,58 @@
 	# And for the whole circuit ...
         self.ping_queue.put((id, None))
         plog("DEBUG", "Enqueued circuit " + str(id) + " hop None")
-  
-  def attach_ping(self, stream, arrived_at):
+ 
+  def compute_link_RTTs(self):
+    """ Get the circs and check if we can compute RTTs of single links and store these in the model """    
+    circs = self.circuits.values()
+    # Measure also the duration
+    start = time.time()
+    for c in circs:
+      # Get the length
+      path_len = len(c.path)
+      # Go through the path
+      for i in xrange(1,path_len):
+        if i in c.part_rtts:
+          # First hop --> add Link from Root to 1
+          if i == 1:
+	    link_rtt = c.part_rtts[i]
+	    self.model.add_link(self.router, c.path[i-1], link_rtt)
+	  # Handle i -- (i+1)
+          if i+1 in c.part_rtts:
+            link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
+	    if link_rtt > 0:          
+	      plog("INFO", "Computed link-RTT: " + str(link_rtt))
+	      # Save to NetworkModel
+	      self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+	    else:
+	      plog("WARN", "Negative link-RTT: " + str(link_rtt))
+	  # Handle (n-1) -- n
+	  elif None in c.part_rtts:
+            # We have a total value
+	    link_rtt = c.part_rtts[None] - c.part_rtts[i]
+	    if link_rtt > 0:          
+	      plog("INFO", "Computed link-RTT: " + str(link_rtt))
+	      # Save to NetworkModel
+	      self.model.add_link(c.path[i-1], c.path[i], link_rtt)
+	    else:
+	      plog("WARN", "Negative link-RTT: " + str(link_rtt))
+    plog("DEBUG", "Computation of link-RTTs took us " + str(time.time()-start) + " seconds")
+    # Print out the model
+    self.model.print_graph()
+    self.model.find_circuits()
+
+  def attach_ping(self, stream):
     """ Attach a ping stream to its circuit """
-    plog("DEBUG", "New ping request")
-    # Check if there is something in the queue
     if self.ping_queue.empty():
-      plog("DEBUG", "Queue is empty --> discarding ping stream " + str(stream.strm_id))
+      # This round has finished
+      plog("INFO", "Queue is empty --> round finished, closing stream " + str(stream.strm_id))
       self.c.close_stream(stream.strm_id, 5)
+      # Fire the event
+      finished_event.set()
+      # Call the rest from here?
+      self.print_circuits()
+      if measure_partial_circs:
+        self.compute_link_RTTs()
       return
     else:
       # Get the info and extract
@@ -555,142 +726,111 @@
         if circ_id in self.circuits:
           circ = self.circuits[circ_id]
           if circ.built and not circ.closed:        
-            self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
-            # Measure here or move to before attaching?
-            self.start_times[(circ_id, hop)] = arrived_at
-	    stream.hop = hop
-            stream.pending_circ = circ # Only one possible here
-            circ.pending_streams.append(stream)
+            stream.hop = hop
+	    self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
+            # Don't use pending for pings
           else:
-            plog("WARN", "Circuit not built")
+            plog("WARN", "Circuit not built or closed")
+	    self.attach_ping(stream)
         else:
-          # Close stream if circuit is gone
-          plog("WARN", "Circuit does not exist anymore, closing stream " + str(stream.strm_id))
-          self.c.close_stream(stream.strm_id, 5)
+          # Go to next test if circuit is gone
+          plog("WARN", "Circuit " + str(circ_id) + " does not exist anymore --> passing")
+          self.attach_ping(stream)
       except TorCtl.ErrorReply, e:
         plog("WARN", "Error attaching stream: " + str(e.args))
 
-  # TODO: Separate pings from normal streams directly, to make StreamHandler usable even without this ..
   def stream_status_event(self, s):
-    """ Catch stream status events: Handle NEW and DETACHED here, 
-        pass other events to StreamHandler """
+    """ Separate Pings from regular streams directly """
+    if not (s.target_host == ping_dummy_host and s.target_port == ping_dummy_port):
+      # This is no ping, call the other method
+      return StreamHandler.stream_status_event(self, s)
+    
     # Construct debugging output
     output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host, str(s.target_port)]
     if s.reason: output.append("REASON=" + s.reason)
     if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
     plog("DEBUG", " ".join(output))
-     
-    # If target_host is not an IP-address
-    if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
-      s.target_host = "255.255.255.255" # ignore DNS for exit policy check
-    
+ 
     # NEW or NEWRESOLVE
-    if s.status == "NEW" or s.status == "NEWRESOLVE":
-      if s.status == "NEWRESOLVE" and not s.target_port:
-        s.target_port = self.resolve_port      
-      # Set up the new stream
+    if s.status == "NEW":
+      # Set up the stream object
       stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
       self.streams[s.strm_id] = stream        
-      # (Double-)Check if this is a ping stream
-      if (stream.host == ping_dummy_host) & (stream.port == ping_dummy_port):
-        # Set isPing
-	stream.isPing = True
-        self.attach_ping(stream, s.arrived_at)
+      self.attach_ping(stream)
+
+    # SENTCONNECT 
+    elif s.status == "SENTCONNECT":
+      # Measure here, means save arrived_at in the dict
+      self.start_times[(s.circ_id, self.streams[s.strm_id].hop)] = s.arrived_at
+  
+    # DETACHED (CLOSED + TORPROTOCOL is also ping, some routers send it when measuring 1-hop)
+    elif s.status == "DETACHED" or (s.status == "CLOSED" and s.remote_reason == "TORPROTOCOL"):
+      if (s.reason == "TIMEOUT"):
+        self.circuits[s.circ_id].timeout_counter += 1
+	self.circuits[s.circ_id].slowness_counter += 1
+	plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
+	if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
+	  # Close the circuit
+	  plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
+	  self.circuits[s.circ_id].closed = True
+	  try: self.c.close_circuit(s.circ_id)
+	  except TorCtl.ErrorReply, e: 
+	    plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))	    
+	# Set RTT for circ to None
+	self.circuits[s.circ_id].current_rtt = None
+      
       else:
-        self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-    
-    # DETACHED
-    elif s.status == "DETACHED":
-      # Stream not found
-      if s.strm_id not in self.streams:
-        plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
-        self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
-      # s.circ_id not found
-      if not s.circ_id:
-        plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
-      else:
-        self.streams[s.strm_id].detached_from.append(s.circ_id)
+        # No timeout, this is a successful ping: measure here	  
+        hop = self.streams[s.strm_id].hop
+        # Compute RTT using arrived_at 
+        rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
+        plog("INFO", "Measured RTT: " + str(rtt) + " sec")
+        # Save RTT to circuit
+        self.circuits[s.circ_id].part_rtts[hop] = rtt
 
-      # If this is a ping
-      if self.streams[s.strm_id].isPing:
-        if (s.reason == "TIMEOUT"):
-	  self.circuits[s.circ_id].timeout_counter += 1
-	  self.circuits[s.circ_id].slowness_counter += 1
-	  plog("DEBUG", str(self.circuits[s.circ_id].timeout_counter) + " timeout(s) on circuit " + str(s.circ_id))
-	  if self.circuits[s.circ_id].timeout_counter >= timeout_limit and not self.circuits[s.circ_id].closed:
-	    # Close the circuit
-	    plog("DEBUG", "Reached limit on timeouts --> closing circuit " + str(s.circ_id))
-	    self.circuits[s.circ_id].closed = True
-	    try: self.c.close_circuit(s.circ_id)
-	    except TorCtl.ErrorReply, e: 
-	      plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))	    
-	  # Set RTT for circ to None
-	  self.circuits[s.circ_id].current_rtt = None
-	  # Only close the stream
-          self.c.close_stream(s.strm_id, 7)
-	  return
-        # This is a successful ping: measure here	  
-	hop = self.streams[s.strm_id].hop
-	# Compute RTT using arrived_at 
-	rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
-        plog("INFO", "Measured RTT: " + str(rtt) + " sec")
-	# Save RTT to circuit
-	self.circuits[s.circ_id].part_rtts[hop] = rtt
-	if hop == None:
-	  # This is a total circuit measuring
+        if hop == None:
+          # This is a total circuit measuring
 	  self.circuits[s.circ_id].add_rtt(rtt)
-	  plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].rtt_history))
-          
+	  plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].stats.values))
+	  
+	  # Close if num_tests is reached          
 	  if testing_mode:
-	    # Close if num_tests is reached
 	    if self.circuits[s.circ_id].age >= num_tests:
-	      self.print_circuits()
+	      plog("DEBUG", "Closing circ " + str(s.circ_id) + ": num_tests is reached")
 	      self.circuits[s.circ_id].closed = True
+	      # Save stats to a file for generating plots etc.
+	      self.filehandler.write(str(self.circuits[s.circ_id].stats.mean) + "\t" + str(self.circuits[s.circ_id].stats.dev))
 	      self.c.close_circuit(s.circ_id)
 
-	  # Close if slow-max is reached on avg_rtt
-          if self.circuits[s.circ_id].avg_rtt >= slow:
+	  # Close if slow-max is reached on mean RTT
+          if self.circuits[s.circ_id].stats.mean >= slow:
 	    self.circuits[s.circ_id].slowness_counter += 1
-	    plog("DEBUG", "Slow circuit " + str(s.circ_id))
 	    if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
 	      plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
 	      self.circuits[s.circ_id].closed = True
 	      self.c.close_circuit(s.circ_id)
 
-	# Resort every time ??
-	self.refresh_sorted_list()
-	# Close the stream
-        self.c.close_stream(s.strm_id, 6)
+          # Resort only if this is for the complete circ
+          self.refresh_sorted_list()
+
+      if s.status == "CLOSED":
+        # Stream is gone .. we have to create a new ping :(
+        t = threading.Thread(None, self.pinger.ping, "Ping")
+	t.setDaemon(True)
+	t.start()
 	return
-      
-      # Detect timeouts on user streams
-      if s.reason == "TIMEOUT":
-	# Don't increase counter, cause could be a machine that's not responding
-	#self.circuits[s.circ_id].timeout_counter += 1
-	plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
 
-      # Stream was pending
-      if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
-        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
-      # Attach to another circ
-      self.streams[s.strm_id].pending_circ = None
-      self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-    
-    else: 
-      self.handle_other_events(s)
+      # Call attach ping here and use only one stream for all tests
+      self.attach_ping(self.streams[s.strm_id])
+      return
 
 ######################################### BEGIN: Pinger            #####################
 
 class Pinger(threading.Thread):
   """ Separate thread that triggers the Socks4-connections for pings """
-  def __init__(self, ping_handler, router=None):
-    self.handler = ping_handler		# The PingHandler
-    self.router = router		# This router object represents us
-    if measure_partial_circs:
-      # Create the model for recording link-RTTs only if wished
-      self.model = NetworkModel(self.router)
-    # Call thread-constructor
-    threading.Thread.__init__(self)
+  def __init__(self, ping_handler):
+    self.handler = ping_handler		# the PingHandler
+    threading.Thread.__init__(self)	# call the thread-constructor
   
   def run(self):
     """ The run()-method """
@@ -700,28 +840,16 @@
 
   def do_work(self):
     """ Do the work """
-    # Measure RTTs of complete circuits
-    if measure_circs:
-      # Schedule to enqueue all built circs
-      def notlambda(h): h.enqueue_pings() 
-      self.handler.schedule_immediate(notlambda)
-      # Get number of _all_ circs (>= no. of circs built)
-      i = len(self.handler.circuits)
-      for x in xrange(1, i+1):
-        # Maybe run in parallel (separate threads)?
-	plog("DEBUG", "Triggered ping " + str(x) + "/" + str(i))
-	self.ping()
-      
-      # Compute link-RTTs
-      if measure_partial_circs:
-        self.compute_link_RTTs()
-        # Check if we have m proposals with rtt <= n
-        if self.model.check_proposals(10, 0.5):
-          plog("INFO", "We now have enough proposals!")
-    
-    # Print circuits for logging
-    self.handler.schedule_immediate(lambda x: x.print_circuits())
-
+    # The event is needed because some routers close our connection when we try
+    # to use them as one-hop, so we sometimes need to create a new connection
+    # and cannot rely on the failure of our first connection
+    finished_event.clear()
+    # Enqueue all circuits that are to be tested
+    self.handler.schedule_immediate(lambda x: x.enqueue_pings())
+    # Simply trigger only _one_ connection
+    self.ping()
+    finished_event.wait()
+  
   # No "try .. except .. finally .." in Python < 2.5 !
   def ping(self):
     """ Create a connection to dummy_host/_port using Socks4 """
@@ -739,44 +867,6 @@
       # Close the socket if open
       if s: s.close()
 
-  def compute_link_RTTs(self):
-    """ Get a copy of the circs and check if we can compute links 
-        and store these in the model """    
-    # TODO: Refactor: move to PingHandler or get a _copy_ of the circuits
-    circs = self.handler.circuits.values()
-    for c in circs:
-      # Get the length
-      path_len = len(c.path)
-      # Go through the path
-      for i in xrange(1,path_len):
-        if i in c.part_rtts:
-          # First hop --> add Link from Root to 1
-          if i == 1:
-	    link_rtt = c.part_rtts[i]
-	    self.model.add_link(self.router, c.path[i-1], link_rtt)
-	  # Handle i -- (i+1)
-          if i+1 in c.part_rtts:
-            link_rtt = c.part_rtts[i+1] - c.part_rtts[i]
-	    if link_rtt > 0:          
-	      plog("INFO", "Computed link RTT = " + str(link_rtt))
-	      # Save to NetworkModel
-	      self.model.add_link(c.path[i-1], c.path[i], link_rtt)
-	    else:
-	      plog("WARN", "Negative RTT: " + str(link_rtt))
-	  # Handle (n-1) -- n
-	  elif None in c.part_rtts:
-            # We have a total value
-	    link_rtt = c.part_rtts[None] - c.part_rtts[i]
-	    if link_rtt > 0:          
-	      plog("INFO", "Computed link RTT = " + str(link_rtt))
-	      # Save to NetworkModel
-	      self.model.add_link(c.path[i-1], c.path[i], link_rtt)
-	    else:
-	      plog("WARN", "Negative RTT: " + str(link_rtt))
-    # Print out the model
-    self.model.print_graph()
-    self.model.find_circuits()
-
 ######################################### END: Pinger              #####################
 
 def connect(control_host, control_port):
@@ -787,18 +877,21 @@
  
 def setup_location(conn):
   """ Setup a router object representing this proxy """
+  global path_config
   plog("INFO","Setting up our location")
+  ip = 0
+  # Try to get our IP from Tor
   try:
-    # Get our IP from Tor and set up a router object
     info = conn.get_info("address")
     ip = info["address"]
-    router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,ip,None,None))
-    #country = GeoIPSupport.geoip.country_code_by_addr(my_ip)
-    #country = GeoIPSupport.get_country_from_record(my_ip)
-    plog("INFO", "Our IP address is " + router.get_ip_dotted() + " [" + router.country_code + "]")
-    return router
   except: 
     plog("ERROR", "Could not get our IP")
+  # Set up a router object
+  router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,ip,None,None))
+  plog("INFO", "Our IP address is " + router.get_ip_dotted() + " [" + router.country_code + "]")
+  # To be configured: for now, use our own country code as the source country
+  path_config.src_country = router.country_code
+  return router
  
 def configure(conn):
   """ Set events and options """
@@ -820,7 +913,10 @@
   # Configure myself  
   configure(conn)
   # Set Handler to the connection
-  handler = PingHandler(conn, __selmgr, router)
+  if measure_circs:
+    handler = PingHandler(conn, __selmgr, router)
+  else:
+    handler = StreamHandler(conn, __selmgr)
   conn.set_event_handler(handler)
   # Go to sleep to be able to get killed from the commandline
   try:



More information about the tor-commits mailing list