[or-cvs] r10502: Finally refactored in the desired way, not needing any Lock- (torflow/trunk)

renner at seul.org renner at seul.org
Tue Jun 5 17:40:20 UTC 2007


Author: renner
Date: 2007-06-05 13:40:19 -0400 (Tue, 05 Jun 2007)
New Revision: 10502

Modified:
   torflow/trunk/op-addon.py
Log:

  Finally refactored in the desired way, no longer needing any Lock objects; also introduced
  testing_mode, which closes every circuit after n measurements to collect data.



Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-06-05 10:36:30 UTC (rev 10501)
+++ torflow/trunk/op-addon.py	2007-06-05 17:40:19 UTC (rev 10502)
@@ -4,6 +4,7 @@
   Copyright (C) 2007 Johannes Renner 
   Contact: renner at i4.informatik.rwth-aachen.de
 """
+
 # Addon for Onion Proxies (prototype-v0.0-alpha):
 # Shall eventually improve the performance of anonymous communications 
 # and browsing by measuring RTTs of circuits/links, receiving infos
@@ -19,6 +20,7 @@
 import socket
 import threading
 import Queue
+#import ConfigParser
 
 # Non-standard packages
 import socks
@@ -27,8 +29,6 @@
 # TorCtl
 import TorCtl.PathSupport
 import TorCtl.GeoIPSupport
-
-# From .. import ..
 from TorCtl import *
 from TorCtl.TorUtil import plog, sort_list
 
@@ -37,7 +37,6 @@
 control_port = 9051
 socks_host = "127.0.0.1"
 socks_port = 9050
-
 # Any ideas/proposals?
 ping_dummy_host = "127.0.0.1"
 ping_dummy_port = 100
@@ -48,27 +47,27 @@
 # Slow RTT := x seconds 
 slow = 1.5
 # Note: Tor-internal lifetime of a circuit is 10 min --> 600/sleep_interval = max-age
-# Set interval between working loads in sec
-sleep_interval = 10
+# Sleep interval between working loads in sec
+sleep_interval = 5
 # No of idle circuits to build preemptively
 # TODO: Also configure ports to use
 idle_circuits = 6
 
+# Measure complete circuits
+measure_circs = True
 # Set to True if we want to measure partial circuits
 measure_partial_circs = False
 
-# Still needed: Lock object for regulating access to the circuit list
-circs_lock = threading.Lock()
+# Testing mode: Close circuits after num_tests measures
+testing_mode = False
+# Number of tests per circuit
+num_tests = 5
 
-# Infos about this proxy TODO: Save in some class: Router?
-my_ip = None
-my_country = None
-
 # Do configuration here TODO: use my_country for src
-path_config = GeoIPSupport.GeoIPConfig(unique_countries = False,
+path_config = GeoIPSupport.GeoIPConfig(unique_countries = True,
                                        src_country = None,
 				       crossings = 1,
-				       excludes = ["FR"])
+				       excludes = [])
 
 # Configure Selection Manager here!!
 # Do NOT modify this object directly after it is handed to 
@@ -87,8 +86,8 @@
 
 ######################################### BEGIN: Connection         #####################
 
-# Circuit building code here
 class Connection(TorCtl.Connection):
+  """ Connection class that uses my Circuit class """
   def build_circuit(self, pathlen, path_sel):
     circ = Circuit()
     if pathlen == 1:
@@ -104,27 +103,21 @@
       circ.circ_id = self.extend_circuit(0, circ.id_path())
     return circ
 
-  def build_circuit_80(self, selmgr):
-    """ Set target port to 80 """
-    selmgr.set_target("255.255.255.255", 80)
-    return self.build_circuit(selmgr.pathlen, selmgr.path_selector)
-
   def build_circuit_from_path(self, path):
-    """ Build circuit using a given path """
+    """ Build circuit using a given path shall be used to build circs from NetworkModel """
     circ = Circuit()
     if len(path) > 0:
       circ.circ_id = self.extend_circuit(0, path)
 
-######################################### END: Connection          #####################
 ######################################### Circuit, Stream          #####################
 
-# Circuit class extended to RTTs
-class Circuit(PathSupport.Circuit):  
+class Circuit(PathSupport.Circuit): 
+  """ Circuit class extended to RTTs and related stats """
   def __init__(self):
     PathSupport.Circuit.__init__(self)
     # RTT stuff
-    self.current_rtt = None	# double (sec): current value
     self.part_rtts = {}		# dict of partial rtts, pathlen 3: 1-2-None
+    self.current_rtt = None	# double (sec): current value
     self.rtt_history = []	# rtt history for computing stats:
     self.avg_rtt = 0		# avg rtt value		
     self.dev_rtt = 0		# standard deviation
@@ -150,12 +143,24 @@
       return math.sqrt(sum/len(self.rtt_history))
     else:
       return 0.0
-
-  def refresh_stats(self):
-    """ Refresh the stats """
+  
+  def add_rtt(self, rtt):
+    """ Add a new value and refresh the stats """
+    self.current_rtt = rtt
+    self.rtt_history.append(rtt)
     self.avg_rtt = self.get_avg_rtt()
     self.dev_rtt = self.get_dev_rtt()
+    self.age += 1
 
+  def to_string(self):
+    """ Create a string representation """
+    s = "Circuit " + str(self.circ_id) + ": "
+    for r in self.path: s += " " + r.nickname + "(" + str(r.country_code) + ")"
+    if not self.built: s += " (not yet built)"
+    else: s += " (age=" + str(self.age) + ")"
+    if self.current_rtt: s += ": " "RTT last/avg/dev: " + str(self.current_rtt) + "/" + str(self.avg_rtt) + "/"+ str(self.dev_rtt)
+    return s
+
 class Stream(PathSupport.Stream):
   """ Stream class extended to isPing and hop """
   def __init__(self, sid, host, port, kind):
@@ -163,7 +168,6 @@
     self.isPing = False		# set this to mark as a ping-stream
     self.hop = None		# save hop if this is a ping, hop=None means complete circ
 
-######################################### Circuit, Stream          #####################
 ######################################### BEGIN: NetworkModel      #####################
 
 class LinkInfo:
@@ -183,9 +187,11 @@
 
 class PathProposal:
   """ Instances of this class are path-proposals """
-  def __init__(self, links):
+  def __init__(self, links, path):
     # This is a list of LinkInfo objects
     self.links = links
+    # Also save the path for passing to build_circuit
+    self.path = path
     # Compute the expected RTT
     self.rtt = reduce(lambda x,y: x + y.rtt, self.links, 0.0)
   
@@ -233,13 +239,25 @@
     for p in self.proposals:
       print(p.to_string())
 
+  def check_proposals(self, m, n):
+    """ Check if we have at least m proposals with rtt <= n seconds """
+    i = 0
+    for p in self.proposals:
+      if p.rtt <= n:
+        i += 1
+      if p.rtt > n:
+        return False
+      if i == m:
+        return True
+
   def visit(self, node, path, i=1):
     """ Recursive Depth-First-Search: Maybe use some existing method? """
     if node not in path:
       path.append(node)
       # Root -- Exit
       if len(path) == 4:
-        self.proposals.append(PathProposal(self.get_link_info(path)))
+        # We found a possible circuit: add to the proposals
+        self.proposals.append(PathProposal(self.get_link_info(path), path))
       else:
         self.prefixes[i] = path
 	# G is also a dict
@@ -250,32 +268,28 @@
   def print_graph(self):
     """ Print current info about the graph """
     print(self.graph.info())
-    # Print edges
     #for e in self.graph.edges():
     #  src, dest, link = e
     #  plog("INFO", "Edge: " + src.nickname + " -- " + dest.nickname + ", RTT = " + str(link.rtt) + " sec")
 
-######################################### END: NetworkModel        #####################
 ######################################### BEGIN: EventHandlers     #####################
 
-# CircuitHandler extending PathBuilder
+# TODO: Store the number of circuits here
 class CircuitHandler(PathSupport.PathBuilder):
+  """ CircuitHandler that extends from PathBuilder """
   def __init__(self, c, selmgr):
     # Init the PathBuilder
     PathSupport.PathBuilder.__init__(self, c, selmgr, GeoIPSupport.GeoIPRouter)    
-    # Maintain a sorted list of circs that gets regenerated regularly
-    self.circs_sorted = []    
-    # Bring up the circuits
-    self.check_circuit_pool()
+    self.circs_sorted = []	# list of circs sorted by avg_rtt
+    self.check_circuit_pool()	# bring up the pool of circs
  
   def check_circuit_pool(self):
     """ Init or check the status of our pool of circuits """
-    # Get number of circuits
-    circs_lock.acquire()
+    # Get current number of circuits
     n = len(self.circuits.values())
-    circs_lock.release() 
-    # Some Logging
-    plog("INFO", "Checked circuit-pool: building " + str(idle_circuits-n) + " circuits")
+    i = idle_circuits-n
+    if i > 0:
+      plog("INFO", "Checked pool of circuits: we need to build " + str(i) + " circuits")
     # Schedule (idle_circuits-n) circuit-buildups
     while (n < idle_circuits):      
       self.build_idle_circuit()
@@ -289,11 +303,9 @@
     while circ == None:
       try:
         # Build the circuit
-        circ = self.c.build_circuit_80(self.selmgr)
-        # Using lock:
-	circs_lock.acquire()
+	self.selmgr.set_target("255.255.255.255", 80)
+        circ = self.c.build_circuit(self.selmgr.pathlen, self.selmgr.path_selector)
 	self.circuits[circ.circ_id] = circ
-        circs_lock.release()
       except TorCtl.ErrorReply, e:
         # FIXME: How come some routers are non-existant? Shouldn't
         # we have gotten an NS event to notify us they disappeared?
@@ -301,25 +313,15 @@
 
   def print_circuits(self):
     """ Print out our circuits plus some info """
-    circs_lock.acquire()
     #circs = self.circs_sorted
     circs = self.circuits.values()
-    plog("INFO", "We have " + str(len(circs)) + " circuits")
+    plog("INFO", "We have " + str(len(circs)) + " circuits:")
     for c in circs:
-      out = "+ Circuit " + str(c.circ_id) + ": "
-      for r in c.path: out += " " + r.nickname + "(" + str(r.country_code) + ")"
-      if not c.built: out += " (not yet built)"
-      else: out += " (age=" + str(c.age) + ")"
-      if c.current_rtt: out += ": " "RTT last/avg/dev: " + str(c.current_rtt) + "/" + str(c.avg_rtt) + "/"+ str(c.dev_rtt) + ""
-      print(out)
-    circs_lock.release()
+      print("+ " + c.to_string())
 
-  # Call after each measuring
   def refresh_sorted_list(self):
-    # Sort the list for average RTTs
-    circs_lock.acquire()
+    """ Sort the list for average RTTs """
     self.circs_sorted = sort_list(self.circuits.values(), lambda x: x.avg_rtt)
-    circs_lock.release()
     plog("DEBUG", "Refreshed sorted list of circuits")
  
   def circ_status_event(self, c):
@@ -330,18 +332,15 @@
     if c.reason: output.append("REASON=" + c.reason)
     if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
     plog("DEBUG", " ".join(output))
-    # Acquire lock here
-    circs_lock.acquire()
+    
     # Circuits we don't control get built by Tor
     if c.circ_id not in self.circuits:
       plog("DEBUG", "Ignoring circuit " + str(c.circ_id) + " (controlled by Tor or not yet in the list)")
-      circs_lock.release()
       return
     
     # EXTENDED
     if c.status == "EXTENDED":
       self.circuits[c.circ_id].last_extended_at = c.arrived_at
-      circs_lock.release()
     
     # FAILED & CLOSED
     elif c.status == "FAILED" or c.status == "CLOSED":
@@ -349,7 +348,6 @@
       circ = self.circuits[c.circ_id]
       # Actual removal of the circ
       del self.circuits[c.circ_id]
-      circs_lock.release()
       # Give away pending streams
       for stream in circ.pending_streams:
         if stream.isPing:
@@ -375,43 +373,38 @@
         # No need to retry here. We should get the failed
         # event for either the circ or stream next
         plog("WARN", "Error attaching stream: " + str(e.args))
-        circs_lock.release()
 	return
-      circs_lock.release()
     
     # OTHER?
     else:
       # If this was e.g. a LAUNCHED
-      circs_lock.release()
+      pass
 
-######################################### END: CircuitHandler       #####################
 ######################################### BEGIN: StreamHandler      #####################
 
-# StreamHandler that extends CircuitHandler
-class StreamHandler(CircuitHandler):  
+class StreamHandler(CircuitHandler):
+  """ This is a StreamHandler that extends from the CircuitHandler """
   def __init__(self, c, selmgr):    
     # Call constructor of superclass
     CircuitHandler.__init__(self, c, selmgr)
+    # NEWNYM is needed for testing bandwidth
+    #self.new_nym = True
  
-  # Send signal "CLEARDNSCACHE"
   def clear_dns_cache(self):
+    """ Send signal CLEARDNSCACHE """
     lines = self.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
     for _, msg, more in lines:
       plog("DEBUG", "CLEARDNSCACHE: " + msg)
 
-  # Attach a regular user stream
   def attach_stream_any(self, stream, badcircs):
-    
-    # To be able to always choose the fastest:
-    # slows down attaching?
-    self.clear_dns_cache()
-    
+    """ Attach a regular user stream """
+    # To be able to always choose the fastest: slows down attaching?
+    #self.clear_dns_cache()
     # Newnym, and warn if not built plus pending
     unattached_streams = [stream]
     if self.new_nym:
       self.new_nym = False
       plog("DEBUG", "Obeying new nym")
-      circs_lock.acquire()
       for key in self.circuits.keys():
         if (not self.circuits[key].dirty and len(self.circuits[key].pending_streams)):
           plog("WARN", "New nym called, destroying circuit "+str(key)
@@ -421,12 +414,11 @@
           self.circuits[key].pending_streams.clear()
         # FIXME: Consider actually closing circ if no streams.
         self.circuits[key].dirty = True
-      circs_lock.release()
 
     # Choose from the sorted list!  
     for circ in self.circs_sorted:
       # Only attach if we already measured
-      if circ.built and circ.current_rtt and circ.circ_id not in badcircs and not circ.closed:
+      if circ.built and circ.circ_id not in badcircs and not circ.closed and circ.avg_rtt:
         if circ.exit.will_exit_to(stream.host, stream.port):
           try:
             self.c.attach_stream(stream.strm_id, circ.circ_id)
@@ -455,9 +447,7 @@
         u.pending_circ = circ      
       circ.pending_streams.extend(unattached_streams)
       # Problem here??
-      circs_lock.acquire()
       self.circuits[circ.circ_id] = circ
-      circs_lock.release()
     self.last_exit = circ.exit
 
   # Catch user stream events
@@ -473,9 +463,7 @@
         plog("WARN", "Mismatch of pending: "
           + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
           + str(s.circ_id))
-        circs_lock.acquire()
 	self.streams[s.strm_id].circ = self.circuits[s.circ_id]
-        circs_lock.release()
       else:
         self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
       self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
@@ -493,11 +481,9 @@
       if s.status == "FAILED":
         # Avoid busted circuits that will not resolve or carry traffic
         self.streams[s.strm_id].failed = True
-        circs_lock.acquire()
 	if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
         elif self.streams[s.strm_id].attached_at != 0: 
 	  plog("WARN","Failed stream on unknown circuit " + str(s.circ_id))
-        circs_lock.release()
 	return
       # CLOSED
       if self.streams[s.strm_id].pending_circ:
@@ -516,55 +502,75 @@
         self.streams[s.strm_id].host = s.target_host
         self.streams[s.strm_id].port = s.target_port
 
-######################################### END: StreamHandler       #####################
 ######################################### BEGIN: PingHandler       #####################
 
 # This class extends the StreamHandler
 class PingHandler(StreamHandler):
-  def __init__(self, c, selmgr, router, partial):
+  def __init__(self, c, selmgr, router):
     # Init the StreamHandler
     StreamHandler.__init__(self, c, selmgr)    
     # Anything ping-related
     self.ping_queue = Queue.Queue()	# (circ_id, hop)-pairs
     self.start_times = {}		# dict mapping (circ_id, hop):start_time TODO: cleanup
-    # Start the Pinger that schedules the ping-connections
-    self.ping_manager = Pinger(self, router, partial)
+    # Start the Pinger that schedules the measurings
+    self.ping_manager = Pinger(self, router)
     self.ping_manager.setDaemon(True)
     self.ping_manager.start()
 
-  # Attach a ping stream to its circuit
+  def enqueue_pings(self):
+    """ To be schedule_immediated by ping_manager before the first connection is triggered """
+    # TODO: Empty the queue?
+    circs = self.circuits.values()
+    for c in circs:
+      if c.built:
+        # Get id of c
+      	id = c.circ_id
+        if measure_partial_circs:
+	  # If partial measures wanted: get length
+	  path_len = len(c.path)
+	  for i in xrange(1, path_len):
+            self.ping_queue.put((id, i))
+            plog("DEBUG", "Enqueued circuit " + str(id) + " hop " + str(i))
+	# And for the whole circuit ...
+        self.ping_queue.put((id, None))
+        plog("DEBUG", "Enqueued circuit " + str(id) + " hop None")
+  
   def attach_ping(self, stream, arrived_at):
+    """ Attach a ping stream to its circuit """
     plog("DEBUG", "New ping request")
-    # Get info from the Queue TODO: check if empty
-    ping_info = self.ping_queue.get()
-    # Extract ping-info
-    circ_id = ping_info[0]
-    hop = ping_info[1]
-    # Set circ to stream
-    stream.circ = circ_id
-    try:
-      circs_lock.acquire()
-      # Get the circuit 
-      if circ_id in self.circuits:
-        circ = self.circuits[circ_id]
-	# TODO: and not circ.busy
-        if circ.built and not circ.closed:        
-          self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
-          # Measure here or move to before attaching?
-          self.start_times[(circ_id, hop)] = arrived_at
-	  stream.hop = hop
-          stream.pending_circ = circ # Only one possible here
-          circ.pending_streams.append(stream)
+    # Check if there is something in the queue
+    if self.ping_queue.empty():
+      plog("DEBUG", "Queue is empty --> discarding ping stream " + str(stream.strm_id))
+      self.c.close_stream(stream.strm_id, 5)
+      return
+    else:
+      # Get the info and extract
+      ping_info = self.ping_queue.get()
+      circ_id = ping_info[0]
+      hop = ping_info[1]
+      # Set circ to stream
+      stream.circ = circ_id
+      try:
+        # Get the circuit 
+        if circ_id in self.circuits:
+          circ = self.circuits[circ_id]
+          if circ.built and not circ.closed:        
+            self.c.attach_stream(stream.strm_id, circ.circ_id, hop)
+            # Measure here or move to before attaching?
+            self.start_times[(circ_id, hop)] = arrived_at
+	    stream.hop = hop
+            stream.pending_circ = circ # Only one possible here
+            circ.pending_streams.append(stream)
+          else:
+            plog("WARN", "Circuit not built")
         else:
-          plog("WARN", "Circuit not built")
-      else:
-        # Close stream if circuit is gone
-        plog("WARN", "Circuit does not exist anymore, closing stream " + str(stream.strm_id))
-        self.c.close_stream(stream.strm_id, 5)
-      circs_lock.release()
-    except TorCtl.ErrorReply, e:
-      plog("WARN", "Error attaching stream: " + str(e.args))
+          # Close stream if circuit is gone
+          plog("WARN", "Circuit does not exist anymore, closing stream " + str(stream.strm_id))
+          self.c.close_stream(stream.strm_id, 5)
+      except TorCtl.ErrorReply, e:
+        plog("WARN", "Error attaching stream: " + str(e.args))
 
+  # TODO: Separate pings from normal streams directly, to make StreamHandler usable even without this ..
   def stream_status_event(self, s):
     """ Catch stream status events: Handle NEW and DETACHED here, 
         pass other events to StreamHandler """
@@ -607,7 +613,6 @@
 
       # If this is a ping
       if self.streams[s.strm_id].isPing:
-        circs_lock.acquire()
         if (s.reason == "TIMEOUT"):
 	  self.circuits[s.circ_id].timeout_counter += 1
 	  self.circuits[s.circ_id].slowness_counter += 1
@@ -621,11 +626,10 @@
 	      plog("ERROR", "Failed closing circuit " + str(s.circ_id) + ": " + str(e))	    
 	  # Set RTT for circ to None
 	  self.circuits[s.circ_id].current_rtt = None
-	  circs_lock.release()
 	  # Only close the stream
           self.c.close_stream(s.strm_id, 7)
 	  return
-        # This is a successful ping: measure here
+        # This is a successful ping: measure here	  
 	hop = self.streams[s.strm_id].hop
 	# Compute RTT using arrived_at 
 	rtt = s.arrived_at - self.start_times[(s.circ_id, hop)]
@@ -633,24 +637,26 @@
 	# Save RTT to circuit
 	self.circuits[s.circ_id].part_rtts[hop] = rtt
 	if hop == None:
-	  # This is a total circuit measuring	  
-	  self.circuits[s.circ_id].current_rtt = rtt
-	  self.circuits[s.circ_id].rtt_history.append(rtt)
+	  # This is a total circuit measuring
+	  self.circuits[s.circ_id].add_rtt(rtt)
 	  plog("DEBUG", "Added RTT to history: " + str(self.circuits[s.circ_id].rtt_history))
-	  # Refresh the stats
-	  self.circuits[s.circ_id].refresh_stats()
-	  self.circuits[s.circ_id].age += 1
-	
-	# Close if slow-max is reached on avg_rtt
-        if self.circuits[s.circ_id].avg_rtt >= slow:
-	  self.circuits[s.circ_id].slowness_counter += 1
-	  plog("DEBUG", "Slow circuit " + str(s.circ_id))
-	  if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
-	    plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
-	    self.circuits[s.circ_id].closed = True
-	    self.c.close_circuit(s.circ_id)
+          
+	  if testing_mode:
+	    # Close if num_tests is reached
+	    if self.circuits[s.circ_id].age >= num_tests:
+	      self.print_circuits()
+	      self.circuits[s.circ_id].closed = True
+	      self.c.close_circuit(s.circ_id)
 
-	circs_lock.release()
+	  # Close if slow-max is reached on avg_rtt
+          if self.circuits[s.circ_id].avg_rtt >= slow:
+	    self.circuits[s.circ_id].slowness_counter += 1
+	    plog("DEBUG", "Slow circuit " + str(s.circ_id))
+	    if self.circuits[s.circ_id].slowness_counter >= slowness_limit and not self.circuits[s.circ_id].closed:
+	      plog("DEBUG", "Slow-max (" + str(slowness_limit) + ") is reached --> closing circuit " + str(s.circ_id))
+	      self.circuits[s.circ_id].closed = True
+	      self.c.close_circuit(s.circ_id)
+
 	# Resort every time ??
 	self.refresh_sorted_list()
 	# Close the stream
@@ -673,21 +679,17 @@
     else: 
       self.handle_other_events(s)
 
-######################################### END: PingHandler         #####################
 ######################################### BEGIN: Pinger            #####################
 
 class Pinger(threading.Thread):
   """ Separate thread that triggers the Socks4-connections for pings """
-  def __init__(self, ping_handler, router=None, partial=False):
-    self.handler = ping_handler					# The EventHandler
-    self.router = router					# This router is us
-    self.measure_partial = partial				# Flag to switch off partial measurings
-    if self.measure_partial:
-      # Create the model for recording link-RTTs
+  def __init__(self, ping_handler, router=None):
+    self.handler = ping_handler		# The PingHandler
+    self.router = router		# This router object represents us
+    if measure_partial_circs:
+      # Create the model for recording link-RTTs only if wished
       self.model = NetworkModel(self.router)
-    # The Pinger-object
-    #self.pinger = Pinger(ping_dummy_host, ping_dummy_port)
-    # Call constructor of superclass
+    # Call thread-constructor
     threading.Thread.__init__(self)
   
   def run(self):
@@ -698,38 +700,31 @@
 
   def do_work(self):
     """ Do the work """
-    # Measure RTTs of circuits
-    self.measure()
-    # self.handler.schedule_low_prio(lambda x: x.measure())
-    # Compute link-RTTs
-    if self.measure_partial:
-      self.compute_link_RTTs()
-    # Print circuits for info
-    self.handler.schedule_low_prio(lambda x: x.print_circuits())
-
-  def measure(self):
-    """ Measure RTTs of all circuits """
-    # XXX: Schedule in the EventHandler or leave out queueing!
-    circs_lock.acquire()
-    circs = self.handler.circuits.values()
-    circs_lock.release()
-    for c in circs:
-      if c.built:
-        # Get id & length of c
-      	id = c.circ_id
-	if self.measure_partial:
-	  path_len = len(c.path)
-	  for i in xrange(1, path_len):
-	    # Put in the queue: (circ, hop)
-	    self.handler.ping_queue.put((id, i))
-	    self.ping()
-	# And for the whole circuit ...
-	self.handler.ping_queue.put((id, None))
+    # Measure RTTs of complete circuits
+    if measure_circs:
+      # Schedule to enqueue all built circs
+      def notlambda(h): h.enqueue_pings() 
+      self.handler.schedule_immediate(notlambda)
+      # Get number of _all_ circs (>= no. of circs built)
+      i = len(self.handler.circuits)
+      for x in xrange(1, i+1):
+        # Maybe run in parallel (separate threads)?
+	plog("DEBUG", "Triggered ping " + str(x) + "/" + str(i))
 	self.ping()
+      
+      # Compute link-RTTs
+      if measure_partial_circs:
+        self.compute_link_RTTs()
+        # Check if we have m proposals with rtt <= n
+        if self.model.check_proposals(10, 0.5):
+          plog("INFO", "We now have enough proposals!")
+    
+    # Print circuits for logging
+    self.handler.schedule_immediate(lambda x: x.print_circuits())
 
-  # Hmm, there is no "try .. except .. finally .." in Python < 2.5 !!  
+  # No "try .. except .. finally .." in Python < 2.5 !
   def ping(self):
-    """ This creates a connection to dummy_host/_port using Socks4 """
+    """ Create a connection to dummy_host/_port using Socks4 """
     s = None
     try:
       try:
@@ -742,15 +737,13 @@
 	pass
     finally:
       # Close the socket if open
-      if s:
-        s.close()
+      if s: s.close()
 
   def compute_link_RTTs(self):
-    """ Get a copy(?) of the circs an check if we can compute links """    
-    # XXX: Get the circuits
-    circs_lock.acquire()
+    """ Get a copy of the circs and check if we can compute links 
+        and store these in the model """    
+    # TODO: Refactor: move to PingHandler or get a _copy_ of the circuits
     circs = self.handler.circuits.values()
-    circs_lock.release()
     for c in circs:
       # Get the length
       path_len = len(c.path)
@@ -786,26 +779,29 @@
 
 ######################################### END: Pinger              #####################
 
-# Return a connection to Tor's control port
 def connect(control_host, control_port):
-  # Create a socket and connect to Tor
+  """ Return a connection to Tor's control port """
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   sock.connect((control_host, control_port))
   return Connection(sock)
