[tor-commits] [torflow/master] node sample rate idea.

mikeperry at torproject.org mikeperry at torproject.org
Thu Nov 17 23:50:09 UTC 2011


commit 34afb642f6e1cebf386e1034b213b69b0144d7c7
Author: Mike Perry <mikeperry-git at fscked.org>
Date:   Sat Oct 29 01:09:16 2011 -0700

    node sample rate idea.
    
    About to hit the chopping block.
---
 NetworkScanners/BwAuthority/aggregate.py |  283 ++++++++++++++++++++----------
 1 files changed, 194 insertions(+), 89 deletions(-)

diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py
index 712a9c5..1462f9b 100755
--- a/NetworkScanners/BwAuthority/aggregate.py
+++ b/NetworkScanners/BwAuthority/aggregate.py
@@ -20,38 +20,24 @@ prev_consensus = {}
 # Hack to kill voting on guards while the network rebalances
 IGNORE_GUARDS = 0
 
-# BETA is the parameter that governs the proportion that we use previous
-# consensus values in creating new bandwitdhs versus descriptor values.
-#    1.0 -> all consensus
-#      0 -> all descriptor,
-#     -1 -> compute from %age of nodes upgraded to 0.2.1.17+
-# Note that guard nodes may not quickly change load in the presence of
-# changes to their advertised bandwidths. To address this and other
-# divergent behaviors, we only apply this value to nodes with ratios < 1.0
-#BETA = -1
-BETA = 0
-
-# GUARD_BETA is the version of BETA used for guard nodes. It needs
-# to be computed similarly to BETA, but using 0.2.1.22+ and 0.2.2.7+
-# because guard nodes were weighted uniformly by clients up until then.
-#GUARD_BETA = -1
-GUARD_BETA = 0
-
-# ALPHA is the parameter that controls the amount measurement
-# values count for. It is used to dampen feedback loops to prevent
-# values from changing too rapidly. Thus it only makes sense to
-# have a value below 1.0 if BETA is nonzero.
-# ALPHA is only applied for nodes with ratio < 1 if BETA is nonzero.
-#ALPHA = 0.25
-ALPHA = 0
-
-# There are cases where our updates may not cause node
-# traffic to quickly relocate, such as Guard-only nodes. These nodes
-# get a special ALPHA when BETA is being used:
-# GUARD_ALPHA is only applied for Guard-only nodes with ratio < 1 if BETA is
-# nonzero.
-#GUARD_ALPHA = 0.1
-GUARD_ALPHA = 0
+# The guard measurement period is based on the client turnover
+# rate for guard nodes
+GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks
+
+# PID constants
+# See https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form
+K_p = 1.0
+
+# We expect to correct steady state error in 4 samples
+T_i = 4
+
+# We can only expect to predict less than one sample into the future, as
+# after 1 sample, clients will have migrated
+# FIXME: This is a function of the consensus time..
+T_d = 0.5
+
+K_i = K_p/T_i
+K_d = K_p*T_d
 
 NODE_CAP = 0.05
 
@@ -112,15 +98,43 @@ class Node:
     self.chosen_fbw = None
     self.sbw_ratio = None
     self.fbw_ratio = None
+    self.pid_error = 0
+    self.prev_error = 0
+    self.prev_voted_at = 0
+    self.pid_error_sum = 0
+    self.derror_dt = 0
     self.ratio = None
     self.new_bw = None
     self.change = None
+    self.bw_idx = 0
     self.strm_bw = []
     self.filt_bw = []
     self.ns_bw = []
     self.desc_bw = []
     self.timestamps = []
 
+  # PID-adjusted bandwidth: consensus bw plus the P, I and D error terms
+  def pid_bw(self, bw_idx, dt):
+    return self.ns_bw[bw_idx] \
+             + K_p*self.ns_bw[bw_idx]*self.pid_error \
+             + K_i*self.ns_bw[bw_idx]*self.integral_error(dt) \
+             + K_d*self.ns_bw[bw_idx]*self.d_error_dt(dt)
+
+  # Time-weighted sum of error per unit of time, scaled
+  # to arbitrary units of 'dt' seconds
+  def integral_error(self, dt):
+    return (self.pid_error_sum * GUARD_SAMPLE_RATE) / dt
+
+  # Rate of error per unit of time, scaled to arbitrary 
+  # units of 'dt' seconds
+  def d_error_dt(self, dt):
+    if self.prev_voted_at == 0 or self.prev_error == 0:
+      self.derror_dt = 0
+    else:
+      self.derror_dt = ((dt*self.pid_error - dt*self.prev_error) /    \
+                        (self.chosen_time - self.prev_voted_at))
+    return self.derror_dt
+
   def add_line(self, line):
     if self.idhex and self.idhex != line.idhex:
       raise Exception("Line mismatch")
