commit 5331580a114442db79a7af3fd767107e98b4969a
Author: Mike Perry <mikeperry-git@fscked.org>
Date:   Mon Oct 31 20:07:45 2011 -0700
Make way for fix for #1984.
Clean out some annoying garbage from a previous filtering method.
---
 NetworkScanners/BwAuthority/aggregate.py |  181 +++++++----------------------
 1 files changed, 44 insertions(+), 137 deletions(-)
diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py index 8658d33..0e20c5b 100755 --- a/NetworkScanners/BwAuthority/aggregate.py +++ b/NetworkScanners/BwAuthority/aggregate.py @@ -69,34 +69,11 @@ def base10_round(bw_val): return 1 return ret
- - -def closest_to_one(ratio_list): - min_dist = 0x7fffffff - min_item = -1 - for i in xrange(len(ratio_list)): - if abs(1.0-ratio_list[i]) < min_dist: - min_dist = abs(1.0-ratio_list[i]) - min_item = i - return min_item - -class NodeData: - def __init__(self, timestamp): - self.strm_bw = [] - self.filt_bw = [] - self.ns_bw = [] - self.desc_bw = [] - self.timestamp = timestamp - class Node: def __init__(self): - self.node_data = {} self.ignore = False self.idhex = None self.nick = None - self.chosen_time = None - self.chosen_sbw = None - self.chosen_fbw = None self.sbw_ratio = None self.fbw_ratio = None self.pid_bw = 0 @@ -108,17 +85,20 @@ class Node: self.ratio = None self.new_bw = None self.change = None - self.bw_idx = 0 - self.strm_bw = [] - self.filt_bw = [] - self.ns_bw = [] - self.desc_bw = [] - self.timestamps = [] + + # measurement vars from bwauth lines + self.measured_at = 0 + self.strm_bw = 0 + self.filt_bw = 0 + self.ns_bw = 0 + self.desc_bw = 0 + self.circ_fail_rate = 0 + self.strm_fail_rate = 0
def revert_to_vote(self, vote): self.new_bw = vote.bw self.pid_error = vote.pid_error - self.chosen_time = vote.measured_at + self.measured_at = vote.measured_at
# Derivative of error for pid control def get_pid_bw(self, prev_vote, kp): @@ -128,10 +108,10 @@ class Node: # This is non-standard self.pid_error_sum = prev_vote.pid_error_sum*(1 - 1.0/T_i) + self.pid_error
- self.pid_bw = self.ns_bw[self.bw_idx] \ - + kp*(self.ns_bw[self.bw_idx]*self.pid_error \ - + self.ns_bw[self.bw_idx]*self.integral_error()/T_i \ - + self.ns_bw[self.bw_idx]*self.d_error_dt()*T_d) + self.pid_bw = self.ns_bw \ + + kp*(self.ns_bw*self.pid_error \ + + self.ns_bw*self.integral_error()/T_i \ + + self.ns_bw*self.d_error_dt()*T_d) return self.pid_bw
# Time-weighted sum of error per unit of time (measurement sample) @@ -153,74 +133,14 @@ class Node: raise Exception("Line mismatch") self.idhex = line.idhex self.nick = line.nick - if line.slice_file not in self.node_data \ - or self.node_data[line.slice_file].timestamp < line.timestamp: - self.node_data[line.slice_file] = NodeData(line.timestamp) - - # TODO: This is kinda nutty. Can we simplify? For instance, - # do these really need to be lists inside the nd? - nd = self.node_data[line.slice_file] - nd.strm_bw.append(line.strm_bw) - nd.filt_bw.append(line.filt_bw) - nd.ns_bw.append(line.ns_bw) - nd.desc_bw.append(line.desc_bw) - - self.strm_bw = [] - self.filt_bw = [] - self.ns_bw = [] - self.desc_bw = [] - self.timestamps = [] - - for nd in self.node_data.itervalues(): - self.strm_bw.extend(nd.strm_bw) - self.filt_bw.extend(nd.filt_bw) - self.ns_bw.extend(nd.ns_bw) - self.desc_bw.extend(nd.desc_bw) - for i in xrange(len(nd.ns_bw)): - self.timestamps.append(nd.timestamp) - - def avg_strm_bw(self): - return sum(self.strm_bw)/float(len(self.strm_bw)) - - def avg_filt_bw(self): - return sum(self.filt_bw)/float(len(self.filt_bw)) - - def avg_ns_bw(self): - return sum(self.ns_bw)/float(len(self.ns_bw)) - - def avg_desc_bw(self): - return sum(self.desc_bw)/float(len(self.desc_bw)) - - # This can be bad for bootstrapping or highly bw-variant nodes... - # we will choose an old measurement in that case.. We need - # to build some kind of time-bias here.. 
- def _choose_strm_bw_one(self, net_avg): - i = closest_to_one(map(lambda f: f/net_avg, self.strm_bw)) - self.chosen_sbw = i - return self.chosen_sbw - - def _choose_filt_bw_one(self, net_avg): - i = closest_to_one(map(lambda f: f/net_avg, self.filt_bw)) - self.chosen_fbw = i - return self.chosen_fbw - - # Simply return the most recent one instead of this - # closest-to-one stuff - def choose_filt_bw(self, net_avg): - max_idx = 0 - for i in xrange(len(self.timestamps)): - if self.timestamps[i] > self.timestamps[max_idx]: - max_idx = i - self.chosen_fbw = max_idx - return self.chosen_fbw - - def choose_strm_bw(self, net_avg): - max_idx = 0 - for i in xrange(len(self.timestamps)): - if self.timestamps[i] > self.timestamps[max_idx]: - max_idx = i - self.chosen_sbw = max_idx - return self.chosen_sbw + if line.measured_at > self.measured_at: + self.measured_at = line.measured_at + self.strm_bw = line.strm_bw + self.filt_bw = line.filt_bw + self.ns_bw = line.ns_bw + self.desc_bw = line.desc_bw + self.circ_fail_rate = line.circ_fail_rate + self.strm_fail_rate = line.strm_fail_rate
class Line: def __init__(self, line, slice_file, timestamp): @@ -231,7 +151,13 @@ class Line: self.ns_bw = int(re.search("[\s]*ns_bw=([\S]+)[\s]*", line).group(1)) self.desc_bw = int(re.search("[\s]*desc_bw=([\S]+)[\s]*", line).group(1)) self.slice_file = slice_file - self.timestamp = timestamp + self.measured_at = timestamp + try: + self.circ_fail_rate = float(re.search("[\s]*circ_fail_rate=([\S]+)[\s]*", line).group(1)) + self.strm_fail_rate = float(re.search("[\s]*strm_fail_rate=([\S]+)[\s]*", line).group(1)) + except: + self.circ_fail_rate = 0 + self.strm_fail_rate = 0
class Vote: def __init__(self, line): @@ -391,25 +317,10 @@ def main(argv): if len(nodes) == 0: plog("NOTICE", "No scan results yet.") sys.exit(1) - - pre_strm_avg = sum(map(lambda n: n.avg_strm_bw(), nodes.itervalues()))/ \ - float(len(nodes)) - pre_filt_avg = sum(map(lambda n: n.avg_filt_bw(), nodes.itervalues()))/ \ - float(len(nodes)) - - plog("DEBUG", "Network pre_strm_avg: "+str(pre_strm_avg)) - plog("DEBUG", "Network pre_filt_avg: "+str(pre_filt_avg))
- for n in nodes.itervalues(): - n.choose_strm_bw(pre_strm_avg) - n.choose_filt_bw(pre_filt_avg) - plog("DEBUG", "Node "+n.nick+" chose sbw: "+\ - str(n.strm_bw[n.chosen_sbw])+" fbw: "+\ - str(n.filt_bw[n.chosen_fbw])) - - true_strm_avg = sum(map(lambda n: n.strm_bw[n.chosen_sbw], + true_strm_avg = sum(map(lambda n: n.strm_bw, nodes.itervalues()))/float(len(nodes)) - true_filt_avg = sum(map(lambda n: n.filt_bw[n.chosen_fbw], + true_filt_avg = sum(map(lambda n: n.filt_bw, nodes.itervalues()))/float(len(nodes))
plog("DEBUG", "Network true_strm_avg: "+str(true_strm_avg)) @@ -427,11 +338,11 @@ def main(argv): if n.idhex in prev_votes.vote_map and n.idhex in prev_consensus: if "Guard" in prev_consensus[n.idhex].flags: guard_cnt += 1 - guard_measure_time += (n.timestamps[n.chosen_fbw] - \ + guard_measure_time += (n.measured_at - \ prev_votes.vote_map[n.idhex].measured_at) else: node_cnt += 1 - node_measure_time += (n.timestamps[n.chosen_fbw] - \ + node_measure_time += (n.measured_at - \ prev_votes.vote_map[n.idhex].measured_at)
# There is a difference between measure period and sample rate. @@ -443,25 +354,21 @@ def main(argv):
tot_net_bw = 0 for n in nodes.itervalues(): - n.fbw_ratio = n.filt_bw[n.chosen_fbw]/true_filt_avg - n.sbw_ratio = n.strm_bw[n.chosen_sbw]/true_strm_avg + n.fbw_ratio = n.filt_bw/true_filt_avg + n.sbw_ratio = n.strm_bw/true_strm_avg if n.sbw_ratio > n.fbw_ratio: # Does this ever happen? plog("NOTICE", "sbw > fbw for "+n.nick) n.ratio = n.sbw_ratio - n.bw_idx = n.chosen_sbw - n.pid_error = (n.strm_bw[n.chosen_sbw] - true_strm_avg)/true_strm_avg + n.pid_error = (n.strm_bw - true_strm_avg)/true_strm_avg else: n.ratio = n.fbw_ratio - n.bw_idx = n.chosen_fbw - n.pid_error = (n.filt_bw[n.chosen_fbw] - true_filt_avg)/true_filt_avg - - n.chosen_time = n.timestamps[n.bw_idx] + n.pid_error = (n.filt_bw - true_filt_avg)/true_filt_avg
if cs_junk.bwauth_pid_control: if n.idhex in prev_votes.vote_map: # If there is a new sample, let's use it for all but guards - if n.chosen_time > prev_votes.vote_map[n.idhex].measured_at: + if n.measured_at > prev_votes.vote_map[n.idhex].measured_at: # Nodes with the Guard flag will respond slowly to feedback, # so they should be sampled less often, and in proportion to # the appropriate Wgx weight. @@ -470,7 +377,7 @@ def main(argv): and "Exit" not in prev_consensus[n.idhex].flags): # Do full feedback if our previous vote > 2.5 weeks old if n.idhex not in prev_votes.vote_map or \ - n.chosen_time - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE: + n.measured_at - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE: n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p) else: pid_error = n.pid_error @@ -497,7 +404,7 @@ def main(argv): # Reset values. Don't vote/sample this measurement round. n.revert_to_vote(prev_votes.vote_map[n.idhex]) else: # No prev vote, pure consensus feedback this round - n.new_bw = n.ns_bw[n.bw_idx] + K_p*n.ns_bw[n.bw_idx]*n.pid_error + n.new_bw = n.ns_bw + K_p*n.ns_bw*n.pid_error n.pid_error_sum = n.pid_error n.pid_bw = n.new_bw plog("INFO", "No prev vote for node "+n.nick+": Consensus feedback") @@ -505,9 +412,9 @@ def main(argv): n.pid_bw = 0 n.pid_error = 0 n.pid_error_sum = 0 - n.new_bw = n.desc_bw[n.bw_idx]*n.ratio + n.new_bw = n.desc_bw*n.ratio
- n.change = n.new_bw - n.desc_bw[n.bw_idx] + n.change = n.new_bw - n.desc_bw
if n.idhex in prev_consensus: if prev_consensus[n.idhex].bandwidth != None: @@ -535,7 +442,7 @@ def main(argv): n.pid_error_sum = 0 # Don't let unused error accumulate...
# WTF is going on here? - oldest_timestamp = min(map(lambda n: n.chosen_time, + oldest_timestamp = min(map(lambda n: n.measured_at, filter(lambda n: n.idhex in prev_consensus, nodes.itervalues()))) plog("INFO", "Oldest measured node: "+time.ctime(oldest_timestamp)) @@ -577,7 +484,7 @@ def main(argv): # FIXME: Split out debugging data for n in n_print: if not n.ignore: - out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.chosen_time))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(n.pid_bw)+" pid_delta="+str(n.derror_dt)+"\n") + out.write("node_id="+n.idhex+" bw="+str(base10_round(n.new_bw))+" nick="+n.nick+ " measured_at="+str(int(n.measured_at))+" pid_error="+str(n.pid_error)+" pid_error_sum="+str(n.pid_error_sum)+" pid_bw="+str(n.pid_bw)+" pid_delta="+str(n.derror_dt)+"\n") out.close()
if __name__ == "__main__":
tor-commits@lists.torproject.org