[torflow/master] Differentiate measured and update timestamps.

commit e87c3e379992af11d23ed7b916ad65b19e6eda56 Author: Mike Perry <mikeperry-git@fscked.org> Date: Sun Nov 13 18:25:36 2011 -0800 Differentiate measured and update timestamps. This helps us to track Guard node feedback/non-feedback better. We may also end up using this to determine T_d and possibly T_i. Also improve some log messages. --- NetworkScanners/BwAuthority/aggregate.py | 59 ++++++++++++++++++++---------- 1 files changed, 40 insertions(+), 19 deletions(-) diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py index 03822de..ca202bb 100755 --- a/NetworkScanners/BwAuthority/aggregate.py +++ b/NetworkScanners/BwAuthority/aggregate.py @@ -47,9 +47,11 @@ NODE_CAP = 0.05 MIN_REPORT = 60 # Percent of the network we must measure before reporting # Keep most measurements in consideration. The code below chooses -# the most recent one. 15 days is just to stop us from choking up +# the most recent one. 28 days is just to stop us from choking up # all the CPU once these things run for a year or so. -MAX_AGE = 60*60*24*15 +# Note that the Guard measurement interval of 2 weeks means that this +# value can't get much below that. +MAX_AGE = 2*GUARD_SAMPLE_RATE # If the resultant scan file is older than 1.5 days, something is wrong MAX_SCAN_AGE = 60*60*24*1.5 @@ -81,7 +83,6 @@ class Node: self.pid_bw = 0 self.pid_error = 0 self.prev_error = 0 - self.prev_measured_at = 0 self.pid_error_sum = 0 self.pid_delta = 0 self.ratio = None @@ -96,6 +97,7 @@ class Node: self.desc_bw = 0 self.circ_fail_rate = 0 self.strm_fail_rate = 0 + self.updated_at = 0 def revert_to_vote(self, vote): self.new_bw = vote.bw*1000 @@ -108,7 +110,6 @@ class Node: # Derivative of error for pid control def get_pid_bw(self, prev_vote, kp, ki, kd, kidecay): self.prev_error = prev_vote.pid_error - self.prev_measured_at = prev_vote.measured_at # We decay the interval each round to keep it bounded. # This decay is non-standard. We do it to avoid overflow self.pid_error_sum = prev_vote.pid_error_sum*kidecay + self.pid_error @@ -127,7 +128,7 @@ class Node: # Rate of change in error from the last measurement sample def d_error_dt(self): - if self.prev_measured_at == 0 or self.prev_error == 0: + if self.prev_error == 0: self.pid_delta = 0 else: self.pid_delta = self.pid_error - self.prev_error @@ -139,7 +140,7 @@ class Node: self.idhex = line.idhex self.nick = line.nick if line.measured_at > self.measured_at: - self.measured_at = line.measured_at + self.measured_at = self.updated_at = line.measured_at self.strm_bw = line.strm_bw self.filt_bw = line.filt_bw self.ns_bw = line.ns_bw @@ -185,6 +186,12 @@ class Vote: self.pid_error = 0 self.pid_delta = 0 self.pid_error_sum = 0 + try: + self.updated_at = int(re.search("[\s]*updated_at=([\S]+)[\s]*", line).group(1)) + except: + plog("INFO", "No updated_at field for "+self.nick+"="+self.idhex) + self.updated_at = self.measured_at + class VoteSet: def __init__(self, filename): @@ -434,19 +441,23 @@ def main(argv): if n.idhex in prev_votes.vote_map and n.idhex in prev_consensus: if "Guard" in prev_consensus[n.idhex].flags and \ "Exit" not in prev_consensus[n.idhex].flags: - guard_cnt += 1 - guard_measure_time += (n.measured_at - \ - prev_votes.vote_map[n.idhex].measured_at) + if n.measured_at != prev_votes.vote_map[n.idhex].measured_at: + guard_cnt += 1 + guard_measure_time += (n.measured_at - \ + prev_votes.vote_map[n.idhex].measured_at) else: - node_cnt += 1 - node_measure_time += (n.measured_at - \ - prev_votes.vote_map[n.idhex].measured_at) + if n.updated_at != prev_votes.vote_map[n.idhex].updated_at: + node_cnt += 1 + node_measure_time += (n.updated_at - \ + prev_votes.vote_map[n.idhex].updated_at) + # TODO: We may want to try to use this info to autocompute T_d and + # maybe T_i? if node_cnt > 0: - plog("INFO", "Average node measurement interval: "+str(node_measure_time/node_cnt)) + plog("INFO", "Avg of "+str(node_cnt)+" node update intervals: "+str((node_measure_time/node_cnt)/3600.0)) if guard_cnt > 0: - plog("INFO", "Average gaurd measurement interval: "+str(guard_measure_time/guard_cnt)) + plog("INFO", "Avg of "+str(guard_cnt)+" guard measurement interval: "+str((guard_measure_time/guard_cnt)/3600.0)) tot_net_bw = 0 for n in nodes.itervalues(): @@ -484,6 +495,7 @@ def main(argv): # new measurement against the previous vote. n.new_bw = prev_votes.vote_map[n.idhex].pid_bw + \ cs_junk.K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error + else: # Everyone else should be pretty instantenous to respond. # Full feedback should be fine for them (we hope), @@ -510,7 +522,7 @@ def main(argv): # Reset values. Don't vote/sample this measurement round. n.revert_to_vote(prev_votes.vote_map[n.idhex]) else: # No prev vote, pure consensus feedback this round - n.new_bw = n.ns_bw + K_p*n.ns_bw*n.pid_error + n.new_bw = n.ns_bw + cs_junk.K_p*n.ns_bw*n.pid_error n.pid_error_sum = n.pid_error n.pid_bw = n.new_bw plog("INFO", "No prev vote for node "+n.nick+": Consensus feedback") @@ -546,6 +558,10 @@ def main(argv): if n.new_bw >= 0xffffffff*1000: plog("WARN", "Bandwidth of node "+n.nick+"="+n.idhex+" exceeded maxint32: "+str(n.new_bw)) n.new_bw = 0xffffffff*1000 + if cs_junk.T_i > 0 and math.fabs(n.pid_error_sum) > \ + math.fabs(2*cs_junk.T_i*n.pid_error/cs_junk.T_i_decay): + plog("NOTICE", "Large pid_error_sum for node "+n.idhex+"="+n.nick+": "+ + str(n.pid_error_sum)+" vs "+str(n.pid_error)) if n.new_bw > tot_net_bw*NODE_CAP: plog("INFO", "Clipping extremely fast node "+n.idhex+"="+n.nick+ " at "+str(100*NODE_CAP)+"% of network capacity ("+ @@ -561,11 +577,15 @@ def main(argv): plog("INFO", "New node "+n.idhex+"="+n.nick+" has bandwidth < 0: "+str(n.new_bw)) n.new_bw = 1 - # WTF is going on here? - oldest_timestamp = min(map(lambda n: n.measured_at, + oldest_measured = min(map(lambda n: n.measured_at, + filter(lambda n: n.idhex in prev_consensus, + nodes.itervalues()))) + plog("INFO", "Oldest measured node: "+time.ctime(oldest_measured)) + + oldest_updated = min(map(lambda n: n.updated_at, filter(lambda n: n.idhex in prev_consensus, nodes.itervalues()))) - plog("INFO", "Oldest measured node: "+time.ctime(oldest_timestamp)) + plog("INFO", "Oldest updated node: "+time.ctime(oldest_updated)) missed_nodes = 0.0 for n in prev_consensus.itervalues(): @@ -604,7 +624,8 @@ def main(argv): # FIXME: Split out debugging data for n in n_print: if not n.ignore: - out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.measured_at))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(int(n.pid_bw))+" pid_delta="+str(n.pid_delta)+" circ_fail="+str(n.circ_fail_rate)+"\n") + # Turns out str() is more accurate than %lf + out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.measured_at))+" updated_at="+str(int(n.updated_at))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(int(n.pid_bw))+" pid_delta="+str(n.pid_delta)+" circ_fail="+str(n.circ_fail_rate)+"\n") out.close() write_file_list(argv[1])
participants (1)
-
mikeperry@torproject.org