commit e7a7dce049904664f0c1d7ba99ff76779c23e9ea Author: Tom Ritter tom@ritter.vg Date: Fri Apr 14 23:42:30 2017 -0500
Ingest historical bwauth statistics data
Update the parseOldConsensuses.py script to ingest historical bwauth data. Note that this script is generally run only once and is not intended to be a well-maintained tool for future use; it needs manual care and feeding for each major run.
Additionally, create a mergeDatabases.py script. This script probably does not do what we want; it is included mostly as a placeholder in case we want to correct it and use it in the future. --- mergeDatabases.py | 66 ++++++++++++++++ parseOldConsensuses.py | 208 ++++++++++++++++++++++++++++++++++++++++++++----- utility.py | 8 ++ 3 files changed, 263 insertions(+), 19 deletions(-)
diff --git a/mergeDatabases.py b/mergeDatabases.py new file mode 100755 index 0000000..ac025f5 --- /dev/null +++ b/mergeDatabases.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python + +import os +import sys +import time +import sqlite3 +import datetime +import operator +import traceback +import subprocess + +# Copy rows from each table of src.db into the same-named table of dest.db. +# Tables missing from dest.db, or whose PRAGMA table_info output differs, are skipped. +if __name__ == '__main__': + if len(sys.argv) != 3: + print "Usage: ", sys.argv[0], "src.db dest.db" + print "\tMerge all the data from src into dest" + sys.exit(1) + + if not os.path.isfile(sys.argv[1]): + print "Source is not a file" + sys.exit(1) + if not os.path.isfile(sys.argv[2]): + print "Dest is not a file" + sys.exit(1) + + src = sqlite3.connect(sys.argv[1]) + dst = sqlite3.connect(sys.argv[2]) + + s_tbls = src.execute("SELECT name FROM sqlite_master WHERE type = 'table'") + for t in s_tbls: + t = t[0] + skip_table = False + + d_tbl = dst.execute("SELECT name FROM sqlite_master WHERE type = 'table' and name = ?", (t,)) + if not d_tbl.fetchone(): + print "Skipping table", t, "which is in src but not in dst" + continue + + s_cols = src.execute("PRAGMA table_info(" + t + ")") + d_cols = dst.execute("PRAGMA table_info(" + t + ")") + s_cols = s_cols.fetchall() + d_cols = d_cols.fetchall() + if len(s_cols) != len(d_cols): + # NOTE(review): this message never says "in dst" after the dst column count. + print "Skipping table", t, "which has", len(s_cols), "columns in src and", len(d_cols) + continue + for i in range(len(s_cols)): + if s_cols[i] != d_cols[i]: + # NOTE(review): prints the literal 1, not the mismatching column index i. + print "Skipping table", t, "because column", 1, "is", s_cols[i], "in src and", d_cols[i], "in dst" + skip_table = True + + if skip_table: + continue + + print "Merging table", t + merged = 0 + s = src.execute("SELECT * FROM " + t) + for r in s.fetchall(): + # NOTE(review): date is assigned but never used. + date = r[0] + has_value = False + for v in r[1:]: + if v: + has_value = True + # Rows whose columns after the first are all falsy (NULL, 0, empty) are not copied. + if has_value: + merged += 1 + # NOTE(review): on a key conflict INSERT OR REPLACE replaces the whole dst row with the src row, so dst values are lost wherever src holds NULL. + dst.execute("INSERT OR REPLACE INTO " + t + " VALUES (" + ",".join("?" 
* len(r)) + ")", r) + dst.commit() + print "Inserted or updated", merged, "rows" \ No newline at end of file diff --git a/parseOldConsensuses.py b/parseOldConsensuses.py index 834c386..defa2f5 100755 --- a/parseOldConsensuses.py +++ b/parseOldConsensuses.py @@ -18,6 +18,7 @@ import stem.util.conf import stem.util.enum
from stem import Flag +from stem.descriptor.reader import DescriptorReader from stem.util.lru_cache import lru_cache
def get_dirauths_in_tables(): @@ -48,17 +49,23 @@ def get_dirauth_from_filename(filename): return "tor26" elif key == "0232AF901C31A04EE9848595AF9BB7620D4C5B2E" or key == "585769C78764D58426B8B52B6651A5A71137189A": return "dannenberg" - elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956": - return "turtles" + elif key == "27B6B5996C426270A5C95488AA5BCEB6BCC86956": + return "turtles" else: raise Exception("Unexpcected dirauth key: " + key + " " + filename)
def unix_time(dt):
    """Return the milliseconds (as a float) elapsed from the Unix epoch to *dt*.

    *dt* must be a naive datetime; it is measured against
    utcfromtimestamp(0), so it is effectively interpreted as UTC.
    """
    epoch = datetime.datetime.utcfromtimestamp(0)
    elapsed_seconds = (dt - epoch).total_seconds()
    return elapsed_seconds * 1000.0
