[tor-commits] [torflow/master] Move more parameters to consensus.

mikeperry at torproject.org mikeperry at torproject.org
Thu Nov 17 23:50:09 UTC 2011


commit 13aa3b4268a98a07e76e8ba9cb8cfccac39ff2c0
Author: Mike Perry <mikeperry-git at fscked.org>
Date:   Sun Nov 13 17:45:01 2011 -0800

    Move more parameters to consensus.
---
 NetworkScanners/BwAuthority/aggregate.py |  103 +++++++++++++++++++++++-------
 1 files changed, 79 insertions(+), 24 deletions(-)

diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py
index 1746256..9bed1d0 100755
--- a/NetworkScanners/BwAuthority/aggregate.py
+++ b/NetworkScanners/BwAuthority/aggregate.py
@@ -23,22 +23,25 @@ IGNORE_GUARDS = 0
 # rate for guard nodes
 GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks
 
-# PID constants
-# See https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form
+# PID constant defaults. May be overridden by consensus
+# https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form
 K_p = 1.0
 
 # We expect to correct steady state error in 5 samples (guess)
 T_i = 5.0
 
+# T_i_decay is a weight factor governing how fast integral sums decay.
+# Each round the sum is multiplied by (1 - T_i_decay/T_i), so for the values
+# of T_i that we care about, roughly a T_i_decay fraction of the integral sum
+# is eliminated after T_i sample rounds. Non-standard; done to avoid overflow.
+T_i_decay = 0.5
+
 # We can only expect to predict less than one sample into the future, as
 # after 1 sample, clients will have migrated
-# FIXME: Our prediction ability is a function of the consensus time
+# FIXME: Our prediction ability is a function of the consensus uptake time
 # vs measurement rate
 T_d = 0.5
 
-K_i = K_p/T_i
-K_d = K_p*T_d
-
 NODE_CAP = 0.05
 
 MIN_REPORT = 60 # Percent of the network we must measure before reporting
@@ -94,6 +97,7 @@ class Node:
     self.circ_fail_rate = 0
     self.strm_fail_rate = 0
 
+  # FIXME: Need to set pid_error_sum. It is getting lost when we don't vote.
   def revert_to_vote(self, vote):
     self.new_bw = vote.bw*1000
     self.pid_bw = vote.pid_bw
@@ -101,17 +105,17 @@ class Node:
     self.measured_at = vote.measured_at
 
   # Derivative of error for pid control
-  def get_pid_bw(self, prev_vote, kp):
+  def get_pid_bw(self, prev_vote, kp, ki, kd, kidecay):
     self.prev_error = prev_vote.pid_error
     self.prev_measured_at = prev_vote.measured_at
-    # We decay the interval by 1/T_i each round to keep it bounded.
-    # This is non-standard
-    self.pid_error_sum = prev_vote.pid_error_sum*(1 - 1.0/T_i) + self.pid_error
+    # We decay the integral sum each round to keep it bounded.
+    # This decay is non-standard. We do it to avoid overflow.
+    self.pid_error_sum = prev_vote.pid_error_sum*kidecay + self.pid_error
 
     self.pid_bw = self.ns_bw \
-             + kp*(self.ns_bw*self.pid_error \
-             +     self.ns_bw*self.integral_error()/T_i \
-             +     self.ns_bw*self.d_error_dt()*T_d)
+                             + kp*self.ns_bw*self.pid_error \
+                             + ki*self.ns_bw*self.integral_error() \
+                             + kd*self.ns_bw*self.d_error_dt()
     return self.pid_bw
 
   # Time-weighted sum of error per unit of time (measurement sample)
@@ -139,8 +143,7 @@ class Node:
       self.filt_bw = line.filt_bw
       self.ns_bw = line.ns_bw
       self.desc_bw = line.desc_bw
-      # XXX: Temporary test
-      self.circ_fail_rate = 0.0 #line.circ_fail_rate
+      self.circ_fail_rate = line.circ_fail_rate
       self.strm_fail_rate = line.strm_fail_rate
 
 class Line:
@@ -197,16 +200,54 @@ class ConsensusJunk:
   def __init__(self, c):
     cs_bytes = c.sendAndRecv("GETINFO dir/status-vote/current/consensus\r\n")[0][2]
     self.bwauth_pid_control = False
+    self.use_circ_fails = False
+    self.use_best_ratio = False
+
+    self.K_p = K_p
+    self.T_i = T_i
+    self.T_d = T_d
+    self.T_i_decay = T_i_decay
+
     try:
       cs_params = re.search("^params ((?:[\S]+=[\d]+[\s]?)+)",
                                      cs_bytes, re.M).group(1).split()
       for p in cs_params:
         if p == "bwauthpid=1":
           self.bwauth_pid_control = True
