[tor-commits] [torflow/master] Make way for fix for #1984.

mikeperry at torproject.org mikeperry at torproject.org
Thu Nov 17 23:50:09 UTC 2011


commit 5331580a114442db79a7af3fd767107e98b4969a
Author: Mike Perry <mikeperry-git at fscked.org>
Date:   Mon Oct 31 20:07:45 2011 -0700

    Make way for fix for #1984.
    
    Clean out some annoying garbage from a previous filtering method.
---
 NetworkScanners/BwAuthority/aggregate.py |  181 +++++++----------------------
 1 files changed, 44 insertions(+), 137 deletions(-)

diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py
index 8658d33..0e20c5b 100755
--- a/NetworkScanners/BwAuthority/aggregate.py
+++ b/NetworkScanners/BwAuthority/aggregate.py
@@ -69,34 +69,11 @@ def base10_round(bw_val):
       return 1
     return ret
 
-
-
-def closest_to_one(ratio_list):
-  min_dist = 0x7fffffff
-  min_item = -1
-  for i in xrange(len(ratio_list)):
-    if abs(1.0-ratio_list[i]) < min_dist:
-      min_dist = abs(1.0-ratio_list[i])
-      min_item = i
-  return min_item
-
-class NodeData:
-  def __init__(self, timestamp):
-    self.strm_bw = []
-    self.filt_bw = []
-    self.ns_bw = []
-    self.desc_bw = []
-    self.timestamp = timestamp
-
 class Node:
   def __init__(self):
-    self.node_data = {}
     self.ignore = False
     self.idhex = None
     self.nick = None
-    self.chosen_time = None
-    self.chosen_sbw = None
-    self.chosen_fbw = None
     self.sbw_ratio = None
     self.fbw_ratio = None
     self.pid_bw = 0
@@ -108,17 +85,20 @@ class Node:
     self.ratio = None
     self.new_bw = None
     self.change = None
-    self.bw_idx = 0
-    self.strm_bw = []
-    self.filt_bw = []
-    self.ns_bw = []
-    self.desc_bw = []
-    self.timestamps = []
+
+    # measurement vars from bwauth lines
+    self.measured_at = 0
+    self.strm_bw = 0
+    self.filt_bw = 0
+    self.ns_bw = 0
+    self.desc_bw = 0
+    self.circ_fail_rate = 0
+    self.strm_fail_rate = 0
 
   def revert_to_vote(self, vote):
     self.new_bw = vote.bw
     self.pid_error = vote.pid_error
-    self.chosen_time = vote.measured_at
+    self.measured_at = vote.measured_at
 
   # Derivative of error for pid control
   def get_pid_bw(self, prev_vote, kp):
@@ -128,10 +108,10 @@ class Node:
     # This is non-standard
     self.pid_error_sum = prev_vote.pid_error_sum*(1 - 1.0/T_i) + self.pid_error
 
-    self.pid_bw = self.ns_bw[self.bw_idx] \
-             + kp*(self.ns_bw[self.bw_idx]*self.pid_error \
-             +     self.ns_bw[self.bw_idx]*self.integral_error()/T_i \
-             +     self.ns_bw[self.bw_idx]*self.d_error_dt()*T_d)
+    self.pid_bw = self.ns_bw \
+             + kp*(self.ns_bw*self.pid_error \
+             +     self.ns_bw*self.integral_error()/T_i \
+             +     self.ns_bw*self.d_error_dt()*T_d)
     return self.pid_bw
 
   # Time-weighted sum of error per unit of time (measurement sample)
@@ -153,74 +133,14 @@ class Node:
       raise Exception("Line mismatch")
     self.idhex = line.idhex
     self.nick = line.nick