+def ut_to_datetime(ut): + return datetime.datetime.utcfromtimestamp(ut / 1000) + +def ut_to_datetime_format(ut): + return ut_to_datetime(ut).strftime("%Y-%m-%d-%H-%M-%S") + def get_time_from_filename(filename): voteTime = filename.split('-') - if len(voteTime) < 9: + if len(voteTime) < 7: raise Exception("Strange filename: " + filename)
v = [int(x) for x in filename.split('-')[0:6]] @@ -66,26 +73,30 @@ def get_time_from_filename(filename): voteTime = unix_time(voteTime) return voteTime
-def main(dir): - dirAuths = get_dirauths_in_tables() - dbc = sqlite3.connect(os.path.join('data', 'historical.db')) - +def dirauth_relay_votes(directory, dirAuths, dbc): dirauth_columns = "" dirauth_columns_questions = "" for d in dirAuths: dirauth_columns += d + "_known integer, " + d + "_running integer, " + d + "_bwauth integer, " dirauth_columns_questions += ",?,?,?"
+ dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + dirauth_columns + "PRIMARY KEY(date ASC))") + dbc.commit() + votes = {} - for root, dirs, files in os.walk(dir): + for root, dirs, files in os.walk(directory): for f in files: - filepath = os.path.join(root, f) - print filepath + filepath = os.path.join(root, f) + print filepath
if '"' in f: raise Exception("Potentially malicious filename") - elif "votes-" in f and ".tar" in f: - continue + elif "votes-" in f and ".tar" in f: + continue + elif "consensuses-" in f and ".tar" in f: + continue + elif "-vote-" not in f: + continue
voteTime = get_time_from_filename(f) if voteTime not in votes: @@ -104,11 +115,8 @@ def main(dir): votes[voteTime][dirauth]['running'] = int(subprocess.check_output('egrep "^s " "' + filepath + '" | grep " Running" | wc -l', shell=True)) votes[voteTime][dirauth]['bwlines'] = int(subprocess.check_output('grep Measured= "' + filepath + '" | wc -l', shell=True))
- dbc.execute("CREATE TABLE IF NOT EXISTS vote_data(date integer, " + dirauth_columns + "PRIMARY KEY(date ASC))") - dbc.commit() - for t in votes: - print t + print ut_to_datetime(t) print "\t", len(votes[t]) for d in votes[t]: print "\t", d, votes[t][d]['bwlines'], votes[t][d]['running'] @@ -127,13 +135,175 @@ def main(dir): dbc.execute("INSERT OR REPLACE INTO vote_data VALUES (?" + dirauth_columns_questions + ")", insertValues) dbc.commit()
# Per-dirauth counters stored in bwauth_data, in column order.
_BWAUTH_CATEGORIES = ('above', 'shared', 'exclusive', 'below', 'unmeasured')


def _create_bwauth_table(dirAuths, dbc):
    """Create bwauth_data (one row per vote time, five counters per dirauth) if missing."""
    columns = "".join(d + "_" + category + " integer, "
                      for d in dirAuths for category in _BWAUTH_CATEGORIES)
    dbc.execute("CREATE TABLE IF NOT EXISTS bwauth_data(date integer, " + columns + "PRIMARY KEY(date ASC))")
    dbc.commit()