+        elif p == "bwauthcircs=1":
+          self.use_circ_fails = True
+          plog("INFO", "Counting circuit failures")
+        elif p == "bwauthbestratio=1":
+          self.use_best_ratio = True
+          plog("INFO", "Choosing larger of sbw vs fbw")
+        elif p.startswith("bwauthkp="):
+          self.K_p = int(p.split("=")[1])/10000.0
+          plog("INFO", "Got K_p=%f from consensus." % self.K_p)
+        elif p.startswith("bwauthti="):
+          self.T_i = (int(p.split("=")[1])/10000.0)
+          plog("INFO", "Got T_i=%f from consensus." % self.T_i)
+        elif p.startswith("bwauthtd="):
+          self.T_d = (int(p.split("=")[1])/10000.0)
+          plog("INFO", "Got T_d=%f from consensus." % self.T_d)
+        elif p.startswith("bwauthtidecay="):
+          self.T_i_decay = (int(p.split("=")[1])/10000.0)
+          plog("INFO", "Got T_i_decay=%f from consensus." % self.T_i_decay)
     except:
       plog("NOTICE", "Bw auth PID control disabled due to parse error.")
       traceback.print_exc()
 
+    if self.T_i == 0:
+      self.K_i = 0
+      self.K_i_decay = 0
+    else:
+      self.K_i = self.K_p/self.T_i
+      self.K_i_decay = (1.0-self.T_i_decay/self.T_i)
+
+    self.K_d = self.K_p*self.T_d
+
+    plog("INFO", "Got K_p=%f K_i=%f K_d=%f K_i_decay=%f" %
+                  (self.K_p, self.K_i, self.K_d, self.K_i_decay))
+
     self.bw_weights = {}
     try:
       bw_weights = re.search("^bandwidth-weights ((?:[\S]+=[\d]+[\s]?)+)",
@@ -355,6 +396,11 @@ def main(argv):
     plog("NOTICE", "No scan results yet.")
     sys.exit(1)
 
+  if not cs_junk.use_circ_fails:
+    plog("INFO", "Ignoring circuit failures")
+    for n in nodes.itervalues():
+      n.circ_fail_rate = 0.0
+
   if cs_junk.bwauth_pid_control:
     # Penalize nodes for circuit failure: it indicates CPU pressure
     # TODO: Potentially penalize for stream failure, if we run into
@@ -406,10 +452,7 @@ def main(argv):
 
     if cs_junk.bwauth_pid_control:
       # Penalize nodes for circ failure rate
-      # n.pid_error = (n.filt_bw*(1.0-n.circ_fail_rate) - true_filt_avg)/true_filt_avg
-
-      # FIXME: For now, let's try this larger-ratio thing..
-      if n.sbw_ratio > n.fbw_ratio:
+      if cs_junk.use_best_ratio and n.sbw_ratio > n.fbw_ratio:
         n.pid_error = (n.strm_bw*(1.0-n.circ_fail_rate) - true_strm_avg)/true_strm_avg
       else:
         n.pid_error = (n.filt_bw*(1.0-n.circ_fail_rate) - true_filt_avg)/true_filt_avg
@@ -426,9 +469,14 @@ def main(argv):
             # Do full feedback if our previous vote > 2.5 weeks old
             if n.idhex not in prev_votes.vote_map or \
                 n.measured_at - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE:
-              n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p)
+              n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex],
+                                      cs_junk.K_p,
+                                      cs_junk.K_i,
+                                      cs_junk.K_d,
+                                      cs_junk.K_i_decay)
             else:
               pid_error = n.pid_error
+              # FIXME: We possibly lose the pid_error_sum here
               n.revert_to_vote(prev_votes.vote_map[n.idhex])
               # Don't use feedback here, but we might as well use our
               # new measurement against the previous vote.
@@ -436,11 +484,11 @@ def main(argv):
                 # This should no longer happen
                 plog("NOTICE", "Zero bw for Guard node "+n.nick+"="+n.idhex)
                 n.new_bw = prev_votes.vote_map[n.idhex].bw + \
-                       K_p*prev_votes.vote_map[n.idhex].bw*pid_error
+                       cs_junk.K_p*prev_votes.vote_map[n.idhex].bw*pid_error
                 n.pid_bw = n.new_bw
               else:
                 n.new_bw = prev_votes.vote_map[n.idhex].pid_bw + \
-                       K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error
+                       cs_junk.K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error
           else:
             # Everyone else should be pretty instantenous to respond.
             # Full feedback should be fine for them (we hope),
@@ -453,9 +501,16 @@ def main(argv):
               ("Guard" in prev_consensus[n.idhex].flags \
                and "Exit" in prev_consensus[n.idhex].flags):
               n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex],
-                                      K_p*(1.0-cs_junk.bw_weights["Wgd"]))
+                              cs_junk.K_p*(1.0-cs_junk.bw_weights["Wgd"]),
+                              cs_junk.K_i,
+                              cs_junk.K_d,
+                              cs_junk.K_i_decay)
             else:
-              n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p)
+              n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex],
+                              cs_junk.K_p,
+                              cs_junk.K_i,
+                              cs_junk.K_d,
+                              cs_junk.K_i_decay)
         else:
           # Reset values. Don't vote/sample this measurement round.
           n.revert_to_vote(prev_votes.vote_map[n.idhex])





More information about the tor-commits mailing list