@@ -206,6 +220,63 @@ class Line:
     self.slice_file = slice_file
     self.timestamp = timestamp
 
+class Vote:
+  def __init__(self, line):
+    # node_id=$DB8C6D8E0D51A42BDDA81A9B8A735B41B2CF95D1 bw=231000 diff=209281 nick=rainbowwarrior measured_at=1319822504
+    self.idhex = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(1)
+    self.nick = re.search("[\s]*nick=([\S]+)[\s]*", line).group(1)
+    self.bw = int(re.search("[\s]*bw=([\S]+)[\s]*", line).group(1))
+    self.measured_at = int(re.search("[\s]*measured_at=([\S]+)[\s]*", line).group(1))
+    try:
+      self.pid_error = float(re.search("[\s]*pid_error=([\S]+)[\s]*", line).group(1))
+      self.pid_error_sum = float(re.search("[\s]*pid_error_sum=([\S]+)[\s]*", line).group(1))
+      self.vote_time = int(re.search("[\s]*vote_time=([\S]+)[\s]*", line).group(1))
+    except:
+      plog("NOTICE", "No previous PID data.")
+      self.pid_error = 0
+      self.pid_error_sum = 0
+      self.vote_time = 0
+
+class VoteSet:
+  def __init__(self, filename):
+    self.vote_map = {}
+    try:
+      f = file(filename, "r")
+      f.readline()
+      for line in f.readlines():
+        vote = Vote(line)
+        self.vote_map[vote.idhex] = vote
+    except IOError:
+      plog("NOTICE", "No previous vote data.")
+      pass
+
+# Misc items we need to get out of the consensus
+class ConsensusJunk:
+  def __init__(self, c):
+    cs_bytes = c.sendAndRecv("GETINFO dir/status-vote/current/consensus\r\n")[0][2]
+    self.bwauth_pid_control = False
+    try:
+      cs_params = re.search("^params ((?:[\S]+=[\d]+[\s]?)+)",
+                                     cs_bytes, re.M).split()
+      for p in cs_params:
+        if p == "bwauthpid=1":
+          self.bwauth_pid_control = True
+    except:
+      plog("NOTICE", "Bw auth PID control disabled due to parse error.")
+      traceback.print_exc()
+
+    self.bw_weights = {}
+    try:
+      bw_weights = re.search("^bandwidth-weights ((?:[\S]+=[\d]+[\s]?)+)",
+                           cs_bytes, re.M).groups(1)[0].split()
+      for b in bw_weights:
+        pair = b.split("=")
+        self.bw_weights[pair[0]] = int(pair[1])/10000.0
+    except:
+      plog("WARN", "No bandwidth weights in consensus!")
+      self.bw_weights["Wgd"] = 0
+      self.bw_weights["Wgg"] = 1.0
+
 def main(argv):
   TorUtil.read_config(argv[1]+"/scanner.1/bwauthority.cfg")
   TorUtil.loglevel = "NOTICE"
@@ -226,38 +297,7 @@ def main(argv):
   got_ns_bw = False
   max_rank = len(ns_list)
 
-  global BETA
-  sorted_rlist = None
-  if BETA == -1:
-    # Compute beta based on the upgrade rate for nsbw obeying routers
-    # (karsten's data show this slightly underestimates client upgrade rate)
-    nsbw_yes = VersionRangeRestriction("0.2.1.17")
-    sorted_rlist = c.read_routers(ns_list)
-
-    nsbw_cnt = 0
-    non_nsbw_cnt = 0
-    for r in sorted_rlist:
-      if nsbw_yes.r_is_ok(r): nsbw_cnt += 1
-      else: non_nsbw_cnt += 1
-    BETA = float(nsbw_cnt)/(nsbw_cnt+non_nsbw_cnt)
-
-  global GUARD_BETA
-  if GUARD_BETA == -1:
-    # Compute GUARD_BETA based on the upgrade rate for nsbw obeying routers
-    # (karsten's data show this slightly underestimates client upgrade rate)
-    guardbw_yes = NodeRestrictionList([VersionRangeRestriction("0.2.1.23"),
-       NotNodeRestriction(VersionRangeRestriction("0.2.2.0", "0.2.2.6"))])
-
-    if not sorted_rlist:
-      sorted_rlist = c.read_routers(ns_list)
-
-    guardbw_cnt = 0
-    non_guardbw_cnt = 0
-    for r in sorted_rlist:
-      if guardbw_yes.r_is_ok(r): guardbw_cnt += 1
-      else: non_guardbw_cnt += 1
-    GUARD_BETA = float(guardbw_cnt)/(guardbw_cnt+non_guardbw_cnt)
-
+  cs_junk = ConsensusJunk(c)
 
   # FIXME: This is poor form.. We should subclass the Networkstatus class
   # instead of just adding members
