[torflow/master] Move more parameters to consensus.

commit 13aa3b4268a98a07e76e8ba9cb8cfccac39ff2c0 Author: Mike Perry <mikeperry-git@fscked.org> Date: Sun Nov 13 17:45:01 2011 -0800 Move more parameters to consensus. --- NetworkScanners/BwAuthority/aggregate.py | 103 +++++++++++++++++++++++------- 1 files changed, 79 insertions(+), 24 deletions(-) diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py index 1746256..9bed1d0 100755 --- a/NetworkScanners/BwAuthority/aggregate.py +++ b/NetworkScanners/BwAuthority/aggregate.py @@ -23,22 +23,25 @@ IGNORE_GUARDS = 0 # rate for guard nodes GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks -# PID constants -# See https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form +# PID constant defaults. May be overridden by consensus +# https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form K_p = 1.0 # We expect to correct steady state error in 5 samples (guess) T_i = 5.0 +# T_i_decay is a weight factor to govern how fast integral sums +# decay. For the values of T_i that we care about, T_i_decay represents +# the fraction of integral sum that is eliminated after T_i sample rounds. +# This decay is non-standard, but we do it to avoid overflow +T_i_decay = 0.5 + # We can only expect to predict less than one sample into the future, as # after 1 sample, clients will have migrated -# FIXME: Our prediction ability is a function of the consensus time +# FIXME: Our prediction ability is a function of the consensus uptake time # vs measurement rate T_d = 0.5 -K_i = K_p/T_i -K_d = K_p*T_d - NODE_CAP = 0.05 MIN_REPORT = 60 # Percent of the network we must measure before reporting @@ -94,6 +97,7 @@ class Node: self.circ_fail_rate = 0 self.strm_fail_rate = 0 + # FIXME: Need to set pid_error_sum.. It is getting lost when we don't vote def revert_to_vote(self, vote): self.new_bw = vote.bw*1000 self.pid_bw = vote.pid_bw @@ -101,17 +105,17 @@ class Node: self.measured_at = vote.measured_at # Derivative of error for pid control - def get_pid_bw(self, prev_vote, kp): + def get_pid_bw(self, prev_vote, kp, ki, kd, kidecay): self.prev_error = prev_vote.pid_error self.prev_measured_at = prev_vote.measured_at - # We decay the interval by 1/T_i each round to keep it bounded. - # This is non-standard - self.pid_error_sum = prev_vote.pid_error_sum*(1 - 1.0/T_i) + self.pid_error + # We decay the interval each round to keep it bounded. + # This decay is non-standard. We do it to avoid overflow + self.pid_error_sum = prev_vote.pid_error_sum*kidecay + self.pid_error self.pid_bw = self.ns_bw \ - + kp*(self.ns_bw*self.pid_error \ - + self.ns_bw*self.integral_error()/T_i \ - + self.ns_bw*self.d_error_dt()*T_d) + + kp*self.ns_bw*self.pid_error \ + + ki*self.ns_bw*self.integral_error() \ + + kd*self.ns_bw*self.d_error_dt() return self.pid_bw # Time-weighted sum of error per unit of time (measurement sample) @@ -139,8 +143,7 @@ class Node: self.filt_bw = line.filt_bw self.ns_bw = line.ns_bw self.desc_bw = line.desc_bw - # XXX: Temporary test - self.circ_fail_rate = 0.0 #line.circ_fail_rate + self.circ_fail_rate = line.circ_fail_rate self.strm_fail_rate = line.strm_fail_rate class Line: @@ -197,16 +200,54 @@ class ConsensusJunk: def __init__(self, c): cs_bytes = c.sendAndRecv("GETINFO dir/status-vote/current/consensus\r\n")[0][2] self.bwauth_pid_control = False + self.use_circ_fails = False + self.use_best_ratio = False + + self.K_p = K_p + self.T_i = T_i + self.T_d = T_d + self.T_i_decay = T_i_decay + try: cs_params = re.search("^params ((?:[\S]+=[\d]+[\s]?)+)", cs_bytes, re.M).group(1).split() for p in cs_params: if p == "bwauthpid=1": self.bwauth_pid_control = True + elif p == "bwauthcircs=1": + self.use_circ_fails = True + plog("INFO", "Counting circuit failures") + elif p == "bwauthbestratio=1": + self.use_best_ratio = True + plog("INFO", "Choosing larger of sbw vs fbw") + elif p.startswith("bwauthkp="): + self.K_p = int(p.split("=")[1])/10000.0 + plog("INFO", "Got K_p=%f from consensus." % self.K_p) + elif p.startswith("bwauthti="): + self.T_i = (int(p.split("=")[1])/10000.0) + plog("INFO", "Got T_i=%f from consensus." % self.T_i) + elif p.startswith("bwauthtd="): + self.T_d = (int(p.split("=")[1])/10000.0) + plog("INFO", "Got T_d=%f from consensus." % self.T_d) + elif p.startswith("bwauthtidecay="): + self.T_i_decay = (int(p.split("=")[1])/10000.0) + plog("INFO", "Got T_i_decay=%f from consensus." % self.T_i_decay) except: plog("NOTICE", "Bw auth PID control disabled due to parse error.") traceback.print_exc() + if self.T_i == 0: + self.K_i = 0 + self.K_i_decay = 0 + else: + self.K_i = self.K_p/self.T_i + self.K_i_decay = (1.0-self.T_i_decay/self.T_i) + + self.K_d = self.K_p*self.T_d + + plog("INFO", "Got K_p=%f K_i=%f K_d=%f K_i_decay=%f" % + (self.K_p, self.K_i, self.K_d, self.K_i_decay)) + self.bw_weights = {} try: bw_weights = re.search("^bandwidth-weights ((?:[\S]+=[\d]+[\s]?)+)", @@ -355,6 +396,11 @@ def main(argv): plog("NOTICE", "No scan results yet.") sys.exit(1) + if not cs_junk.use_circ_fails: + plog("INFO", "Ignoring circuit failures") + for n in nodes.itervalues(): + n.circ_fail_rate = 0.0 + if cs_junk.bwauth_pid_control: # Penalize nodes for circuit failure: it indicates CPU pressure # TODO: Potentially penalize for stream failure, if we run into @@ -406,10 +452,7 @@ def main(argv): if cs_junk.bwauth_pid_control: # Penalize nodes for circ failure rate - # n.pid_error = (n.filt_bw*(1.0-n.circ_fail_rate) - true_filt_avg)/true_filt_avg - - # FIXME: For now, let's try this larger-ratio thing.. - if n.sbw_ratio > n.fbw_ratio: + if cs_junk.use_best_ratio and n.sbw_ratio > n.fbw_ratio: n.pid_error = (n.strm_bw*(1.0-n.circ_fail_rate) - true_strm_avg)/true_strm_avg else: n.pid_error = (n.filt_bw*(1.0-n.circ_fail_rate) - true_filt_avg)/true_filt_avg @@ -426,9 +469,14 @@ def main(argv): # Do full feedback if our previous vote > 2.5 weeks old if n.idhex not in prev_votes.vote_map or \ n.measured_at - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE: - n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p) + n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], + cs_junk.K_p, + cs_junk.K_i, + cs_junk.K_d, + cs_junk.K_i_decay) else: pid_error = n.pid_error + # FIXME: We possibly lose the pid_error_sum here n.revert_to_vote(prev_votes.vote_map[n.idhex]) # Don't use feedback here, but we might as well use our # new measurement against the previous vote. @@ -436,11 +484,11 @@ def main(argv): # This should no longer happen plog("NOTICE", "Zero bw for Guard node "+n.nick+"="+n.idhex) n.new_bw = prev_votes.vote_map[n.idhex].bw + \ - K_p*prev_votes.vote_map[n.idhex].bw*pid_error + cs_junk.K_p*prev_votes.vote_map[n.idhex].bw*pid_error n.pid_bw = n.new_bw else: n.new_bw = prev_votes.vote_map[n.idhex].pid_bw + \ - K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error + cs_junk.K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error else: # Everyone else should be pretty instantenous to respond. # Full feedback should be fine for them (we hope), @@ -453,9 +501,16 @@ def main(argv): ("Guard" in prev_consensus[n.idhex].flags \ and "Exit" in prev_consensus[n.idhex].flags): n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], - K_p*(1.0-cs_junk.bw_weights["Wgd"])) + cs_junk.K_p*(1.0-cs_junk.bw_weights["Wgd"]), + cs_junk.K_i, + cs_junk.K_d, + cs_junk.K_i_decay) else: - n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p) + n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], + cs_junk.K_p, + cs_junk.K_i, + cs_junk.K_d, + cs_junk.K_i_decay) else: # Reset values. Don't vote/sample this measurement round. n.revert_to_vote(prev_votes.vote_map[n.idhex])
participants (1)
-
mikeperry@torproject.org