-    if line.slice_file not in self.node_data \
-      or self.node_data[line.slice_file].timestamp < line.timestamp:
-      self.node_data[line.slice_file] = NodeData(line.timestamp)
-
-    # TODO: This is kinda nutty. Can we simplify? For instance,
-    # do these really need to be lists inside the nd?
-    nd = self.node_data[line.slice_file]
-    nd.strm_bw.append(line.strm_bw)
-    nd.filt_bw.append(line.filt_bw)
-    nd.ns_bw.append(line.ns_bw)
-    nd.desc_bw.append(line.desc_bw)
-
-    self.strm_bw = []
-    self.filt_bw = []
-    self.ns_bw = []
-    self.desc_bw = []
-    self.timestamps = []
-
-    for nd in self.node_data.itervalues():
-      self.strm_bw.extend(nd.strm_bw)
-      self.filt_bw.extend(nd.filt_bw)
-      self.ns_bw.extend(nd.ns_bw)
-      self.desc_bw.extend(nd.desc_bw)
-      for i in xrange(len(nd.ns_bw)):
-        self.timestamps.append(nd.timestamp)
-
-  def avg_strm_bw(self):
-    return sum(self.strm_bw)/float(len(self.strm_bw))
-
-  def avg_filt_bw(self):
-    return sum(self.filt_bw)/float(len(self.filt_bw))
-
-  def avg_ns_bw(self):
-    return sum(self.ns_bw)/float(len(self.ns_bw))
-
-  def avg_desc_bw(self):
-    return sum(self.desc_bw)/float(len(self.desc_bw))
-
-  # This can be bad for bootstrapping or highly bw-variant nodes... 
-  # we will choose an old measurement in that case.. We need
-  # to build some kind of time-bias here..
-  def _choose_strm_bw_one(self, net_avg):
-    i = closest_to_one(map(lambda f: f/net_avg, self.strm_bw))
-    self.chosen_sbw = i
-    return self.chosen_sbw
-
-  def _choose_filt_bw_one(self, net_avg):
-    i = closest_to_one(map(lambda f: f/net_avg, self.filt_bw))
-    self.chosen_fbw = i
-    return self.chosen_fbw
-
-  # Simply return the most recent one instead of this
-  # closest-to-one stuff
-  def choose_filt_bw(self, net_avg):
-    max_idx = 0
-    for i in xrange(len(self.timestamps)):
-      if self.timestamps[i] > self.timestamps[max_idx]:
-        max_idx = i
-    self.chosen_fbw = max_idx
-    return self.chosen_fbw
-
-  def choose_strm_bw(self, net_avg):
-    max_idx = 0
-    for i in xrange(len(self.timestamps)):
-      if self.timestamps[i] > self.timestamps[max_idx]:
-        max_idx = i
-    self.chosen_sbw = max_idx
-    return self.chosen_sbw
+    if line.measured_at > self.measured_at:
+      self.measured_at = line.measured_at
+      self.strm_bw = line.strm_bw
+      self.filt_bw = line.filt_bw
+      self.ns_bw = line.ns_bw
+      self.desc_bw = line.desc_bw
+      self.circ_fail_rate = line.circ_fail_rate
+      self.strm_fail_rate = line.strm_fail_rate
 
 class Line:
   def __init__(self, line, slice_file, timestamp):
@@ -231,7 +151,13 @@ class Line:
     self.ns_bw = int(re.search("[\s]*ns_bw=([\S]+)[\s]*", line).group(1))
     self.desc_bw = int(re.search("[\s]*desc_bw=([\S]+)[\s]*", line).group(1))
     self.slice_file = slice_file
-    self.timestamp = timestamp
+    self.measured_at = timestamp
+    try:
+      self.circ_fail_rate = float(re.search("[\s]*circ_fail_rate=([\S]+)[\s]*", line).group(1))
+      self.strm_fail_rate = float(re.search("[\s]*strm_fail_rate=([\S]+)[\s]*", line).group(1))
+    except:
+      self.circ_fail_rate = 0
+      self.strm_fail_rate = 0
 
 class Vote:
   def __init__(self, line):
@@ -391,25 +317,10 @@ def main(argv):
   if len(nodes) == 0:
     plog("NOTICE", "No scan results yet.")
     sys.exit(1)
- 
-  pre_strm_avg = sum(map(lambda n: n.avg_strm_bw(), nodes.itervalues()))/ \
-                  float(len(nodes))
-  pre_filt_avg = sum(map(lambda n: n.avg_filt_bw(), nodes.itervalues()))/ \
-                  float(len(nodes))
-
-  plog("DEBUG", "Network pre_strm_avg: "+str(pre_strm_avg))
-  plog("DEBUG", "Network pre_filt_avg: "+str(pre_filt_avg))
 