@@ -301,6 +341,7 @@ def main(argv):
                 # This filter is just to remove REALLY old files
                 if time.time() - timestamp > MAX_AGE:
                   plog("DEBUG", "Skipping old file "+f)
+                  # FIXME: Unlink this file + sql-
                   continue
                 if timestamp > newest_timestamp:
                   newest_timestamp = timestamp
@@ -361,39 +402,99 @@ def main(argv):
   plog("DEBUG", "Network true_strm_avg: "+str(true_strm_avg))
   plog("DEBUG", "Network true_filt_avg: "+str(true_filt_avg))
 
+  prev_votes = None
+  if cs_junk.bwauth_pid_control:
+    prev_votes = VoteSet(argv[-1])
+
+    guard_cnt = 0
+    node_cnt = 0
+    guard_measure_time = 0
+    node_measure_time = 0
+    for n in nodes.itervalues():
+      if n.idhex in prev_votes.vote_map and n.idhex in prev_consensus:
+        if "Guard" in prev_consensus[n.idhex].flags:
+          guard_cnt += 1
+          guard_measure_time += (n.timestamps[n.chosen_fbw] - \
+                                  prev_votes.vote_map[n.idhex].measured_at)
+        else:
+          node_cnt += 1
+          node_measure_time += (n.timestamps[n.chosen_fbw] - \
+                                  prev_votes.vote_map[n.idhex].measured_at)
+
+  plog("INFO", "Average node measurement interval: "+str(node_measure_time/node_cnt))
+  plog("INFO", "Average gaurd measurement interval: "+str(guard_measure_time/guard_cnt))
+
+  # There is a difference between measure period and sample rate.
+  # Measurement period is how fast the bandwidth auths can actually measure
+  # the network. Sample rate is how often we want the PID feedback loop to
+  # run. 
+  NODE_SAMPLE_RATE = node_measure_time/node_cnt
+
   tot_net_bw = 0
   for n in nodes.itervalues():
     n.fbw_ratio = n.filt_bw[n.chosen_fbw]/true_filt_avg
     n.sbw_ratio = n.strm_bw[n.chosen_sbw]/true_strm_avg
-    chosen_bw_idx = 0
     if n.sbw_ratio > n.fbw_ratio:
+      # Does this ever happen?
+      plog("NOTICE", "sbw > fbw for "+n.nick)
       n.ratio = n.sbw_ratio
-      chosen_bw_idx = n.chosen_sbw
+      n.bw_idx = n.chosen_sbw
+      n.pid_error = (n.strm_bw[n.chosen_sbw] - true_strm_avg)/true_strm_avg
     else:
       n.ratio = n.fbw_ratio
