commit efe46c117daaeeb484948d963733b129489aaea3
Author: Mike Perry <mikeperry-git@fscked.org>
Date:   Sat Oct 29 01:52:07 2011 -0700

    Refactor and remove dead/useless code.

    Document some future concerns.
---
 NetworkScanners/BwAuthority/aggregate.py |  144 +++++++++++++++---------------
 1 files changed, 73 insertions(+), 71 deletions(-)
diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py index 1462f9b..ef53c61 100755 --- a/NetworkScanners/BwAuthority/aggregate.py +++ b/NetworkScanners/BwAuthority/aggregate.py @@ -28,12 +28,13 @@ GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks # See https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form K_p = 1.0
-# We expect to correct steady state error in 4 samples -T_i = 4 +# We expect to correct steady state error in 5 samples (guess) +T_i = 5.0
# We can only expect to predict less than one sample into the future, as # after 1 sample, clients will have migrated -# FIXME: This is a function of the consensus time.. +# FIXME: Our prediction ability is a function of the consensus time +# vs measurement rate T_d = 0.5
K_i = K_p/T_i @@ -100,7 +101,7 @@ class Node: self.fbw_ratio = None self.pid_error = 0 self.prev_error = 0 - self.prev_voted_at = 0 + self.prev_measured_at = 0 self.pid_error_sum = 0 self.derror_dt = 0 self.ratio = None @@ -113,26 +114,36 @@ class Node: self.desc_bw = [] self.timestamps = []
+ def revert_to_vote(self, vote): + self.new_bw = vote.bw + self.pid_error = vote.pid_error + self.chosen_time = vote.measured_at + # Derivative of error for pid control - def pid_bw(self, bw_idx, dt): - return self.ns_bw[bw_idx] \ - + K_p*self.ns_bw[bw_idx]*self.pid_error \ - + K_i*self.ns_bw[bw_idx]*self.integral_error(dt) \ - + K_d*self.ns_bw[bw_idx]*self.d_error_dt(dt) - - # Time-weighted sum of error per unit of time, scaled - # to arbitrary units of 'dt' seconds - def integral_error(self, dt): - return (self.pid_error_sum * GUARD_SAMPLE_RATE) / dt - - # Rate of error per unit of time, scaled to arbitrary - # units of 'dt' seconds - def d_error_dt(self, dt): - if self.prev_voted_at == 0 or self.prev_error == 0: + def pid_bw(self, prev_vote, kp): + self.prev_error = prev_vote.pid_error + self.prev_measured_at = prev_vote.measured_at + # We decay the interval by 1/T_i each round to keep it bounded. + # This is non-standard + self.pid_error_sum = prev_vote.pid_error_sum*(1 - 1.0/T_i) + self.pid_error + + return self.ns_bw[self.bw_idx] \ + + kp*(self.ns_bw[self.bw_idx]*self.pid_error \ + + self.ns_bw[self.bw_idx]*self.integral_error()/T_i \ + + self.ns_bw[self.bw_idx]*self.d_error_dt()*T_d) + + # Time-weighted sum of error per unit of time (measurement sample) + def integral_error(self): + if self.prev_error == 0: + return 0 + return self.pid_error_sum + + # Rate of change in error from the last measurement sample + def d_error_dt(self): + if self.prev_measured_at == 0 or self.prev_error == 0: self.derror_dt = 0 else: - self.derror_dt = ((dt*self.pid_error - dt*self.prev_error) / \ - (self.chosen_time - self.prev_voted_at)) + self.derror_dt = self.pid_error - self.prev_error return self.derror_dt
def add_line(self, line): @@ -144,7 +155,7 @@ class Node: or self.node_data[line.slice_file].timestamp < line.timestamp: self.node_data[line.slice_file] = NodeData(line.timestamp)
- # FIXME: This is kinda nutty. Can we simplify? For instance, + # TODO: This is kinda nutty. Can we simplify? For instance, # do these really need to be lists inside the nd? nd = self.node_data[line.slice_file] nd.strm_bw.append(line.strm_bw) @@ -230,12 +241,10 @@ class Vote: try: self.pid_error = float(re.search("[\s]*pid_error=([\S]+)[\s]*", line).group(1)) self.pid_error_sum = float(re.search("[\s]*pid_error_sum=([\S]+)[\s]*", line).group(1)) - self.vote_time = int(re.search("[\s]*vote_time=([\S]+)[\s]*", line).group(1)) except: plog("NOTICE", "No previous PID data.") self.pid_error = 0 self.pid_error_sum = 0 - self.vote_time = 0
class VoteSet: def __init__(self, filename): @@ -299,7 +308,7 @@ def main(argv):
cs_junk = ConsensusJunk(c)
- # FIXME: This is poor form.. We should subclass the Networkstatus class + # TODO: This is poor form.. We should subclass the Networkstatus class # instead of just adding members for i in xrange(max_rank): n = ns_list[i] @@ -421,14 +430,12 @@ def main(argv): node_measure_time += (n.timestamps[n.chosen_fbw] - \ prev_votes.vote_map[n.idhex].measured_at)
- plog("INFO", "Average node measurement interval: "+str(node_measure_time/node_cnt)) - plog("INFO", "Average gaurd measurement interval: "+str(guard_measure_time/guard_cnt)) - # There is a difference between measure period and sample rate. # Measurement period is how fast the bandwidth auths can actually measure # the network. Sample rate is how often we want the PID feedback loop to # run. - NODE_SAMPLE_RATE = node_measure_time/node_cnt + plog("INFO", "Average node measurement interval: "+str(node_measure_time/node_cnt)) + plog("INFO", "Average gaurd measurement interval: "+str(guard_measure_time/guard_cnt))
tot_net_bw = 0 for n in nodes.itervalues(): @@ -447,51 +454,46 @@ def main(argv):
n.chosen_time = n.timestamps[n.bw_idx]
- # XXX: What happens if we fall in and out of pid control due to ides - # uptime issues or whatever if cs_junk.bwauth_pid_control: if n.idhex in prev_votes.vote_map: - n.prev_error = prev_votes.vote_map[n.idhex].pid_error - n.prev_voted_at = prev_votes.vote_map[n.idhex].vote_time - # The integration here uses the measured values, not the vote/sample - # values. Therefore, it requires the measure timespans - n.pid_error_sum = prev_votes.vote_map[n.idhex].pid_error_sum + \ - n.pid_error*(n.chosen_time-prev_votes.vote_map[n.idhex].measured_at)/GUARD_SAMPLE_RATE - - # XXX: No reason to slow this down to NODE_SAMPLE_RATE... - if n.chosen_time - prev_votes.vote_map[n.idhex].vote_time > NODE_SAMPLE_RATE: - # Nodes with the Guard flag will respond - # slowly to feedback. It must be applied less often, - # and in proportion to the appropriate Wgx weight. - if "Guard" in prev_consensus[n.idhex].flags: - # Do full feedback if our previous vote > 2.5 weeks old - if n.idhex not in prev_votes.vote_map or \ - n.chosen_time - prev_votes.vote_map[n.idhex].vote_Time > GUARD_SAMPLE_RATE: - n.new_bw = n.pid_bw(GUARD_SAMPLE_RATE) + # If there is a new sample, let's use it for all but guards + if n.chosen_time > prev_votes.vote_map[n.idhex].measured_at: + # Nodes with the Guard flag will respond slowly to feedback, + # so they should be sampled less often, and in proportion to + # the appropriate Wgx weight. + if n.idhex in prev_consensus and \ + ("Guard" in prev_consensus[n.idhex].flags \ + and "Exit" not in prev_consensus[n.idhex].flags): + # Do full feedback if our previous vote > 2.5 weeks old + if n.idhex not in prev_votes.vote_map or \ + n.chosen_time - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE: + n.new_bw = n.pid_bw(prev_votes.vote_map[n.idhex], K_p) + else: + n.revert_to_vote(prev_votes.vote_map[n.idhex]) else: - # XXX: Update any of the n values based on this blend?? 
- guard_part = prev_votes.vote_map[n.idhex].bw # Use prev vote - if "Exit" in prev_consensus[n.idhex].flags: - n.new_bw = (1.0-cs_junk.bw_weights["Wgd"])*n.pid_bw(NODE_SAMPLE_RATE) + \ - cs_junk.bw_weights["Wgd"]*guard_part + # Everyone else should be pretty instantenous to respond. + # Full feedback should be fine for them (we hope), + # except for Guard+Exits, we want to dampen just a little + # bit for them. Wgd seems a good choice, but might not be exact. + # We really want to magically combine Wgd and something that + # represents the client migration rate for Guards.. But who + # knows how to represent that and still KISS? + if n.idhex in prev_consensus and \ + ("Guard" in prev_consensus[n.idhex].flags \ + and "Exit" not in prev_consensus[n.idhex].flags): + n.new_bw = n.pid_bw(prev_votes.vote_map[n.idhex], 1.0-cs_junk.bw_weights["Wgd"]) else: - n.new_bw = (1.0-cs_junk.bw_weights["Wgg"])*n.pid_bw(NODE_SAMPLE_RATE) + \ - cs_junk.bw_weights["Wgg"]*guard_part - else: - # Everyone else should be pretty instantenous to respond. - # Full feedback should be fine for them (we hope) - n.new_bw = n.pid_bw(NODE_SAMPLE_RATE) - else: - # XXX: Reset any of the n values??? - if n.idhex in prev_votes.vote_map: - n.new_bw = prev_votes.vote_map[n.idhex].bw - n.vote_time = prev_votes.vote_map[n.idhex].vote_time + n.new_bw = n.pid_bw(prev_votes.vote_map[n.idhex], K_p) else: - # This should not happen. - plog("WARN", "No previous vote for recent node "+n.nick+"="+n.idhex) - n.new_bw = 0 - n.ignore = True + # Reset values. Don't vote/sample this measurement round. + n.revert_to_vote(prev_votes.vote_map[n.idhex]) + else: # No prev vote, pure consensus feedback this round + n.new_bw = n.ns_bw[n.bw_idx] + K_p*n.ns_bw[n.bw_idx]*n.pid_error + n.pid_error_sum = n.pid_error + plog("INFO", "No prev vote for node "+n.nick+": Consensus feedback") else: # No PID feedback + n.pid_error = 0 + n.pid_error_sum = 0 n.new_bw = n.desc_bw[n.bw_idx]*n.ratio
n.change = n.new_bw - n.desc_bw[n.bw_idx] @@ -561,12 +563,12 @@ def main(argv): out = file(argv[-1], "w") out.write(str(scan_age)+"\n")
- + # FIXME: Split out debugging data for n in n_print: if not n.ignore: - out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" diff="+str(int(round(n.change/1000.0,0)))+ " nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" derror_dt="+str(n.derror_dt)+" vote_time="+str(n.vote_time)+"\n") + out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" derror_dt="+str(n.derror_dt)+"\n") out.close() - + if __name__ == "__main__": try: main(sys.argv)