-  
-# Do the configuration
-def configure(conn):
-  global my_ip, my_country
-  # Get our own IP and country
+ 
+def setup_location(conn):
+  """ Setup a router object representing this proxy """
+  plog("INFO","Setting up our location")
   try:
+    # Get our IP from Tor and set up a router object
     info = conn.get_info("address")
-    my_ip = info["address"]
-    my_country = GeoIPSupport.geoip.country_code_by_addr(my_ip)
-    #my_country = GeoIPSupport.get_country_from_record(my_ip)
-    plog("INFO", "Our IP address is " + str(my_ip) + " [" + my_country + "]")
+    ip = info["address"]
+    router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,ip,None,None))
+    #country = GeoIPSupport.geoip.country_code_by_addr(my_ip)
+    #country = GeoIPSupport.get_country_from_record(my_ip)
+    plog("INFO", "Our IP address is " + router.get_ip_dotted() + " [" + router.country_code + "]")
+    return router
   except: 
-    plog("INFO", "Could not get our IP")
-  # Set events to listen to
+    plog("ERROR", "Could not get our IP")
+ 
+def configure(conn):
+  """ Set events and options """
   conn.set_events([TorCtl.EVENT_TYPE.STREAM,
       TorCtl.EVENT_TYPE.CIRC,
       TorCtl.EVENT_TYPE.NS,	  
@@ -819,12 +815,12 @@
   conn = connect(control_host, control_port)
   #conn.debug(file("control.log", "w"))
   conn.authenticate()
+  # Setup a router instance here
+  router = setup_location(conn)
   # Configure myself  
   configure(conn)
-  # Setup a router instance here
-  router = GeoIPSupport.GeoIPRouter(TorCtl.Router(None,"ROOT",None,False,None,None,my_ip,None,None))	
   # Set Handler to the connection
-  handler = PingHandler(conn, __selmgr, router, measure_partial_circs)
+  handler = PingHandler(conn, __selmgr, router)
   conn.set_event_handler(handler)
   # Go to sleep to be able to get killed from the commandline
   try:
@@ -833,8 +829,8 @@
   except KeyboardInterrupt:
     cleanup(conn)
 
-# Call this on exit
 def cleanup(conn):
+  """ To be called on exit """
   plog("INFO", "Cleaning up...")
   conn.set_option("__LeaveStreamsUnattached", "0")
   conn.set_option("__DisablePredictedCircuits", "0")



More information about the tor-commits mailing list