def _find_consensuses_and_votes(directory, dirAuths, dbc):
    """Index the consensus and vote files under *directory* by timestamp.

    Returns (consensuses, votes): consensuses maps time -> path, votes maps
    time -> {dirauth: path}. Vote times already stored in bwauth_data are
    skipped, and vote times without a matching consensus are reported and
    dropped.

    Raises Exception for a filename containing '"' or for a vote from a
    dirauth that is not in dirAuths.
    """
    consensuses = {}
    votes = {}
    cur = dbc.cursor()
    for root, _dirs, files in os.walk(directory):
        for f in files:
            filepath = os.path.join(root, f)

            if '"' in f:
                raise Exception("Potentially malicious filename")
            # Skip the votes-*.tar / consensuses-*.tar archives themselves.
            if ".tar" in f and ("votes-" in f or "consensuses-" in f):
                continue

            if "-consensus" in f:
                consensusTime = get_time_from_filename(f)
                if consensusTime in consensuses:
                    print("Found two consensuses with the same time: %s" % ut_to_datetime(consensusTime))
                else:
                    consensuses[consensusTime] = filepath
            elif "-vote-" in f:
                voteTime = get_time_from_filename(f)

                # Vote times ingested by an earlier run are not reprocessed.
                cur.execute("SELECT 1 FROM bwauth_data WHERE date = ?", (voteTime,))
                if cur.fetchone():
                    print("Skipping %s because we already processed it" % f)
                    continue

                dirauth = get_dirauth_from_filename(f)
                if dirauth not in dirAuths:
                    raise Exception("Found a dirauth I don't know about (probably spelling): " + dirauth)

                votesAtTime = votes.setdefault(voteTime, {})
                if dirauth in votesAtTime:
                    print("Found two votes for dirauth %s: %s and %s" % (dirauth, filepath, votesAtTime[dirauth]))
                else:
                    votesAtTime[dirauth] = filepath

    # Every vote time needs a consensus to compare against.
    for voteTime in [t for t in votes if t not in consensuses]:
        print("Have votes for time %s but no consensus!" % ut_to_datetime(voteTime))
        del votes[voteTime]

    return consensuses, votes


def _read_consensus_bandwidths(path):
    """Map each relay fingerprint in the consensus at *path* to its bandwidth,
    or to "Unmeasured" when the relay's is_unmeasured is set."""
    routers = {}
    with DescriptorReader(path) as reader:
        reader.register_skip_listener(my_listener)
        for relay in reader:
            routers[relay.fingerprint] = "Unmeasured" if relay.is_unmeasured else relay.bandwidth
    return routers


def _read_vote_measurements(path):
    """Map fingerprint -> measured value for every relay with a truthy `measured` in the vote at *path*."""
    measured = {}
    with DescriptorReader(path) as reader:
        reader.register_skip_listener(my_listener)
        for relay in reader:
            if relay.measured:
                measured[relay.fingerprint] = relay.measured
    return measured


def _classify_measurements(consensusRouters, bwauthVotes):
    """Count, per bwauth, how its measurements compare with the consensus bandwidths.

    consensusRouters: fingerprint -> consensus bandwidth (or "Unmeasured").
    bwauthVotes: dirauth -> {fingerprint: measured value}.
    Returns dirauth -> {category: count}; empty if the consensus had no relays.
    """
    if not consensusRouters:
        return {}
    results = dict((d, dict.fromkeys(_BWAUTH_CATEGORIES, 0)) for d in bwauthVotes)
    for fingerprint, consensusBw in consensusRouters.items():
        # Relays the consensus itself marks unmeasured are not counted for anyone.
        if consensusBw == "Unmeasured":
            continue
        # Number of bwauths whose measurement equals the consensus value; decides
        # between 'exclusive' (only one) and 'shared'. Computed once per relay.
        agreeing = sum(1 for m in bwauthVotes.values()
                       if fingerprint in m and m[fingerprint] == consensusBw)
        for d, measurements in bwauthVotes.items():
            counts = results[d]
            if fingerprint not in measurements:
                counts['unmeasured'] += 1
            elif consensusBw < measurements[fingerprint]:
                counts['above'] += 1
            elif consensusBw > measurements[fingerprint]:
                counts['below'] += 1
            elif agreeing == 1:
                counts['exclusive'] += 1
            else:
                counts['shared'] += 1
    return results


def _bwauth_row(voteTime, dirAuths, results):
    """Build the bwauth_data row: the time, then five counters (or NULLs) per dirauth."""
    row = [voteTime]
    for d in dirAuths:
        counts = results.get(d)
        if counts is None:
            row.extend([None] * len(_BWAUTH_CATEGORIES))
        else:
            row.extend(counts[c] for c in _BWAUTH_CATEGORIES)
    return row