-      chosen_bw_idx = n.chosen_fbw
-
-    n.chosen_time = n.timestamps[chosen_bw_idx]
-
-    if n.ratio < 1.0:
-      # XXX: Blend together BETA and GUARD_BETA for Guard+Exit nodes?
-      if GUARD_BETA > 0 and n.idhex in prev_consensus \
-         and ("Guard" in prev_consensus[n.idhex].flags and not "Exit" in \
-                prev_consensus[n.idhex].flags):
-        use_bw = GUARD_BETA*n.ns_bw[chosen_bw_idx] \
-                       + (1.0-GUARD_BETA)*n.desc_bw[chosen_bw_idx]
-        n.new_bw = use_bw*((1.0-GUARD_ALPHA) + GUARD_ALPHA*n.ratio)
-      elif BETA > 0:
-        use_bw = BETA*n.ns_bw[chosen_bw_idx] \
-                    + (1.0-BETA)*n.desc_bw[chosen_bw_idx]
-        n.new_bw = use_bw*((1.0-ALPHA) + ALPHA*n.ratio)
+      n.bw_idx = n.chosen_fbw
+      n.pid_error = (n.filt_bw[n.chosen_fbw] - true_filt_avg)/true_filt_avg
+
+    n.chosen_time = n.timestamps[n.bw_idx]
+
+    # XXX: What happens if we fall in and out of pid control due to ides
+    # uptime issues or whatever
+    if cs_junk.bwauth_pid_control:
+      if n.idhex in prev_votes.vote_map:
+        n.prev_error = prev_votes.vote_map[n.idhex].pid_error
+        n.prev_voted_at = prev_votes.vote_map[n.idhex].vote_time
+        # The integration here uses the measured values, not the vote/sample
+        # values. Therefore, it requires the measure timespans
+        n.pid_error_sum = prev_votes.vote_map[n.idhex].pid_error_sum + \
+               n.pid_error*(n.chosen_time-prev_votes.vote_map[n.idhex].measured_at)/GUARD_SAMPLE_RATE
+
+      # XXX: No reason to slow this down to NODE_SAMPLE_RATE...
+      if n.chosen_time - prev_votes.vote_map[n.idhex].vote_time > NODE_SAMPLE_RATE:
+        # Nodes with the Guard flag will respond
+        # slowly to feedback. It must be applied less often,
+        # and in proportion to the appropriate Wgx weight.
+        if "Guard" in prev_consensus[n.idhex].flags:
+          # Do full feedback if no previous vote, or it is older than GUARD_SAMPLE_RATE
+          if n.idhex not in prev_votes.vote_map or \
+              n.chosen_time - prev_votes.vote_map[n.idhex].vote_Time > GUARD_SAMPLE_RATE:
+            n.new_bw = n.pid_bw(GUARD_SAMPLE_RATE)
+          else:
+            # XXX: Update any of the n values based on this blend??
+            guard_part = prev_votes.vote_map[n.idhex].bw # Use prev vote
+            if "Exit" in prev_consensus[n.idhex].flags:
+              n.new_bw = (1.0-cs_junk.bw_weights["Wgd"])*n.pid_bw(NODE_SAMPLE_RATE) + \
+                        cs_junk.bw_weights["Wgd"]*guard_part
+            else:
+              n.new_bw = (1.0-cs_junk.bw_weights["Wgg"])*n.pid_bw(NODE_SAMPLE_RATE) + \
+                        cs_junk.bw_weights["Wgg"]*guard_part
+        else:
+          # Everyone else should be pretty instantaneous to respond.
+          # Full feedback should be fine for them (we hope)
+          n.new_bw = n.pid_bw(NODE_SAMPLE_RATE)
       else:
-        use_bw = n.desc_bw[chosen_bw_idx]
-        n.new_bw = use_bw*n.ratio
-    else: # Use ALPHA=0, BETA=0 for faster nodes.
-      use_bw = n.desc_bw[chosen_bw_idx]
-      n.new_bw = use_bw*n.ratio
-    n.change = n.new_bw - n.desc_bw[chosen_bw_idx]
+        # XXX: Reset any of the n values???
+        if n.idhex in prev_votes.vote_map:
+          n.new_bw = prev_votes.vote_map[n.idhex].bw
+          n.vote_time = prev_votes.vote_map[n.idhex].vote_time
+        else:
+          # This should not happen.
+          plog("WARN", "No previous vote for recent node "+n.nick+"="+n.idhex)
+          n.new_bw = 0
+          n.ignore = True
+    else: # No PID feedback
+      n.new_bw = n.desc_bw[n.bw_idx]*n.ratio
+
+    n.change = n.new_bw - n.desc_bw[n.bw_idx]
 
     if n.idhex in prev_consensus:
       if prev_consensus[n.idhex].bandwidth != None:
@@ -410,11 +511,15 @@ def main(argv):
 
   # Go through the list and cap them to NODE_CAP
   for n in nodes.itervalues():
+    if n.new_bw >= 0xffffffff*1000:
+      plog("WARN", "Bandwidth of node "+n.nick+"="+n.idhex+" exceeded maxint32: "+str(n.new_bw))
+      n.new_bw = 0xffffffff*1000
     if n.new_bw > tot_net_bw*NODE_CAP:
       plog("INFO", "Clipping extremely fast node "+n.idhex+"="+n.nick+
            " at "+str(100*NODE_CAP)+"% of network capacity ("
            +str(n.new_bw)+"->"+str(int(tot_net_bw*NODE_CAP))+")")
       n.new_bw = int(tot_net_bw*NODE_CAP)
+      n.pid_error_sum = 0 # Don't let unused error accumulate...
 
   # WTF is going on here?
   oldest_timestamp = min(map(lambda n: n.chosen_time,
@@ -459,7 +564,7 @@ def main(argv):
 
   for n in n_print:
     if not n.ignore:
-      out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" diff="+str(int(round(n.change/1000.0,0)))+ " nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+"\n")
+      out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" diff="+str(int(round(n.change/1000.0,0)))+ " nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" derror_dt="+str(n.derror_dt)+" vote_time="+str(n.vote_time)+"\n")
   out.close()
  
 if __name__ == "__main__":





More information about the tor-commits mailing list