[or-cvs] r19581: {} Implement aggregator to collect parallelized output from mul (torflow/trunk/NetworkScanners/BwAuthority)

mikeperry at seul.org mikeperry at seul.org
Fri May 29 10:49:49 UTC 2009


Author: mikeperry
Date: 2009-05-29 06:49:48 -0400 (Fri, 29 May 2009)
New Revision: 19581

Added:
   torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
Log:

Implement aggregator to collect parallelized output from 
multiple scanners (section 9 of proposal 161). Alpha
factor not present (may fit better on Tor side).



Added: torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/aggregate.py	                        (rev 0)
+++ torflow/trunk/NetworkScanners/BwAuthority/aggregate.py	2009-05-29 10:49:48 UTC (rev 19581)
@@ -0,0 +1,135 @@
+#!/usr/bin/python
+import os
+import re
+import math
+import sys
+
+
+bw_files = {}
+nodes = {}
+
+def base10_round(bw_val):
+  # This keeps the first 3 decimal digits of the bw value only
+  # to minimize changes for consensus diffs.
+  # Resulting error is +/-0.05%
+  return round(bw_val,-(int(math.log10(bw_val))-2))
+
+def closest_to_one(ratio_list):
+  min_dist = 0x7fffffff
+  min_item = -1
+  for i in xrange(len(ratio_list)):
+    if abs(1.0-ratio_list[i]) < min_dist:
+      min_dist = abs(1.0-ratio_list[i])
+      min_item = i
+  return min_item
+
+class Node:
+  def __init__(self):
+    self.idhex = None
+    self.strm_bw = []
+    self.filt_bw = []
+    self.ns_bw = []
+    self.chosen_sbw = None
+    self.chosen_fbw = None
+    self.sbw_ratio = None
+    self.fbw_ratio = None
+    self.ratio = None
+    self.new_bw = None
+
+  def add_line(self, line):
+    if self.idhex != line.idhex:
+      raise Exception("Line mismatch")
+    self.strm_bw.append(line.strm_bw)     
+    self.filt_bw.append(line.filt_bw)     
+    self.ns_bw.append(line.ns_bw)     
+
+  def avg_strm_bw(self):
+    return sum(self.strm_bw)/float(len(self.strm_bw))
+
+  def avg_filt_bw(self):
+    return sum(self.filt_bw)/float(len(self.filt_bw))
+
+  def avg_ns_bw(self):
+    return sum(self.ns_bw)/float(len(self.ns_bw))
+
+  def choose_strm_bw(self, net_avg):
+    i = closest_to_one(map(lambda f: f/net_avg, self.strm_bw))
+    self.chosen_sbw = i
+    return self.chosen_sbw
+
+  def choose_filt_bw(self, net_avg):
+    i = closest_to_one(map(lambda f: f/net_avg, self.filt_bw))
+    self.chosen_fbw = i
+    return self.chosen_fbw
+
+class Line:
+  def __init__(self, line):
+    self.idhex = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(0)
+    self.strm_bw = int(re.search("[\s]*strm_bw=([\d]+)[\s]*", line).group(0))
+    self.filt_bw = int(re.search("[\s]*filt_bw=([\d]+)[\s]*", line).group(0))
+    self.ns_bw = int(re.search("[\s]*ns_bw=([\d]+)[\s]*", line).group(0))
+
+def main(argv):
+  for d in argv[1:-1]:
+    # First, create a list of the most recent files in the
+    # scan dirs that are recent enough
+    for root, dirs, files in os.walk(d):
+      for f in files:
+        if f.find("-done-"):
+          fp = file(f, "r")
+          ranks = fp.readline()
+          timestamp = float(fp.readline())
+          fp.close()
+          if ranks not in bw_files or bw_files[ranks][0] < timestamp:
+            bw_files[ranks] = (timestamp, f)
+  
+  for (t,f) in bw_files.itervalues():
+    fp = file(f, "r")
+    fp.readline()
+    fp.readline()
+    for l in fp.readlines():
+      line = Line(l)
+      if line.idhex not in nodes:
+        n = Node()
+        nodes[line.idhex] = n
+      else:
+        n = nodes[line.idhex]
+      n.add_line(line)        
+    fp.close()
+   
+  pre_strm_avg = sum(map(lambda n: n.avg_strm_bw(), nodes.itervalues()))/ \
+                  float(len(nodes))
+  pre_filt_avg = sum(map(lambda n: n.avg_filt_bw(), nodes.itervalues()))/ \
+                  float(len(nodes))
+
+  for n in nodes.itervalues():
+    n.choose_strm_bw(pre_strm_avg) 
+    n.choose_filt_bw(pre_filt_avg)
+
+  true_strm_avg = sum(map(lambda n: n.chosen_sbw, nodes.itervalues()))/ \
+                  float(len(nodes))
+  true_filt_avg = sum(map(lambda n: n.chosen_fbw, nodes.itervalues()))/ \
+                  float(len(nodes))
+
+  for n in nodes.itervalues():
+    n.fbw_ratio = n.filt_bw[n.chosen_fbw]/true_filt_avg
+    n.sbw_ratio = n.strm_bw[n.chosen_sbw]/true_strm_avg
+    if closest_to_one((n.sbw_ratio, n.fbw_ratio)) == 0:
+      n.ratio = n.sbw_ratio
+      n.new_bw = n.ns_bw[n.chosen_sbw]*n.ratio
+    else: 
+      n.ratio = n.fbw_ratio
+      n.new_bw = n.ns_bw[n.chosen_fbw]*n.ratio
+
+  n_print = nodes.values()
+  n_print.sort(lambda x,y: x.new_bw < y.new_bw)
+
+  oldest_timestamp = min(map(lambda (t,f): t, bw_files.itervalues()))
+  out = file(argv[-1], "w")
+  out.write(str(int(round(oldest_timestamp,0)))+"\n")
+  for n in n_print:
+    out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+"\n")
+  out.close()
+ 
+if __name__ == "__main__":
+  main(sys.argv)


Property changes on: torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
___________________________________________________________________
Added: svn:executable
   + *



More information about the tor-commits mailing list