def bwauth_measurements(directory, dirAuths, dbc):
    """Ingest per-bwauth measurement statistics from archived votes and consensuses.

    Walks *directory* for consensus files ("-consensus" in the name) and vote
    files ("-vote-" in the name). For each vote time that has a consensus,
    every dirauth vote's measured values are compared with the consensus
    bandwidth of each relay. One row per vote time is written to bwauth_data,
    with five counters per dirauth:

      above      - the bwauth's measurement is greater than the consensus value
      below      - the bwauth's measurement is less than the consensus value
      exclusive  - equal to the consensus value, and no other bwauth matched it
      shared     - equal to the consensus value, and another bwauth matched too
      unmeasured - the bwauth's vote has no measurement for the relay

    Relays the consensus marks unmeasured are not counted. Dirauths that did
    not vote, or whose vote measured nothing, get NULL counters. Vote times
    already in bwauth_data are skipped, so the ingestion can be rerun.

    directory -- root directory containing the extracted vote/consensus files
    dirAuths  -- dirauth names; each contributes five columns, in this order
    dbc       -- sqlite3 connection to the historical database

    Raises Exception for a filename containing '"' or a vote from an unknown dirauth.
    """
    # The table must exist before scanning: the scan queries bwauth_data to
    # skip already-ingested vote times, which raised OperationalError on a
    # fresh database when the table was only created afterwards.
    _create_bwauth_table(dirAuths, dbc)
    insertSql = ("INSERT OR REPLACE INTO bwauth_data VALUES (" +
                 ",".join("?" * (1 + len(_BWAUTH_CATEGORIES) * len(dirAuths))) + ")")

    consensuses, votes = _find_consensuses_and_votes(directory, dirAuths, dbc)

    # Process in chronological order so progress output is easy to follow.
    for reviewed, voteTime in enumerate(sorted(votes), 1):
        print("Reviewing %s (%d/%d)" % (consensuses[voteTime], reviewed, len(votes)))

        consensusRouters = _read_consensus_bandwidths(consensuses[voteTime])

        bwauthVotes = {}
        for dirauth, votePath in votes[voteTime].items():
            measured = _read_vote_measurements(votePath)
            # Votes that measured nothing are left out so their columns stay NULL.
            if measured:
                bwauthVotes[dirauth] = measured

        results = _classify_measurements(consensusRouters, bwauthVotes)
        dbc.execute(insertSql, _bwauth_row(voteTime, dirAuths, results))
        dbc.commit()


def my_listener(path, exception):
    """Skip listener for DescriptorReader: report a file the reader skipped and why."""
    print("Skipped!")
    print(path)
    print(exception)


def main(itype, directory):
    """Run one ingestion into data/historical.db.

    itype     -- "dirauth_relay_votes" or "bwauth_measurements"
    directory -- directory of extracted vote/consensus files to ingest
    """
    # Validate the type first so a typo does not open (and create) the database.
    if itype == "dirauth_relay_votes":
        ingest = dirauth_relay_votes
    elif itype == "bwauth_measurements":
        ingest = bwauth_measurements
    else:
        print("Unknown ingestion type")
        print("Valid types: dirauth_relay_votes, bwauth_measurements")
        return

    dirAuths = get_dirauths_in_tables()
    dbc = sqlite3.connect(os.path.join('data', 'historical.db'))
    try:
        ingest(directory, dirAuths, dbc)
    finally:
        dbc.close()
if __name__ == '__main__': try: - if len(sys.argv) != 2: - print "Usage: ", sys.argv[0], "vote-directory" + if len(sys.argv) != 3: + print "Usage: ", sys.argv[0], "ingestion-type vote-directory" else: - main(sys.argv[1]) + main(sys.argv[1], sys.argv[2]) except: msg = "%s failed with:\n\n%s" % (sys.argv[0], traceback.format_exc()) print "Error: %s" % msg diff --git a/utility.py b/utility.py index 6cebe6b..aeb7b8d 100644 --- a/utility.py +++ b/utility.py @@ -84,3 +84,11 @@ def _get_documents(label, resource): def unix_time(dt): return (dt - datetime.datetime.utcfromtimestamp(0)).total_seconds() * 1000.0
def ut_to_datetime(ut):
    """Convert milliseconds since the Unix epoch (as returned by unix_time) to a naive UTC datetime.

    Uses exact epoch + timedelta arithmetic instead of utcfromtimestamp(ut / 1000).
    Under Python 2 that expression floor-divides int/long timestamps (e.g. values
    read back from SQLite integer columns) and silently drops the milliseconds.
    utcfromtimestamp is also deprecated as of Python 3.12.
    """
    return datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=ut)


def ut_to_datetime_format(ut):
    """Format a millisecond Unix timestamp as YYYY-MM-DD-HH-MM-SS (UTC)."""
    return consensus_datetime_format(ut_to_datetime(ut))


def consensus_datetime_format(dt):
    """Format *dt* as dash-separated YYYY-MM-DD-HH-MM-SS."""
    return dt.strftime("%Y-%m-%d-%H-%M-%S")
tor-commits@lists.torproject.org