-  for n in nodes.itervalues():
-    n.choose_strm_bw(pre_strm_avg)
-    n.choose_filt_bw(pre_filt_avg)
-    plog("DEBUG", "Node "+n.nick+" chose sbw: "+\
-                str(n.strm_bw[n.chosen_sbw])+" fbw: "+\
-                str(n.filt_bw[n.chosen_fbw]))
-
-  true_strm_avg = sum(map(lambda n: n.strm_bw[n.chosen_sbw],
+  true_strm_avg = sum(map(lambda n: n.strm_bw,
                        nodes.itervalues()))/float(len(nodes))
-  true_filt_avg = sum(map(lambda n: n.filt_bw[n.chosen_fbw],
+  true_filt_avg = sum(map(lambda n: n.filt_bw,
                        nodes.itervalues()))/float(len(nodes))
 
   plog("DEBUG", "Network true_strm_avg: "+str(true_strm_avg))
@@ -427,11 +338,11 @@ def main(argv):
       if n.idhex in prev_votes.vote_map and n.idhex in prev_consensus:
         if "Guard" in prev_consensus[n.idhex].flags:
           guard_cnt += 1
-          guard_measure_time += (n.timestamps[n.chosen_fbw] - \
+          guard_measure_time += (n.measured_at - \
                                   prev_votes.vote_map[n.idhex].measured_at)
         else:
           node_cnt += 1
-          node_measure_time += (n.timestamps[n.chosen_fbw] - \
+          node_measure_time += (n.measured_at - \
                                   prev_votes.vote_map[n.idhex].measured_at)
 
   # There is a difference between measure period and sample rate.
@@ -443,25 +354,21 @@ def main(argv):
 
   tot_net_bw = 0
   for n in nodes.itervalues():
-    n.fbw_ratio = n.filt_bw[n.chosen_fbw]/true_filt_avg
-    n.sbw_ratio = n.strm_bw[n.chosen_sbw]/true_strm_avg
+    n.fbw_ratio = n.filt_bw/true_filt_avg
+    n.sbw_ratio = n.strm_bw/true_strm_avg
     if n.sbw_ratio > n.fbw_ratio:
       # Does this ever happen?
       plog("NOTICE", "sbw > fbw for "+n.nick)
       n.ratio = n.sbw_ratio
-      n.bw_idx = n.chosen_sbw
-      n.pid_error = (n.strm_bw[n.chosen_sbw] - true_strm_avg)/true_strm_avg
+      n.pid_error = (n.strm_bw - true_strm_avg)/true_strm_avg
     else:
       n.ratio = n.fbw_ratio
-      n.bw_idx = n.chosen_fbw
-      n.pid_error = (n.filt_bw[n.chosen_fbw] - true_filt_avg)/true_filt_avg
-
-    n.chosen_time = n.timestamps[n.bw_idx]
+      n.pid_error = (n.filt_bw - true_filt_avg)/true_filt_avg
 
     if cs_junk.bwauth_pid_control:
       if n.idhex in prev_votes.vote_map:
         # If there is a new sample, let's use it for all but guards
-        if n.chosen_time > prev_votes.vote_map[n.idhex].measured_at:
+        if n.measured_at > prev_votes.vote_map[n.idhex].measured_at:
           # Nodes with the Guard flag will respond slowly to feedback,
           # so they should be sampled less often, and in proportion to
           # the appropriate Wgx weight.
@@ -470,7 +377,7 @@ def main(argv):
              and "Exit" not in prev_consensus[n.idhex].flags):
             # Do full feedback if our previous vote > 2.5 weeks old
             if n.idhex not in prev_votes.vote_map or \
-                n.chosen_time - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE:
+                n.measured_at - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE:
               n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p)
             else:
               pid_error = n.pid_error
@@ -497,7 +404,7 @@ def main(argv):
           # Reset values. Don't vote/sample this measurement round.
           n.revert_to_vote(prev_votes.vote_map[n.idhex])
       else: # No prev vote, pure consensus feedback this round
-        n.new_bw = n.ns_bw[n.bw_idx] + K_p*n.ns_bw[n.bw_idx]*n.pid_error
+        n.new_bw = n.ns_bw + K_p*n.ns_bw*n.pid_error
         n.pid_error_sum = n.pid_error
         n.pid_bw = n.new_bw
         plog("INFO", "No prev vote for node "+n.nick+": Consensus feedback")
@@ -505,9 +412,9 @@ def main(argv):
       n.pid_bw = 0
       n.pid_error = 0
       n.pid_error_sum = 0
-      n.new_bw = n.desc_bw[n.bw_idx]*n.ratio
+      n.new_bw = n.desc_bw*n.ratio
 
-    n.change = n.new_bw - n.desc_bw[n.bw_idx]
+    n.change = n.new_bw - n.desc_bw
 
     if n.idhex in prev_consensus:
       if prev_consensus[n.idhex].bandwidth != None:
@@ -535,7 +442,7 @@ def main(argv):
       n.pid_error_sum = 0 # Don't let unused error accumulate...
 
   # WTF is going on here?
-  oldest_timestamp = min(map(lambda n: n.chosen_time,
+  oldest_timestamp = min(map(lambda n: n.measured_at,
              filter(lambda n: n.idhex in prev_consensus,
                        nodes.itervalues())))
   plog("INFO", "Oldest measured node: "+time.ctime(oldest_timestamp))
@@ -577,7 +484,7 @@ def main(argv):
   # FIXME: Split out debugging data
   for n in n_print:
     if not n.ignore:
-      out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(n.pid_bw)+" pid_delta="+str(n.derror_dt)+"\n")
+      out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.measured_at))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(n.pid_bw)+" pid_delta="+str(n.derror_dt)+"\n")
   out.close()
 
 if __name__ == "__main__":





More information about the tor-commits mailing list