[or-cvs] r12279: Latest revision of op-addon.py containing additional methods (in torflow/trunk: . TorCtl)

renner at seul.org renner at seul.org
Tue Oct 30 10:41:28 UTC 2007


Author: renner
Date: 2007-10-30 06:41:28 -0400 (Tue, 30 Oct 2007)
New Revision: 12279

Added:
   torflow/trunk/stream-server.pl
Modified:
   torflow/trunk/TorCtl/PathSupport.py
   torflow/trunk/bw-informer.py
   torflow/trunk/op-addon.py
   torflow/trunk/pathrc.example
Log:

  Latest revision of op-addon.py containing additional methods that were used
  to perform evaluations (involving stream-server.pl) + new version of 
  bw-informer.py



Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py	2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/TorCtl/PathSupport.py	2007-10-30 10:41:28 UTC (rev 12279)
@@ -456,7 +456,9 @@
             self.total_exit_bw += r.bw
 
       bw_per_hop = (1.0*self.total_bw)/self.pathlen
-      ratio = self.total_exit_bw/float(self.total_bw)
+      if self.total_bw > 0:
+        ratio = self.total_exit_bw/float(self.total_bw)
+      else: ratio = 0
       plog("DEBUG", "E = " + str(self.total_exit_bw) +
          ", T = " + str(self.total_bw) +
          ", ratio = " + str(ratio) +
@@ -467,7 +469,9 @@
         # add a ConserveExitsRestriction?
         self.weight = 0
       else:
-        self.weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
+        if self.total_exit_bw > 0:
+          self.weight = ((self.total_exit_bw-bw_per_hop)/self.total_exit_bw)
+        else: self.weight = 0
     plog("DEBUG", "The exit-weight is: " + str(self.weight))
 
   def next_r(self):

Modified: torflow/trunk/bw-informer.py
===================================================================
--- torflow/trunk/bw-informer.py	2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/bw-informer.py	2007-10-30 10:41:28 UTC (rev 12279)
@@ -1,65 +1,71 @@
 #!/usr/bin/python
+
 """
   RWTH Aachen University, Informatik IV
   Copyright (C) 2007 Johannes Renner 
   Contact: renner at i4.informatik.rwth-aachen.de
 """
-# Addon for Onion Routers (prototype-v0.0-alpha):
-# Provides bandwidth-data about single TLS-connections as well as 
-# total bandwidth-data about the router for requesting clients. 
-# This software works in a passive way: It does _not_ control the 
-# Tor process, e.g. close connections, _only_ records traffic.
 
-# TODO: Howto make the document be served by Tor via http?
-# TODO: Check nicknames for uniqueness
+# Addon for onion routers:
+# Shall provide information about available bandwidth on single 
+# TLS-connections as well as globally available bandwidth for 
+# requesting clients in an anonymity-preserving way (?).
 
+# TODO: Make the document be served by Tor via HTTP
+
+import re
 import sys
 import sched
 import time
 import socket
 import atexit
 import threading
+import traceback
 
 from TorCtl import *
 from TorCtl.TorUtil import *
 
-# Move these to config file:
+# Set the version here
+VERSION = "0.0-alpha"
+
+# Move these to a config file:
 # Tor host and port
 control_host = "127.0.0.1"
 control_port = 9051
 # Listen host and port
-listen_host = "127.0.0.1"
+listen_host = "137.226.12.177"
 listen_port = 9053
 
 # Duration of single measuring interval (seconds)
-interval = 30
-# No of inactive rounds to decrease max until 
-# we set it to zero, this leads to 1 hour
-inactive_limit = 3600/interval
+interval = 20
+
 # Alpha for computing new max values, let max
 # decrease slowly if no traffic or not topped
-alpha = 0.01
-# Minimum 'available' bandwidth (bytes/sec) 
+alpha = .9999
+# Minimum 'available' bandwidth (byte/sec) 
 # to show up on the document
-available_min = 10000
+available_min = 0
 
 # Global variable marks the start of an interval
 start = time.time()
-# variable that contains the status-document
+# Overall start time
+total_start = time.time()
+
+# Variable that contains the status-document
 bw_status = "no status document available yet :(\r\n"
 
 # Dictionary that contains all stats 
 stats = {}
 stats_lock = threading.Lock()
+# Dicts that contain mappings
+key_to_name = {}
+name_to_key = {}
 
-#key_to_name = {}
-#name_to_key = {}
-
-# We use one class for recording global stats and link stats
+# We use the same class for recording global stats and link stats
 class LinkBandwidthStats(TorCtl.Router):
   def __init__(self, r=None):
     if r:
-      self.__dict__ = r.dict
+      self.__dict__ = r.__dict__
     else:
       self.down = 0
     # Total counters
@@ -68,15 +74,14 @@
     self.tot_ncircs = 0
     self.tot_read = 0
     self.tot_written = 0
+    self.tot_bytes = 0		# total read + written
     # Interval stats
     self.int_read = 0		# count bytes read & written ..
     self.int_written = 0	# in the last interval
     self.int_bytes = 0 		# sum of both, gets set on update()
-    self.avg_throughput = 0.0	# avg throughput for the last interval 
+    self.curr_throughput = 0.0	# avg throughput for the last interval 
     self.max_throughput = 0.0	# throughput max-value
     self.available = 0.0	# max - avg
-    self.inactive_count = 0	# counter for inactive rounds
-    self.inactive = False	# inactive flag
 
   def read(self, bytes_read):
     self.tot_read += bytes_read
@@ -95,50 +100,35 @@
   # Most important method here
   def update(self, elapsed):
     # Compute the interval-bytes read+written
-    self.int_bytes = self.int_read + self.int_written    
-    # If nothing read or written this round
-    if self.int_bytes == 0:
-      # Increase counter
-      self.inactive_count += 1
-      if self.inactive_count >= inactive_limit:
-        # Limit reached: set max to 0 to get this deleted from stats
-	plog("DEBUG", "Inactive limit reached --> setting max to 0: " + self.nickname)
-        self.max_throughput = 0
-        self.inactive = True
-	# Not needed since inactive --> del
-	#reset_interval_counters()
-	return
-    else:
-      # We have read or written something
-      self.inactive_count = 0
+    self.int_bytes = self.int_read + self.int_written
+    # Compute total bytes
+    self.tot_bytes = self.tot_read + self.tot_written
     # Compute avg interval throughput
-    self.avg_throughput = self.int_bytes/elapsed        
+    self.curr_throughput = self.int_bytes/elapsed        
 
     # Max handling ..
-    if self.avg_throughput > self.max_throughput:
+    if self.curr_throughput > self.max_throughput:
       # We have a new max!
-      self.max_throughput = self.avg_throughput
-      plog("DEBUG", self.nickname + " reached new max: " + str(self.max_throughput) + " bytes/sec")
+      self.max_throughput = self.curr_throughput
+      plog("DEBUG", self.nickname + " reached new max: " + 
+         str(self.max_throughput) + " byte/sec")
     else:
       # Saving old max for debugging only
-      #old_max = self.max_throughput    
+      old_max = self.max_throughput    
       # Decrease the max-value using alpha-formula 
-      self.max_throughput = max(self.avg_throughput, (self.max_throughput*(1-alpha) + self.avg_throughput*alpha))	
-      #plog("DEBUG", self.nickname + ": max decreased from " + str(old_max) + " to " + str(self.max_throughput))
+      self.max_throughput = max(self.curr_throughput, (self.max_throughput*alpha + self.curr_throughput*(1-alpha)))	
+      #plog("DEBUG", self.nickname + ": max decreased from " 
+      #   + str(old_max) + " to " + str(self.max_throughput))
 
-    # Also set inactive if nothing read/written and max decreased to zero
-    if self.int_bytes == 0 and self.max_throughput == 0:
-      self.inactive = True
     # Compute the difference as 'available'
-    # TODO: Do it in the clients, or deliver ONLY this value??
-    self.available = self.max_throughput - self.avg_throughput
+    # TODO: Add the frac part from the approaches 
+    self.available = self.max_throughput - self.curr_throughput
     # Reset the counters
     self.reset_interval_counters()
 
 # Special instance of LinkBandwidthStats for recording of bw-events
 global_stats = LinkBandwidthStats()
-# TODO: Get my hostname/nickname?
-global_stats.nickname = "This Router"
+global_stats.nickname = "Global stats"
 
 # We need an EventHandler
 # extend from TorCtl.EventHandler
@@ -155,24 +145,24 @@
     if event.written: global_stats.written(event.written)
 
   # Method to handle ORCONN-events
-  def or_conn_status_event(self, o):    
-    # XXX: Count all routers as one?
+  def or_conn_status_event(self, o): 
+    # Count all clients as one:
     # If o.endpoint is an idhash
-    #if re.search(r"^\$", o.endpoint):
-      #if o.endpoint not in key_to_name:
-        #o.endpoint = "AllClients:HASH"
-      #else: o.endpoint = key_to_name[o.endpoint]
+    if re.search(r"^\$", o.endpoint):
+      if o.endpoint not in key_to_name:
+        o.endpoint = "AllClients:HASH"
+      else: o.endpoint = key_to_name[o.endpoint]
     # If it is no idhash and not in name_to_key
-    #elif o.endpoint not in name_to_key:
-      #plog("DEBUG", "IP? " + o.endpoint)
-      #o.endpoint = "AllClients:IP"
+    elif o.endpoint not in name_to_key:
+      plog("DEBUG", "IP? " + o.endpoint)
+      o.endpoint = "AllClients:IP"
 
     # If NEW, LAUNCHED or CONNECTED
     if o.status == "NEW" or o.status == "LAUNCHED" or o.status == "CONNECTED":
       plog("NOTICE", "Connection to " + o.endpoint + " is now " + o.status)
 
     # If status is READ or WRITE
-    if o.status == "READ" or o.status == "WRITE":
+    elif o.status == "READ" or o.status == "WRITE":
       #plog("DEBUG", o.endpoint + ", read: " + str(o.read_bytes) + " wrote: " + str(o.wrote_bytes))      
       stats_lock.acquire()
       # If not in stats: add!
@@ -181,14 +171,12 @@
         stats[o.endpoint].nickname = o.endpoint
         plog("NOTICE", "+ Added " + o.endpoint + " to the stats")
       # Add number of bytes to total and interval
-      if o.read_bytes:
-        stats[o.endpoint].read(o.read_bytes)
-      if o.wrote_bytes:
-        stats[o.endpoint].written(o.wrote_bytes)
+      if o.read_bytes: stats[o.endpoint].read(o.read_bytes)
+      if o.wrote_bytes: stats[o.endpoint].written(o.wrote_bytes)
       stats_lock.release()
       
     # If CLOSED or FAILED  
-    if o.status == "CLOSED" or o.status == "FAILED": 
+    elif o.status == "CLOSED" or o.status == "FAILED": 
       # Don't record reasons!
       stats_lock.acquire()      
       if o.endpoint not in stats:
@@ -206,21 +194,28 @@
       #if o.read_bytes: stats[o.endpoint].tot_read += o.read_bytes
       #if o.wrote_bytes: stats[o.endpoint].tot_wrote += o.wrote_bytes      
       stats_lock.release()
-    else: return
 
-    # This is only for constructing debug output
-    if o.age: age = "AGE="+str(o.age)
-    else: age = ""
-    if o.read_bytes: read = "READ="+str(o.read_bytes)
-    else: read = ""
-    if o.wrote_bytes: wrote = "WRITTEN="+str(o.wrote_bytes)
-    else: wrote = ""
-    if o.reason: reason = "REASON="+o.reason
-    else: reason = ""
-    if o.ncircs: ncircs = "NCIRCS="+str(o.ncircs)
-    else: ncircs = ""
-    plog("DEBUG", " ".join((o.event_name, o.endpoint, o.status, age, read, wrote, reason, ncircs)))
+      # This is only for constructing debug output
+      if o.age: age = "AGE="+str(o.age)
+      else: age = ""
+      if o.read_bytes: read = "READ="+str(o.read_bytes)
+      else: read = ""
+      if o.wrote_bytes: wrote = "WRITTEN="+str(o.wrote_bytes)
+      else: wrote = ""
+      if o.reason: reason = "REASON="+o.reason
+      else: reason = ""
+      if o.ncircs: ncircs = "NCIRCS="+str(o.ncircs)
+      else: ncircs = ""
+      plog("DEBUG", " ".join((o.event_name, o.endpoint, o.status, age, read, wrote, reason, ncircs)))
 
+  # NS-EventHandler methods
+  def ns_event(self, n):
+    read_routers(self.c, n.nslist)
+ 
+  def new_desc_event(self, d):
+    for i in d.idlist: # Is this too slow?
+      read_routers(self.c, self.c.get_network_status("id/"+i))
+
 # Sort a list by a specified key
 def sort_list(list, key):
   list.sort(lambda x,y: cmp(key(y), key(x))) # Python < 2.4 hack
@@ -231,6 +226,31 @@
   f.write(bw_status)
   f.close()
 
+# Read the routers
+def read_routers(c, nslist):
+  global key_to_name, name_to_key
+  bad_key = 0
+  stats_lock.acquire()
+  for ns in nslist:
+    try:
+      key_to_name[ns.idhex] = ns.nickname
+      name_to_key[ns.nickname] = ns.idhex
+      r = LinkBandwidthStats(c.get_router(ns))
+      if ns.nickname in stats:
+        if stats[ns.nickname].idhex != r.idhex:
+          plog("NOTICE", "Router "+r.nickname+" has multiple keys: "
+             +stats[ns.nickname].idhex+" and "+r.idhex)
+      stats[r.nickname] = r # XXX: We get names only from ORCONN :(
+    except TorCtl.ErrorReply:
+      bad_key += 1
+      if "Running" in ns.flags:
+        plog("INFO", "Running router "+ns.nickname+"="+ns.idhex+" has no descriptor")
+      pass
+    except:
+      traceback.print_exception(*sys.exc_info())
+      continue
+  stats_lock.release()
+
 # Update stats and reset every router's counters
 # (Requires stats_lock.acquire())
 def update_stats(elapsed):
@@ -241,15 +261,12 @@
   for l in links:
     # Update & reset stats
     l.update(elapsed)
-    # If inactive --> delete
-    if l.inactive:
-      del stats[l.nickname]
-      plog("NOTICE", "- No traffic on link to " + l.nickname + " --> deleted from stats")  
 
 # Create the new status document
 # (Requires stats_lock.acquire())
-# TODO: Compress the data:
+# TODO: Somehow compress the data:
 #  - if available==max --> only deliver max?
+#  - only deliver available?
 #  - leave out links with available==0 ? 
 #      - No, avail==0 means new max, but not nothing available!
 #  - clustering/classification?
@@ -258,21 +275,20 @@
   # Fill in global_stats
   new_status += str(global_stats.available) + " "
   new_status += str(global_stats.max_throughput) + " "
-  new_status += str(global_stats.avg_throughput) + "\r\n"  
-  new_status += "--------------------\r\n"
-  # TODO: Better sort for available or max?
+  new_status += str(global_stats.curr_throughput) + "\r\n"    
+  # Sort the document for available
   key = lambda x: x.available
-  links_sorted = sort_list(stats.values(), key)
+  links_sorted = sort_list(stats.values(), key)  
   for l in links_sorted:
     # Cutoff at available_min
-    if key(l) >= available_min:
-      new_status += l.nickname + " " + str(key(l)) + " "
-      new_status += str(l.max_throughput) + " " + str(l.avg_throughput) + "\r\n"
+    if key(l) >= available_min and l.nickname != "AllClients:HASH":
+      new_status += l.nickname + " " + str(key(l)) + " "      
+      new_status += str(l.max_throughput) + " " + str(l.curr_throughput) + "\r\n"      
   # Critical: Exchange global bw_status document
   global bw_status
   bw_status = new_status
 
-# This is the method where the main work gets done
+# This is the method where the main work is done
 # Schedule the call every 'interval' seconds
 def do_work(s):
   global start
@@ -289,8 +305,8 @@
   # Release lock
   stats_lock.release()  
   
-  # Write to file, TODO: Write to Tor-dir, find out!
-  write_file(file("./data/bw-document", "w"))
+  # Write to file, TODO: Write to Tor-dir: data/status/
+  write_file(file("./data/bw-informer/bw-document", "w"))
   # Some debugging
   plog("INFO", "Created new document for the last interval (" + str(elapsed) + ") seconds\n") # + bw_status)  
   # Reschedule
@@ -298,12 +314,14 @@
   s.enter(interval, 1, do_work, (s,))
 
 # Run a scheduler that does work every interval
-def start_sched():
-  #global key_to_name, name_to_key
-  #nslist = c.get_network_status()
-  #read_routers(c, nslist)  
+def start_sched(c):
+  # Ge the network status
+  nslist = c.get_network_status()
+  read_routers(c, nslist)  
+  # Setup scheduler
   s = sched.scheduler(time.time, time.sleep)
   start = time.time()
+  total_start = time.time()
   s.enter(interval, 1, do_work, (s,))
   try:
     s.run()
@@ -338,7 +356,7 @@
 
 # Main function
 def main(argv):
-  plog("INFO", "This is bandwidth-informer v0.0-alpha")
+  plog("INFO", "bw-informer v" + VERSION)
   # Create connection to Tor
   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   s.connect((control_host, control_port))
@@ -348,19 +366,22 @@
   c.set_event_handler(LinkHandler(c))
   # Close connection on exit
   atexit.register(cleanup, *(c,))
-  # Start the thread
+  # Start the connection thread
   c.launch_thread()
   c.authenticate()
-  # Only listen to ORCONN
-  c.set_events([TorCtl.EVENT_TYPE.ORCONN, TorCtl.EVENT_TYPE.BW], True)
+  # Listen to some events
+  c.set_events([TorCtl.EVENT_TYPE.ORCONN, 
+                TorCtl.EVENT_TYPE.BW, 
+                TorCtl.EVENT_TYPE.NS, 
+                TorCtl.EVENT_TYPE.NEWDESC], True)
   # TODO: Set extra-info for descriptor here
   # Start server thread
   thr = threading.Thread(None, lambda: start_server())
-  thr.setName("Server")
+  thr.setName("BW-Server")
   thr.setDaemon(1)
   thr.start()
-  # Start the monitor here
-  start_sched()
+  # Start the actual monitor here
+  start_sched(c)
 
 # Program entry point
 if __name__ == '__main__':

Modified: torflow/trunk/op-addon.py
===================================================================
--- torflow/trunk/op-addon.py	2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/op-addon.py	2007-10-30 10:41:28 UTC (rev 12279)
@@ -24,15 +24,15 @@
 ## CONFIGURATION ##############################################################
 
 # Set the version
-VERSION = "0.0.01-alpha"
-# Path to data-directory
+VERSION = "0.0.10"
+# Path to the data directory
 DATADIR = "data/op-addon/"
 # Our IP-address
 IP = None
 # Simulation modus
 SIMULATE = False
 
-# Try to get the config-file from the commandline
+# Try to get the config-file from the commandline first
 if len(sys.argv) == 1:
   CONFIG_FILE = "pathrc.example"
 elif len(sys.argv) == 2:
@@ -60,16 +60,16 @@
   plog("ERROR", "Config file '" + CONFIG_FILE + "' does not exist, exiting.")
   sys.exit(0)
   
-# Configuration sections
+# Different configuration sections
 HOST_PORT = "HOST_PORT"
 CIRC_MANAGEMENT = "CIRC_MANAGEMENT"
 NODE_SELECTION = "NODE_SELECTION"
 GEOIP = "GEOIP"
-TESTING = "TESTING"
+EVALUATE = "EVALUATE"
 RTT = "RTT"
 MODEL = "MODEL"
 
-# Measure the circuits
+# Measure RTTs of circuits
 ping_circs = config.getboolean(RTT, "ping_circs")
 network_model = False
 if ping_circs:
@@ -86,12 +86,12 @@
   # Close a circ after n timeouts
   timeout_limit = config.getint(RTT, "timeout_limit")
   
-  # Set to True if we want to measure partial circuits
-  # This also enables circuit creation from the model
+  # Set to True to measure RTTs of partial circuits,
+  # also enables circuit creation from the model
   network_model = config.getboolean(MODEL, "network_model")
   if network_model:
     import networkx
-    # RTT-threshhold when creating circs from the model
+    # RTT-threshold when creating circs from the model
     max_rtt = config.getfloat(MODEL, "max_rtt")    
     # Minimum number of proposals to choose from
     min_proposals = config.getint(MODEL, "min_proposals")
@@ -102,10 +102,11 @@
   # Testing mode: Collect latencies of circuits and links in the 
   # network. Close circuits after num_xx_tests measures and involve 
   # a FileHandler to write data to a file
-  TESTING_MODE = config.getboolean(TESTING, "testing_mode")
-  if TESTING_MODE:
-    num_rtt_tests = config.getint(TESTING, "num_rtt_tests")
-    num_records = config.getint(TESTING, "num_records")
+  EVAL_MODE = config.getboolean(EVALUATE, "evaluate")
+  if EVAL_MODE:
+    num_rtt_tests = config.getint(EVALUATE, "num_rtt_tests")
+    num_bw_tests = config.getint(EVALUATE, "num_bw_tests")
+    num_records = config.getint(EVALUATE, "num_records")
 
 def get_geoip_config():
   """ Read the geoip-configuration from the config-file """
@@ -161,7 +162,7 @@
 
   def build_circuit_from_path(self, path):
     """ Build circuit using a given path (= router-objects), 
-        used to build circs from NetworkModel """
+        used to build circuits from a NetworkModel """
     circ = Circuit()
     circ.path = path
     circ.exit = path[len(path)-1]
@@ -258,6 +259,7 @@
   def get_line_count(self):
     self.filehandle = open(self.filename)
     lines = self.filehandle.readlines()
+    # Close handle?
     return len(lines)
 
 ## Circuit & Stream ###########################################################
@@ -267,30 +269,30 @@
   def __init__(self):
     PathSupport.Circuit.__init__(self)
     # RTT stuff
-    self.part_rtts = {}		# Dict of partial rtts, pathlen 3: 1-2-None
-    self.current_rtt = None	# Double (sec): current value
-    self.stats = Stats()	# Stats about total RTT contains history
+    self.part_rtts = {}		# Dict of partial RTTs, pathlen 3: 1-2-None
+    self.current_rtt = None	# Double (sec): current ranking of this circ
+    self.stats = Stats()	# Stats about total RTT, contains the history
     # Counters and flags
     self.age = 0		# Age in rounds
     self.timeout_counter = 0	# Timeout limit
-    self.rtt_created = False	# Created from the model    
+    self.rtt_created = False	# Created from the model
     # XXX: BW stuff
     self.bw = 0
     self.bw_tested = False
       
   def add_rtt(self, rtt):
     """ Add a new value and refresh stats and current """
-    # Set current
+    # Set current circuit-ranking
     if self.current_rtt == None:
       self.current_rtt = rtt
     else:
-      # Weight the current value with the last
+      # Weight the current value with the previous
       self.current_rtt = (self.current_rtt * 0.5) + (rtt * 0.5)
       plog("DEBUG", "Computing new current RTT from " + str(rtt) + " to " + 
          str(self.current_rtt))
-    # Add new RTT to the stats
+    # Add a new RTT to the stats
     self.stats.add_value(rtt)
-    # Increase age
+    # Increase the age
     self.age += 1
 
   def to_string(self):
@@ -316,7 +318,7 @@
 
 ## NetworkModel ###############################################################
 
-class LinkInfo:
+class TorLink:
   """ This class contains infos about a link: source, destination, RTT
       plus: rtt_history, methods to compute stats, etc. """
   def __init__(self, src, dest, rtt=0):
@@ -330,7 +332,8 @@
 
   def add_rtt(self, rtt):
     # Compute new current value from the last
-    if self.current_rtt == None: self.current_rtt = rtt
+    if self.current_rtt == None: 
+      self.current_rtt = rtt
     else: 
       self.current_rtt = (self.current_rtt * 0.5) + (rtt * 0.5)
       plog("DEBUG", "Computing new current RTT from " + str(rtt) + " to " + 
@@ -339,23 +342,23 @@
 class PathProposal:
   """ Instances of this class are path-proposals found in the model """
   def __init__(self, links, path):
-    # This is a list of LinkInfo objects
+    # This is a list of TorLink objects
     self.links = links
-    # Cut off ROOT here
+    # Cut off the ROOT here
     self.path = path[1:len(path)]
     # Compute the expected RTT
     self.rtt = reduce(lambda x,y: x + y.current_rtt, self.links, 0.0)
     self.rtt_score = 0          # RTT score
     self.bw_score = 0           # BW score
-    self.min_bw = 0             # Minimum BW of routers in self.path
-    self.ranking_index = None   # Index computed from BW and RTT
+    self.min_bw = 0             # Minimum bw of routers in path
+    self.ranking_index = None   # Index computed from bw and RTT
 
   def to_string(self):
     """ Create a string for printing out information """
     s = ""
     for l in self.links:
       s += str(l.src) + "--" + l.dest + " (" + str(l.current_rtt) + ") " + ", "
-    return s + "--> " + str(self.rtt) + " sec"
+    return s + "--> " + str(self.rtt) + " sec" 
 
 class NetworkModel:  
   """ This class is used to record measured RTTs of single links in a model 
@@ -375,7 +378,7 @@
       self.up_to_date = False
     except:
       plog("INFO", "Could not load a model, creating a new one ..")
-      self.graph = networkx.XGraph(name="Explored Tor Subnet")
+      self.graph = networkx.XGraph(name="Tor Subnet")
       self.graph.add_node(None)
       self.up_to_date = True
     self.print_info()
@@ -385,18 +388,18 @@
     """ Write the graph to a binary file """
     start = time.time()
     networkx.write_gpickle(self.graph, self.pickle_path)
-    plog("INFO", "Saved network-model to '" + self.pickle_path +
+    plog("INFO", "Stored Tor-graph to '" + self.pickle_path +
        "' in " + str(time.time()-start) + " sec")
 
   def load_graph(self):
     """ Load a graph from a binary file and return it """
     graph = networkx.read_gpickle(self.pickle_path)    
-    plog("INFO", "Loaded graph from '" + self.pickle_path + "'")
+    plog("INFO", "Loaded Tor-graph from '" + self.pickle_path + "'")
     return graph
    
   def add_link(self, src, dest, rtt):
-    """ Add link to the graph given src, dest (router-ids) & RTT (LinkInfo) """
-    self.graph.add_edge(src, dest, LinkInfo(src, dest, rtt))
+    """ Add link to the graph given src, dest (router-ids) & RTT (TorLink) """
+    self.graph.add_edge(src, dest, TorLink(src, dest, rtt))
  
   def add_circuit(self, c):
     """ Check if we can compute RTTs of single links for a circuit 
@@ -615,7 +618,7 @@
     self.circ_stats = CircuitBuildingStats()    # record setup-durations
     self.stats_logger = FileHandler(DATADIR + "circ-setup-stats")
     self.setup_logger = None # FileHandler(DATADIR + "circ-setup-durations")
-    if TESTING_MODE:
+    if EVAL_MODE:
       self.testing_logger = FileHandler(DATADIR + "circ-data")
       self.bw_queue = Queue.Queue()     # circ_ids to bw-test
     # Queue containing circs to be tested
@@ -671,7 +674,19 @@
     # TODO: Check if there are any circs, else set 'frequency' to 10?
     circs = self.circuits.values()
     for c in circs:
-      self.enqueue_circ(c)
+      # XXX: First test BW, then enqueue for RTTs
+      if EVAL_MODE and num_bw_tests > 0:
+        if self.model:
+          if c.rtt_created and c.bw_tested:
+            self.enqueue_circ(c)
+          elif not c.rtt_created:
+            self.enqueue_circ(c)
+        elif not c.bw_tested:
+          pass
+        else:
+          self.enqueue_circ(c)
+      else:
+        self.enqueue_circ(c)
 
   def enqueue_circ(self, c):
     """ Enqueue a circuit for measuring RTT """
@@ -743,8 +758,8 @@
       plog("DEBUG", "Added RTT to history: " + 
          str(self.circuits[s.circ_id].stats.values))	  
       
-      # TESTING_MODE: close if num_rtt_tests is reached  
-      if TESTING_MODE:
+      # EVAL_MODE: close if num_rtt_tests is reached  
+      if EVAL_MODE:
         if self.circuits[s.circ_id].age == num_rtt_tests:
           plog("DEBUG", "Closing circ " + str(s.circ_id) + 
              ": num_rtt_tests is reached")
@@ -763,10 +778,69 @@
         # Add the links of this circuit to the model
         self.model.add_circuit(self.circuits[s.circ_id])
 
+  def handle_bw_test(self, s):
+    """ Handle special streams to measure the bandwidth of circs """
+    output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), 
+       s.target_host, str(s.target_port)]
+    if s.reason: output.append("REASON=" + s.reason)
+    if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
+    plog("DEBUG", " ".join(output))
+    # NEW
+    if s.status == "NEW":
+      stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
+      self.streams[s.strm_id] = stream 
+      # Set next circ_id to stream
+      stream.circ = self.bw_queue.get()
+      try:
+        if stream.circ in self.circuits:
+          circ = self.circuits[stream.circ]
+          if circ.built and not circ.closed:
+	    self.c.attach_stream(stream.strm_id, circ.circ_id)               
+          else:
+            plog("WARN", "Circuit not built or closed")
+            self.close_stream(s.strm_id, 5)
+        else:
+          # Go to next test if circuit is gone or we get an ErrorReply
+          plog("WARN", "Circuit " + str(circ_id) + 
+             " does not exist anymore --> closing stream")
+          # Close stream, XXX: Reason?
+          self.close_stream(s.strm_id, 5)
+      except TorCtl.ErrorReply, e:
+        plog("WARN", "Error attaching stream " + str(stream.strm_id) + 
+           " :" + str(e.args))
+        self.close_stream(s.strm_id, 5)
+    # SUCCEEDED
+    if s.status == "SUCCEEDED":
+      self.streams[s.strm_id].attached_at = s.arrived_at      
+    # DONE
+    if s.status == "CLOSED" and s.reason == "DONE":
+      stream = self.streams[s.strm_id]
+      # Since bytes are counted from events, use the timestamp 
+      # of the last stream_bw event for computing the lifespan
+      #lifespan = stream.lifespan(s.arrived_at)
+      lifespan = stream.lifespan(stream.bw_timestamp)
+      plog("INFO", "Lifespan is " + str(lifespan))
+      # Compute bandwidth
+      total_bytes = stream.bytes_read + stream.bytes_written
+      plog("DEBUG", "Total number of bytes (read+written) is " + str(total_bytes))
+      bw = total_bytes/float(lifespan)
+      plog("INFO", "Got bandwidth: " + str(bw))
+      self.circuits[s.circ_id].bw = bw
+      self.circuits[s.circ_id].bw_tested = True
+    # DETACHED reason EXITPOLICY
+    if s.status == "DETACHED":
+      if s.remote_reason in ["EXITPOLICY","TIMEOUT"]:
+        # Close circuit and stream
+        self.close_stream(s.strm_id, 5)
+        self.close_circuit(s.circ_id)
+
   def stream_status_event(self, s):
     """ Separate pings from regular streams directly """
     if not (s.target_host == ping_dummy_host and 
        s.target_port == ping_dummy_port):
+      # XXX: Catch bandwidth-streams
+      if s.target_host == IP and s.target_port == 8041:
+        return self.handle_bw_test(s)      
 
       # TODO: Handle echelon here?
       # - perform DNS request (or use REMAP?)
@@ -774,8 +848,9 @@
       # - check if there is already a circuit with exit node
       #   in destination country
       
-      # This is no ping, call the other method
-      return PathSupport.StreamHandler.stream_status_event(self, s)
+      # This is NO test: call the underlying method to attach
+      else:
+        return PathSupport.StreamHandler.stream_status_event(self, s)
     
     # Construct debugging output
     output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), 
@@ -817,19 +892,19 @@
       # Close the stream
       self.close_stream(s.strm_id, 5)
 
-    # CLOSED + END is also ping, some routers send it when measuring
-    # latency to a single hop, better measure on FAILED?
+    # CLOSED + END is also a ping, some routers send it when 
+    # measuring RTT to a single hop, better measure on FAILED?
     elif s.status == "CLOSED":
       if s.reason == "END":
         # Only record
         self.record_ping(s)
 
   def circ_status_event(self, c):
-    """ Override to record statistics on circuit-setups and -failures """
+    """ Override this to record statistics on circuit-setups and -failures """
     if c.circ_id not in self.circuits:
       return PathSupport.CircuitHandler.circ_status_event(self, c)    
     
-    # Catch FAILED/CLOSED now since circ will be removed
+    # Catch FAILED/CLOSED now: circ will be removed
     elif c.status == "FAILED" or c.status == "CLOSED":
       circ = self.circuits[c.circ_id]
       # Setup a message for logging
@@ -878,7 +953,33 @@
         self.circ_stats.add_value(circ.setup_duration)
         self.stats_logger.write(self.circ_stats.to_string())
       self.refresh_sorted_list()
+      
+      # XXX: Initialize a bw-test here
+      if EVAL_MODE and num_bw_tests > 0:
+        if self.model:
+          # Only test bandwidth on rtt_created circs
+          if circ.rtt_created:
+            self.start_bw_test(c.circ_id)
+	else: self.start_bw_test(c.circ_id)
+
+  def start_bw_test(self, circ_id):
+    """ Perform a bandwidth-test on circuit with given circ_id """
+    plog("INFO", "Starting BW-test on circuit " + str(circ_id))
+    # Enqueue the circuit
+    self.bw_queue.put(circ_id)
+    # Start the stream-thread (512 KB = 524288)
+    bw_tester = BwTester(1000000)
+    bw_tester.setDaemon(True)
+    bw_tester.start()
   
+  def stream_bw_event(self, s):
+    """ Record the timestamp of the last stream_bw event to any stream """
+    if not s.strm_id in self.streams:
+      plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
+    else:
+      self.streams[s.strm_id].bw_timestamp = s.arrived_at
+    PathSupport.PathBuilder.stream_bw_event(self, s)
+  
   def build_circuit(self, host, port):
     """ Override from CircuitHandler to support circuit-creation from model """
     if self.model:
@@ -905,7 +1006,7 @@
     plog("DEBUG", "Current number of proposals is "+
        str(len(self.model.proposals)))
     if len(self.model.proposals) >= min_proposals:
-      # Give weights for single scores
+      # TODO: Set weights for single scores here!
       self.model.update_ranking(1, 0)
       # As long as there are enough
       while len(self.model.proposals) >= min_proposals:
@@ -985,6 +1086,47 @@
       # Close the socket if open
       if s: s.close()
 
+## BW-Tester ##################################################################
+
+class BwTester(threading.Thread):
+  """ Thread that connects to our own IP and downloads a stream """
+  def __init__(self, bytes):
+    self.bytes = bytes                  # Amount of bytes to request
+    threading.Thread.__init__(self)	# Call the thread-constructor
+  
+  def run(self):
+    """ The run()-method """
+    self.run_test()
+  
+  # No "try .. except .. finally .." in Python < 2.5 !
+  def run_test(self):
+    """ Create a connection to stream-server.pl using SOCKS4 """
+    s = None
+    try:
+      try:
+        s = socks.socksocket()
+        s.setproxy(socks.PROXY_TYPE_SOCKS4, socks_host, socks_port)
+        s.connect((IP, 8041))
+        plog("INFO", "Connected to " + IP)
+        # Request bytes
+        s.send(str(self.bytes) + "\n")
+        plog("INFO", "Sent request for " + str(self.bytes) + " bytes")
+        byte_counter = 0
+        while 1:
+          buffer = s.recv(4096)
+          if buffer:
+            #plog("INFO", "Received " + str(len(buffer)) + " bytes")
+            byte_counter += len(buffer)
+            if byte_counter >= self.bytes:
+              plog("INFO", "Received " + str(byte_counter) + " bytes in total")
+              s.send("close\n")
+              break  
+      except socks.Socks4Error, e:
+	print("Got Exception: " + str(e))
+    finally:
+      # Close the socket if open
+      if s: s.close()
+
 ## End of Classes #############################################################
 
 def connect():
@@ -1028,8 +1170,8 @@
      TorCtl.EVENT_TYPE.NS,	  
      TorCtl.EVENT_TYPE.NEWDESC], True)
   # Set options: We attach streams now & build circuits
+  conn.set_option("__DisablePredictedCircuits", "1")
   conn.set_option("__LeaveStreamsUnattached", "1")
-  conn.set_option("__DisablePredictedCircuits", "1")
 
 def startup(argv):
   # Connect to Tor process
@@ -1040,14 +1182,14 @@
   configure(conn)
   # Get the size of the circuit-pool from config
   num_circs = config.getint(CIRC_MANAGEMENT, "idle_circuits")
-  # Set an EventHandler to the connection
+  # Set an EventHandler to the connection 
   if ping_circs:
     if network_model:
       handler = PingHandler(conn, __selmgr, num_circs, 
          GeoIPSupport.GeoIPRouter, True)
     else:
       handler = PingHandler(conn, __selmgr, num_circs, 
-         GeoIPSupport.GeoIPRouter)  
+         GeoIPSupport.GeoIPRouter)
   else:
     # No pings, only a StreamHandler
     handler = PathSupport.StreamHandler(conn, __selmgr, num_circs, 
@@ -1075,7 +1217,7 @@
 
 def simulate(n):
   """ Simulate circuit creations """
-  plog("INFO", "Starting simulation ..")
+  plog("INFO", "Running a simulation ..")
   # Connect to Tor process
   conn = connect()
   setup_location(conn)
@@ -1083,13 +1225,13 @@
   path_list = []
   # Instantiate a PathBuilder
   path_builder = PathSupport.PathBuilder(conn, __selmgr, GeoIPSupport.GeoIPRouter)
-  plog("INFO", "Creating "+str(n)+" paths")
+  plog("INFO", "Generating "+str(n)+" paths")
   if network_model:
     model = NetworkModel(path_builder.routers)
     model.set_target("255.255.255.255", 80, max_rtt)
     model.generate_proposals()
-    # Give weights for single scores (RTT, advertised BW)
-    model.update_ranking(1, 1)
+    # TODO: Set weights for single scores (RTT, advertised BW) here!
+    model.update_ranking(1, 0)
     while n > 0:
       # Probabilistic selection
       choice = model.weighted_selection(lambda x: x.ranking_index)
@@ -1108,7 +1250,7 @@
   sys.exit(1)
 
 def evaluate(path_list):
-  """ Currently evaluates only lists of 3-hop paths """
+  """ Currently evaluates lists of 3-hop paths only """
   import sets
   entries = sets.Set()
   middles = sets.Set()
@@ -1205,7 +1347,7 @@
   return max_entropy
 
 if __name__ == '__main__':
-  plog("INFO", "OP-Addon v" + VERSION)
+  plog("INFO", "Starting OP-Addon v" + VERSION)
   if SIMULATE:
     if len(sys.argv) == 3:
       simulate(10)

Modified: torflow/trunk/pathrc.example
===================================================================
--- torflow/trunk/pathrc.example	2007-10-30 08:59:34 UTC (rev 12278)
+++ torflow/trunk/pathrc.example	2007-10-30 10:41:28 UTC (rev 12279)
@@ -25,12 +25,12 @@
 use_all_exits = yes
 
 # UniformGenerator with optionally ordered exits,
-# (uniform = no) means bandwidth-weighted selection
+# (uniform = no) --> bandwidth-weighted selection
 uniform = no
 order_exits = no
 
 # Make use of guard-nodes (yes|no) or a specific 
-# exit node (nickname or IDHex)
+# exit node (nickname or IDHex) for all paths
 use_guards = yes
 #use_exit = xyz
 
@@ -42,8 +42,16 @@
 
 # yes|no for unique|distinct countries,
 # ! comment to don't care
-#unique_countries = yes
+unique_countries = yes
 
+# Maximum number of continent crossings: 0-n
+# ! comment out to enforce distinct continents
+# ! set >= pathlen to not care about
+continent_crossings = 2
+# Maximum number of ocean crossings: 0-n
+# ! comment out to don't care
+ocean_crossings = 1
+
 # If echelon is set, OP-Addon will try to find an 
 # exit in the destination country of the current 
 # request (exit_country may be used as backup)
@@ -55,30 +63,26 @@
 #middle_country = RU
 #exit_country = US
 
-# Maximum number of continent-crossings: 0-n
-# ! comment out to enforce distinct continents
-# ! set >= pathlen to not care about
-continent_crossings = 2
-# Maximum number of ocean crossings: 0-n
-# ! comment out to don't care
-ocean_crossings = 0
-
 # TODO: excludes = [".."]
 
-[TESTING]
+[EVALUATE]
 
-# Testing mode: Close every circuit after testing
+# Evaluation mode: close every circuit after measuring performance
 # yes|no
-testing_mode = no
+evaluate = no
 
 # Number of latency-tests per circuit (int: 0-n)
-num_rtt_tests = 5
+num_rtt_tests = 3
+# Number of bandwidth-tests per circuit (int:0 or 1)
+# Requires stream-server.pl listening on the same host
+num_bw_tests = 0
+
 # Amount of circuits to test (int)
 num_records = 300
 
 [RTT]
 
-# Measure latencies of complete circuits
+# Ping the latencies of complete circuits
 # yes|no
 ping_circs = yes
 
@@ -86,34 +90,34 @@
 socks_host = 127.0.0.1
 socks_port = 9050
 
-# Host and port dummies to be used 
+# Host- and port-dummies to be used 
 # for ping-connections
 ping_dummy_host = 127.0.0.1
 ping_dummy_port = 100
 
 # Time interval to wait before triggering
-# pings and frequency in seconds (float)
+# pings and frequency of pings in seconds (float)
 initial_interval = 10
 frequency = 5
 
-# Close circ after n timeouts on measurings
-# Set to 0 to not close circs (int)
+# Close a circuit after n timeouts on measurings
+# Set to 0 to never close circs (int)
 timeout_limit = 1
 
 [MODEL]
 
-# Set to True if you want to measure latencies of single
-# links and enable circuit creation from the model
+# Set to 'yes' to measure latencies of single links 
+# and enable circuit creation from the model
 # yes|no
 network_model = no
 
-# RTT-threshhold in seconds when creating circs (float):
-#   0:  no threshhold, choose from all proposals
-max_rtt = 1
-# Minimum number of proposals to choose from (int)
-min_proposals = 100
 # Min ratio of circs created with the backup-method,
 # controls growing of the model (float in [0,1])
 #   0:  no growing
 #   1:  growing only
 min_ratio = 0.5
+# RTT-threshhold in seconds when creating circs (float):
+#   0:  no threshhold, choose from all proposals
+max_rtt = 0
+# Minimum number of proposals to choose from (int)
+min_proposals = 100

Added: torflow/trunk/stream-server.pl
===================================================================
--- torflow/trunk/stream-server.pl	                        (rev 0)
+++ torflow/trunk/stream-server.pl	2007-10-30 10:41:28 UTC (rev 12279)
@@ -0,0 +1,44 @@
+#!/usr/bin/perl -w
+
+use strict;
+use IO::Socket::INET;
+
+# specify the port
+my $port = 8041;
+
+# create the socket
+my $server = IO::Socket::INET->new(Listen=>100, LocalPort=>$port, Proto=>'tcp', Reuse=>'yes');
+
+# set the number of bytes one line contains: 1024 Byte = 1 kB
+my $line_count = 1000000;
+
+# print some startup-information
+print "pid ".$$.": listening on port ".$server->sockport."\n";
+
+# main loop
+while(my $client = $server->accept) {
+	if(fork()) {
+		# parent
+		close($client);
+	} else {
+		# child
+		print "pid ".$$.": accepted connection from ".$client->peerhost."\n";
+		while(my $line = <$client>) {
+			if ($line =~ /(\d+)/) {
+				my $counter = $1;
+				while($counter>0) {
+					my $send = ($counter>$line_count) ? $line_count : $counter;
+					print $client "X" x $send;
+					print $client "\r\n";
+					$counter -= $send;
+				}
+			}
+			elsif ($line =~ m/close/) {
+				print "pid ".$$.": closing connection to ".$client->peerhost."\n";
+				close($client);
+				exit(0);
+			}
+		}
+		close($client);
+	}
+}


Property changes on: torflow/trunk/stream-server.pl
___________________________________________________________________
Name: svn:executable
   + *



More information about the tor-commits mailing list