[or-cvs] r19644: {torflow} Track slice source timestamp information on a per-node basis (torflow/trunk/NetworkScanners/BwAuthority)

mikeperry at seul.org
Sun Jun 7 02:29:01 UTC 2009


Author: mikeperry
Date: 2009-06-06 22:29:01 -0400 (Sat, 06 Jun 2009)
New Revision: 19644

Modified:
   torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
   torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh
Log:

Track slice source timestamp information on a per-node basis
instead of per file. We were missing nodes that switched
slices during scans.
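A minimal standalone sketch of the per-node bookkeeping this change introduces
(class and field names mirror the diff below; the snippet is illustrative, not
the committed code):

  # Illustrative only: each Node keeps one NodeData per slice file, and a
  # newer timestamp for the same slice file replaces the older data, so a
  # node that moves between slices is no longer dropped.
  class NodeData:
    def __init__(self, timestamp):
      self.strm_bw = []
      self.filt_bw = []
      self.ns_bw = []
      self.timestamp = timestamp

  class Node:
    def __init__(self):
      self.node_data = {}  # slice file -> NodeData

    def add_line(self, line):
      nd = self.node_data.get(line.slice_file)
      if nd is None or nd.timestamp < line.timestamp:
        nd = self.node_data[line.slice_file] = NodeData(line.timestamp)
      nd.strm_bw.append(line.strm_bw)
      nd.filt_bw.append(line.filt_bw)
      nd.ns_bw.append(line.ns_bw)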



Modified: torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/aggregate.py	2009-06-07 00:24:07 UTC (rev 19643)
+++ torflow/trunk/NetworkScanners/BwAuthority/aggregate.py	2009-06-07 02:29:01 UTC (rev 19644)
@@ -9,7 +9,8 @@
 from TorCtl.TorUtil import plog
 from TorCtl import TorCtl,TorUtil
 
-bw_files = {}
+bw_files = []
+timestamps = {}
 nodes = {}
 prev_consensus = {}
 ALPHA = 0.3333 # Prev consensus values count for 1/3 of the avg 
@@ -29,29 +30,51 @@
       min_item = i
   return min_item
 
+class NodeData:
+  def __init__(self, timestamp):
+    self.strm_bw = []
+    self.filt_bw = []
+    self.ns_bw = []
+    self.timestamp = timestamp
+
 class Node:
   def __init__(self):
+    self.node_data = {}
     self.idhex = None
     self.nick = None
-    self.strm_bw = []
-    self.filt_bw = []
-    self.ns_bw = []
     self.chosen_sbw = None
     self.chosen_fbw = None
     self.sbw_ratio = None
     self.fbw_ratio = None
     self.ratio = None
     self.new_bw = None
+    self.strm_bw = []
+    self.filt_bw = []
+    self.ns_bw = []
 
   def add_line(self, line):
     if self.idhex and self.idhex != line.idhex:
       raise Exception("Line mismatch")
     self.idhex = line.idhex
     self.nick = line.nick
-    self.strm_bw.append(line.strm_bw)     
-    self.filt_bw.append(line.filt_bw)     
-    self.ns_bw.append(line.ns_bw)     
+    if line.slice_file not in self.node_data \
+      or self.node_data[line.slice_file].timestamp < line.timestamp:
+      self.node_data[line.slice_file] = NodeData(line.timestamp)
 
+    nd = self.node_data[line.slice_file]
+    nd.strm_bw.append(line.strm_bw)
+    nd.filt_bw.append(line.filt_bw)
+    nd.ns_bw.append(line.ns_bw)
+
+    self.strm_bw = []
+    self.filt_bw = []
+    self.ns_bw = []
+
+    for nd in self.node_data.itervalues():
+      self.strm_bw.extend(nd.strm_bw)
+      self.filt_bw.extend(nd.filt_bw)
+      self.ns_bw.extend(nd.ns_bw)
+
   def avg_strm_bw(self):
     return sum(self.strm_bw)/float(len(self.strm_bw))
 
@@ -72,12 +95,14 @@
     return self.chosen_fbw
 
 class Line:
-  def __init__(self, line):
+  def __init__(self, line, slice_file, timestamp):
     self.idhex = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(1)
     self.nick = re.search("[\s]*nick=([\S]+)[\s]*", line).group(1)
     self.strm_bw = int(re.search("[\s]*strm_bw=([\S]+)[\s]*", line).group(1))
     self.filt_bw = int(re.search("[\s]*filt_bw=([\S]+)[\s]*", line).group(1))
     self.ns_bw = int(re.search("[\s]*ns_bw=([\S]+)[\s]*", line).group(1))
+    self.slice_file = slice_file
+    self.timestamp = timestamp
 
 def main(argv):
   TorUtil.read_config(argv[1]+"/scanner.1/bwauthority.cfg")
@@ -85,44 +110,59 @@
   s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   s.connect((TorUtil.control_host,TorUtil.control_port))
   c = TorCtl.Connection(s)
+  c.debug(file(argv[1]+"/aggregate-control.log", "w", buffering=0))
   c.authenticate_cookie(file(argv[1]+"/scanner.1/tor-data/control_auth_cookie",
-                              "r"))
+                         "r"))
+
   ns_list = c.get_network_status()
+  ns_list.sort(lambda x, y: x.bandwidth < y.bandwidth)
+  got_ns_bw = False
   for n in ns_list:
     if n.bandwidth == None:
-      plog("ERROR", "Your Tor is not providing NS w bandwidths!")
-      sys.exit(0)
+      plog("NOTICE", "Your Tor is not providing NS w bandwidths for "+n.idhex)
+    else:
+      got_ns_bw = True
+    n.measured = False
     prev_consensus["$"+n.idhex] = n
 
+  if not got_ns_bw:
+    # Sometimes the consensus lacks a descriptor. In that case,
+    # it will skip outputting bandwidths for those routers.
+    plog("ERROR", "Your Tor is not providing NS w bandwidths!")
+    sys.exit(0)
+
   for da in argv[1:-1]:
     # First, create a list of the most recent files in the
     # scan dirs that are recent enough
     for root, dirs, f in os.walk(da):
       for ds in dirs:
-        print ds
         if re.match("^scanner.[\d+]$", ds):
-          print ds
           for sr, sd, files in os.walk(da+"/"+ds+"/scan-data"):
             for f in files:
               if re.search("^bws-[\S]+-done-", f):
-                print sr+"/"+f
                 found_done = True
                 fp = file(sr+"/"+f, "r")
                 slicenum = sr+"/"+fp.readline()
                 timestamp = float(fp.readline())
                 fp.close()
-                if slicenum not in bw_files \
-                       or bw_files[slicenum][0] < timestamp:
-                  bw_files[slicenum] = (timestamp, sr+"/"+f)
-          
-  
-  for (t,f) in bw_files.itervalues():
+                bw_files.append((slicenum, timestamp, sr+"/"+f))
+                if slicenum not in timestamps or \
+                     timestamps[slicenum] < timestamp:
+                  timestamps[slicenum] = timestamp
+
+  # FIXME: Hrmm.. there may be edge cases here where we have
+  # an extra slice number that is never scanned again (ie the network
+  # shrinks). This will leave us with really old timestamps.
+  oldest_timestamp = min(timestamps.itervalues())
+
+  # Need to only use most recent slice-file for each node..
+  for (s,t,f) in bw_files:
     fp = file(f, "r")
     fp.readline() # slicenum
     fp.readline() # timestamp
     for l in fp.readlines():
       try:
-        line = Line(l)
+        line = Line(l,s,t)
         if line.idhex not in nodes:
           n = Node()
           nodes[line.idhex] = n
@@ -139,12 +179,12 @@
                   float(len(nodes))
 
   for n in nodes.itervalues():
-    n.choose_strm_bw(pre_strm_avg) 
+    n.choose_strm_bw(pre_strm_avg)
     n.choose_filt_bw(pre_filt_avg)
 
-  true_strm_avg = sum(map(lambda n: n.strm_bw[n.chosen_sbw], 
+  true_strm_avg = sum(map(lambda n: n.strm_bw[n.chosen_sbw],
                        nodes.itervalues()))/float(len(nodes))
-  true_filt_avg = sum(map(lambda n: n.filt_bw[n.chosen_fbw], 
+  true_filt_avg = sum(map(lambda n: n.filt_bw[n.chosen_fbw],
                        nodes.itervalues()))/float(len(nodes))
 
   for n in nodes.itervalues():
@@ -156,13 +196,17 @@
     else:
       n.ratio = n.fbw_ratio
       n.new_bw = n.ns_bw[n.chosen_fbw]*n.ratio
-    if n.idhex in prev_consensus:
+    if n.idhex in prev_consensus and prev_consensus[n.idhex].bandwidth != None:
+      prev_consensus[n.idhex].measured = True
       n.new_bw = ((prev_consensus[n.idhex].bandwidth*ALPHA + n.new_bw)/(ALPHA + 1))/1024.0
 
+  for n in prev_consensus.itervalues():
+    if not n.measured:
+      plog("INFO", "Didn't measure "+n.idhex+"="+n.nickname)
+
   n_print = nodes.values()
   n_print.sort(lambda x,y: int(x.new_bw) - int(y.new_bw))
 
-  oldest_timestamp = min(map(lambda (t,f): t, bw_files.itervalues()))
   out = file(argv[-1], "w")
   out.write(str(int(round(oldest_timestamp,0)))+"\n")
   for n in n_print:
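
For context on the unchanged smoothing step in the hunk above: the previous
consensus value enters the average with weight ALPHA (0.3333) relative to the
new measurement. A rough worked example with assumed numbers, not values from
a real scan:

  ALPHA = 0.3333
  prev_bw = 600 * 1024   # prev_consensus[idhex].bandwidth (assumed value/units)
  scan_bw = 900 * 1024   # ns_bw[chosen] * ratio from this scan (assumed)

  new_bw = ((prev_bw * ALPHA + scan_bw) / (ALPHA + 1)) / 1024.0
  # -> (600*0.3333 + 900) / 1.3333 ~= 825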

Modified: torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh	2009-06-07 00:24:07 UTC (rev 19643)
+++ torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh	2009-06-07 02:29:01 UTC (rev 19644)
@@ -7,7 +7,7 @@
 #      git branch --track rs-format-fix mikeperry/rs-format-fix
 #      git checkout rs-format-fix
 TOR_EXE=../../../tor.git/src/or/tor
-#PYTHONPATH=../../../SQLAlchemy-0.5.4p2/lib
+PYTHONPATH=../../../SQLAlchemy-0.5.4p2/lib
 
 for i in data/scanner.*
 do


