commit 34afb642f6e1cebf386e1034b213b69b0144d7c7 Author: Mike Perry mikeperry-git@fscked.org Date: Sat Oct 29 01:09:16 2011 -0700
node sample rate idea.
About to hit the chopping block. --- NetworkScanners/BwAuthority/aggregate.py | 283 ++++++++++++++++++++---------- 1 files changed, 194 insertions(+), 89 deletions(-)
diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py index 712a9c5..1462f9b 100755 --- a/NetworkScanners/BwAuthority/aggregate.py +++ b/NetworkScanners/BwAuthority/aggregate.py @@ -20,38 +20,24 @@ prev_consensus = {} # Hack to kill voting on guards while the network rebalances IGNORE_GUARDS = 0
-# BETA is the parameter that governs the proportion that we use previous -# consensus values in creating new bandwitdhs versus descriptor values. -# 1.0 -> all consensus -# 0 -> all descriptor, -# -1 -> compute from %age of nodes upgraded to 0.2.1.17+ -# Note that guard nodes may not quickly change load in the presence of -# changes to their advertised bandwidths. To address this and other -# divergent behaviors, we only apply this value to nodes with ratios < 1.0 -#BETA = -1 -BETA = 0 - -# GUARD_BETA is the version of BETA used for guard nodes. It needs -# to be computed similarly to BETA, but using 0.2.1.22+ and 0.2.2.7+ -# because guard nodes were weighted uniformly by clients up until then. -#GUARD_BETA = -1 -GUARD_BETA = 0 - -# ALPHA is the parameter that controls the amount measurement -# values count for. It is used to dampen feedback loops to prevent -# values from changing too rapidly. Thus it only makes sense to -# have a value below 1.0 if BETA is nonzero. -# ALPHA is only applied for nodes with ratio < 1 if BETA is nonzero. -#ALPHA = 0.25 -ALPHA = 0 - -# There are cases where our updates may not cause node -# traffic to quickly relocate, such as Guard-only nodes. These nodes -# get a special ALPHA when BETA is being used: -# GUARD_ALPHA is only applied for Guard-only nodes with ratio < 1 if BETA is -# nonzero. -#GUARD_ALPHA = 0.1 -GUARD_ALPHA = 0 +# The guard measurement period is based on the client turnover +# rate for guard nodes +GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks + +# PID constants +# See https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form +K_p = 1.0 + +# We expect to correct steady state error in 4 samples +T_i = 4 + +# We can only expect to predict less than one sample into the future, as +# after 1 sample, clients will have migrated +# FIXME: This is a function of the consensus time.. +T_d = 0.5 + +K_i = K_p/T_i +K_d = K_p*T_d
NODE_CAP = 0.05
@@ -112,15 +98,43 @@ class Node: self.chosen_fbw = None self.sbw_ratio = None self.fbw_ratio = None + self.pid_error = 0 + self.prev_error = 0 + self.prev_voted_at = 0 + self.pid_error_sum = 0 + self.derror_dt = 0 self.ratio = None self.new_bw = None self.change = None + self.bw_idx = 0 self.strm_bw = [] self.filt_bw = [] self.ns_bw = [] self.desc_bw = [] self.timestamps = []
+ # PID-controlled bw: consensus bw plus P, I and D error corrections + def pid_bw(self, bw_idx, dt): + return self.ns_bw[bw_idx] \ + + K_p*self.ns_bw[bw_idx]*self.pid_error \ + + K_i*self.ns_bw[bw_idx]*self.integral_error(dt) \ + + K_d*self.ns_bw[bw_idx]*self.d_error_dt(dt) + + # Time-weighted sum of error per unit of time, scaled + # to arbitrary units of 'dt' seconds + def integral_error(self, dt): + return (self.pid_error_sum * GUARD_SAMPLE_RATE) / dt + + # Rate of error per unit of time, scaled to arbitrary + # units of 'dt' seconds + def d_error_dt(self, dt): + if self.prev_voted_at == 0 or self.prev_error == 0 or self.chosen_time == self.prev_voted_at: + self.derror_dt = 0 + else: + self.derror_dt = ((dt*self.pid_error - dt*self.prev_error) / \ + (self.chosen_time - self.prev_voted_at)) + return self.derror_dt + def add_line(self, line): if self.idhex and self.idhex != line.idhex: raise Exception("Line mismatch") @@ -206,6 +220,63 @@ class Line: self.slice_file = slice_file self.timestamp = timestamp
+class Vote: + def __init__(self, line): + # node_id=$DB8C6D8E0D51A42BDDA81A9B8A735B41B2CF95D1 bw=231000 diff=209281 nick=rainbowwarrior measured_at=1319822504 + self.idhex = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(1) + self.nick = re.search("[\s]*nick=([\S]+)[\s]*", line).group(1) + self.bw = int(re.search("[\s]*bw=([\S]+)[\s]*", line).group(1)) + self.measured_at = int(re.search("[\s]*measured_at=([\S]+)[\s]*", line).group(1)) + try: + self.pid_error = float(re.search("[\s]*pid_error=([\S]+)[\s]*", line).group(1)) + self.pid_error_sum = float(re.search("[\s]*pid_error_sum=([\S]+)[\s]*", line).group(1)) + self.vote_time = int(re.search("[\s]*vote_time=([\S]+)[\s]*", line).group(1)) + except (AttributeError, ValueError): + plog("NOTICE", "No previous PID data.") + self.pid_error = 0 + self.pid_error_sum = 0 + self.vote_time = 0 + +class VoteSet: + def __init__(self, filename): + self.vote_map = {} + try: + f = file(filename, "r") + f.readline() + for line in f.readlines(): + vote = Vote(line) + self.vote_map[vote.idhex] = vote + except IOError: + plog("NOTICE", "No previous vote data.") + pass + +# Misc items we need to get out of the consensus +class ConsensusJunk: + def __init__(self, c): + cs_bytes = c.sendAndRecv("GETINFO dir/status-vote/current/consensus\r\n")[0][2] + self.bwauth_pid_control = False + try: + cs_params = re.search("^params ((?:[\S]+=[\d]+[\s]?)+)", + cs_bytes, re.M).group(1).split() + for p in cs_params: + if p == "bwauthpid=1": + self.bwauth_pid_control = True + except: + plog("NOTICE", "Bw auth PID control disabled due to parse error.") + traceback.print_exc() + + self.bw_weights = {} + try: + bw_weights = re.search("^bandwidth-weights ((?:[\S]+=[\d]+[\s]?)+)", + cs_bytes, re.M).groups(1)[0].split() + for b in bw_weights: + pair = b.split("=") + self.bw_weights[pair[0]] = int(pair[1])/10000.0 + except: + plog("WARN", "No bandwidth weights in consensus!") + self.bw_weights["Wgd"] = 0 + self.bw_weights["Wgg"] = 1.0 + def main(argv):
TorUtil.read_config(argv[1]+"/scanner.1/bwauthority.cfg") TorUtil.loglevel = "NOTICE" @@ -226,38 +297,7 @@ def main(argv): got_ns_bw = False max_rank = len(ns_list)
- global BETA - sorted_rlist = None - if BETA == -1: - # Compute beta based on the upgrade rate for nsbw obeying routers - # (karsten's data show this slightly underestimates client upgrade rate) - nsbw_yes = VersionRangeRestriction("0.2.1.17") - sorted_rlist = c.read_routers(ns_list) - - nsbw_cnt = 0 - non_nsbw_cnt = 0 - for r in sorted_rlist: - if nsbw_yes.r_is_ok(r): nsbw_cnt += 1 - else: non_nsbw_cnt += 1 - BETA = float(nsbw_cnt)/(nsbw_cnt+non_nsbw_cnt) - - global GUARD_BETA - if GUARD_BETA == -1: - # Compute GUARD_BETA based on the upgrade rate for nsbw obeying routers - # (karsten's data show this slightly underestimates client upgrade rate) - guardbw_yes = NodeRestrictionList([VersionRangeRestriction("0.2.1.23"), - NotNodeRestriction(VersionRangeRestriction("0.2.2.0", "0.2.2.6"))]) - - if not sorted_rlist: - sorted_rlist = c.read_routers(ns_list) - - guardbw_cnt = 0 - non_guardbw_cnt = 0 - for r in sorted_rlist: - if guardbw_yes.r_is_ok(r): guardbw_cnt += 1 - else: non_guardbw_cnt += 1 - GUARD_BETA = float(guardbw_cnt)/(guardbw_cnt+non_guardbw_cnt) - + cs_junk = ConsensusJunk(c)
# FIXME: This is poor form.. We should subclass the Networkstatus class # instead of just adding members @@ -301,6 +341,7 @@ def main(argv): # This filter is just to remove REALLY old files if time.time() - timestamp > MAX_AGE: plog("DEBUG", "Skipping old file "+f) + # FIXME: Unlink this file + sql- continue if timestamp > newest_timestamp: newest_timestamp = timestamp @@ -361,39 +402,99 @@ def main(argv): plog("DEBUG", "Network true_strm_avg: "+str(true_strm_avg)) plog("DEBUG", "Network true_filt_avg: "+str(true_filt_avg))
+ prev_votes = None + if cs_junk.bwauth_pid_control: + prev_votes = VoteSet(argv[-1]) + + guard_cnt = 0 + node_cnt = 0 + guard_measure_time = 0 + node_measure_time = 0 + for n in nodes.itervalues(): + if prev_votes is not None and n.idhex in prev_votes.vote_map and n.idhex in prev_consensus: + if "Guard" in prev_consensus[n.idhex].flags: + guard_cnt += 1 + guard_measure_time += (n.timestamps[n.chosen_fbw] - \ + prev_votes.vote_map[n.idhex].measured_at) + else: + node_cnt += 1 + node_measure_time += (n.timestamps[n.chosen_fbw] - \ + prev_votes.vote_map[n.idhex].measured_at) + + plog("INFO", "Average node measurement interval: "+str(node_measure_time/max(node_cnt, 1))) + plog("INFO", "Average guard measurement interval: "+str(guard_measure_time/max(guard_cnt, 1))) + + # There is a difference between measure period and sample rate. + # Measurement period is how fast the bandwidth auths can actually measure + # the network. Sample rate is how often we want the PID feedback loop to + # run. + NODE_SAMPLE_RATE = node_measure_time/max(node_cnt, 1) + tot_net_bw = 0 for n in nodes.itervalues(): n.fbw_ratio = n.filt_bw[n.chosen_fbw]/true_filt_avg n.sbw_ratio = n.strm_bw[n.chosen_sbw]/true_strm_avg - chosen_bw_idx = 0 if n.sbw_ratio > n.fbw_ratio: + # Does this ever happen? + plog("NOTICE", "sbw > fbw for "+n.nick) n.ratio = n.sbw_ratio - chosen_bw_idx = n.chosen_sbw + n.bw_idx = n.chosen_sbw + n.pid_error = (n.strm_bw[n.chosen_sbw] - true_strm_avg)/true_strm_avg else: n.ratio = n.fbw_ratio - chosen_bw_idx = n.chosen_fbw - - n.chosen_time = n.timestamps[chosen_bw_idx] - - if n.ratio < 1.0: - # XXX: Blend together BETA and GUARD_BETA for Guard+Exit nodes?
- if GUARD_BETA > 0 and n.idhex in prev_consensus \ - and ("Guard" in prev_consensus[n.idhex].flags and not "Exit" in \ - prev_consensus[n.idhex].flags): - use_bw = GUARD_BETA*n.ns_bw[chosen_bw_idx] \ - + (1.0-GUARD_BETA)*n.desc_bw[chosen_bw_idx] - n.new_bw = use_bw*((1.0-GUARD_ALPHA) + GUARD_ALPHA*n.ratio) - elif BETA > 0: - use_bw = BETA*n.ns_bw[chosen_bw_idx] \ - + (1.0-BETA)*n.desc_bw[chosen_bw_idx] - n.new_bw = use_bw*((1.0-ALPHA) + ALPHA*n.ratio) + n.bw_idx = n.chosen_fbw + n.pid_error = (n.filt_bw[n.chosen_fbw] - true_filt_avg)/true_filt_avg + + n.chosen_time = n.timestamps[n.bw_idx] + + # XXX: What happens if we fall in and out of pid control due to ides + # uptime issues or whatever + if cs_junk.bwauth_pid_control: + if n.idhex in prev_votes.vote_map: + n.prev_error = prev_votes.vote_map[n.idhex].pid_error + n.prev_voted_at = prev_votes.vote_map[n.idhex].vote_time + # The integration here uses the measured values, not the vote/sample + # values. Therefore, it requires the measure timespans + n.pid_error_sum = prev_votes.vote_map[n.idhex].pid_error_sum + \ + n.pid_error*(n.chosen_time-prev_votes.vote_map[n.idhex].measured_at)/GUARD_SAMPLE_RATE + + # XXX: No reason to slow this down to NODE_SAMPLE_RATE... + if n.idhex not in prev_votes.vote_map or n.chosen_time - prev_votes.vote_map[n.idhex].vote_time > NODE_SAMPLE_RATE: + # Nodes with the Guard flag will respond + # slowly to feedback. It must be applied less often, + # and in proportion to the appropriate Wgx weight. + if n.idhex in prev_consensus and "Guard" in prev_consensus[n.idhex].flags: + # Do full feedback if our previous vote is older than GUARD_SAMPLE_RATE (2 weeks) + if n.idhex not in prev_votes.vote_map or \ + n.chosen_time - prev_votes.vote_map[n.idhex].vote_time > GUARD_SAMPLE_RATE: + n.new_bw = n.pid_bw(n.bw_idx, GUARD_SAMPLE_RATE) + else: + # XXX: Update any of the n values based on this blend??
+ guard_part = prev_votes.vote_map[n.idhex].bw # Use prev vote + if "Exit" in prev_consensus[n.idhex].flags: + n.new_bw = (1.0-cs_junk.bw_weights["Wgd"])*n.pid_bw(n.bw_idx, NODE_SAMPLE_RATE) + \ + cs_junk.bw_weights["Wgd"]*guard_part + else: + n.new_bw = (1.0-cs_junk.bw_weights["Wgg"])*n.pid_bw(n.bw_idx, NODE_SAMPLE_RATE) + \ + cs_junk.bw_weights["Wgg"]*guard_part + else: + # Everyone else should be pretty instantaneous to respond. + # Full feedback should be fine for them (we hope) + n.new_bw = n.pid_bw(n.bw_idx, NODE_SAMPLE_RATE) else: - use_bw = n.desc_bw[chosen_bw_idx] - n.new_bw = use_bw*n.ratio - else: # Use ALPHA=0, BETA=0 for faster nodes. - use_bw = n.desc_bw[chosen_bw_idx] - n.new_bw = use_bw*n.ratio - n.change = n.new_bw - n.desc_bw[chosen_bw_idx] + # XXX: Reset any of the n values??? + if n.idhex in prev_votes.vote_map: + n.new_bw = prev_votes.vote_map[n.idhex].bw + n.vote_time = prev_votes.vote_map[n.idhex].vote_time + else: + # This should not happen. + plog("WARN", "No previous vote for recent node "+n.nick+"="+n.idhex) + n.new_bw = 0 + n.ignore = True + else: # No PID feedback + n.new_bw = n.desc_bw[n.bw_idx]*n.ratio + + n.change = n.new_bw - n.desc_bw[n.bw_idx]
if n.idhex in prev_consensus: if prev_consensus[n.idhex].bandwidth != None: @@ -410,11 +511,15 @@ def main(argv):
# Go through the list and cap them to NODE_CAP for n in nodes.itervalues(): + if n.new_bw >= 0xffffffff*1000: + plog("WARN", "Bandwidth of node "+n.nick+"="+n.idhex+" exceeded maxint32: "+str(n.new_bw)) + n.new_bw = 0xffffffff*1000 if n.new_bw > tot_net_bw*NODE_CAP: plog("INFO", "Clipping extremely fast node "+n.idhex+"="+n.nick+ " at "+str(100*NODE_CAP)+"% of network capacity (" +str(n.new_bw)+"->"+str(int(tot_net_bw*NODE_CAP))+")") n.new_bw = int(tot_net_bw*NODE_CAP) + n.pid_error_sum = 0 # Don't let unused error accumulate...
# WTF is going on here? oldest_timestamp = min(map(lambda n: n.chosen_time, @@ -459,7 +564,7 @@ def main(argv):
for n in n_print: if not n.ignore: - out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" diff="+str(int(round(n.change/1000.0,0)))+ " nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+"\n") + out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" diff="+str(int(round(n.change/1000.0,0)))+ " nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" derror_dt="+str(n.derror_dt)+" vote_time="+str(getattr(n, "vote_time", int(n.chosen_time)))+"\n") out.close()
if __name__ == "__main__":