commit 75d34050654175e9f952d98966476c173d40cd2a
Author: Sathyanarayanan Gunasekaran <gsathya.ceg@gmail.com>
Date:   Fri Sep 21 19:11:55 2012 +0530
Refactor database.py
This is a pretty big commit with lots of refactoring:

- Remove certain global vars; read them from the config file instead
- Remove freshen_database() -- it's now part of update_databases()
- Rename get_database() to get_database_conn(), and remove the unused
  conn.row_factory setting
- Rename update_database() to update_databases()
- Add more logging info
- Make the insertion statements modular
- get_summary_routers() now returns relays/bridges as Router objects
- Refactor code
  * Remove unnecessary return statements and whitespace
  * Make multiline statements more pythonic
---
 pyonionoo.conf                |   2 +-
 pyonionoo/config.py           |   4 +-
 pyonionoo/database.py         | 202 ++++++++++++++++++++--------------------
 pyonionoo/handlers/summary.py |   9 +-
 pyonionoo/parser.py           |  18 ++--
 pyonionoo/web.py              |   2 +-
 6 files changed, 118 insertions(+), 119 deletions(-)
diff --git a/pyonionoo.conf b/pyonionoo.conf
index 6e93de1..d3bbf60 100644
--- a/pyonionoo.conf
+++ b/pyonionoo.conf
@@ -30,4 +30,4 @@ debug = no
 [metrics]
 out_dir = /tmp
-
+summary_file = summary
diff --git a/pyonionoo/config.py b/pyonionoo/config.py
index 1de03b0..ed8001b 100644
--- a/pyonionoo/config.py
+++ b/pyonionoo/config.py
@@ -61,6 +61,6 @@ def parse_config(filename):
     else:
         settings["mysql_settings"] = None
-    settings['metrics_out'] = xget(cfg.get, 'metrics', 'out_dir', None)
-
+    settings['metrics_out'] = xget(cfg.get, 'metrics', 'out_dir', '/tmp')
+    settings['summary_file'] = xget(cfg.get, 'metrics', 'summary_file', 'summary')
     return settings
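Both metrics options now fall back to defaults instead of None. The xget helper itself is not shown in this diff; as a rough sketch of the pattern it presumably implements (assumed behavior, not the project's exact code), it wraps a ConfigParser getter with a fallback:

    # Rough sketch of an xget-style lookup (assumed behavior): call the
    # ConfigParser getter, fall back to the default if the option is absent.
    import ConfigParser

    def xget(getter, section, option, default=None):
        try:
            return getter(section, option)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            return default

    cfg = ConfigParser.SafeConfigParser()
    cfg.read('pyonionoo.conf')
    out_dir = xget(cfg.get, 'metrics', 'out_dir', '/tmp')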
diff --git a/pyonionoo/database.py b/pyonionoo/database.py
index 8dd369f..1bc555b 100644
--- a/pyonionoo/database.py
+++ b/pyonionoo/database.py
@@ -7,14 +7,6 @@ import time

 from pyonionoo.parser import Router
-# Metrics out/ directory.
-METRICS_OUT = None
-
-# Summary document that will be read into an SQLite database. This should
-# probably be defined in a configuration file somewhere.
-SUMMARY_FILE = 'summary'
-SUMMARY = None
-
 # Name of the SQLite database. This should be defined in a configuration file
 # somewhere. And it should be ':memory:', not a file. BUT: it seems that
 # (1) sqlite3.Connection objects are not thread-safe, and (2) if different
@@ -90,19 +82,28 @@ def _create_table(conn, tbl_name, schema):
     cursor = conn.cursor()
+ logging.info("Creating table %s" % (tbl_name)) # Can only use placeholders in select statements? cursor.execute('DROP TABLE IF EXISTS %s' % tbl_name) cursor.execute('CREATE TABLE %s (%s)' % (tbl_name, schema)) + logging.info("Created table %s" % (tbl_name)) + +def bootstrap_database(metrics_out, summary_file): + """ + Bootstraps the database creation process: + * Create 3 databases - summary, flags, addresses + * Updates the databases
+
+
+def bootstrap_database(metrics_out, summary_file):
+    """
+    Bootstraps the database creation process:
+    * Create 3 databases - summary, flags, addresses
+    * Updates the databases

-    return
+    @type metrics_out: string
+    @param metrics_out: path to metrics data dir
-def create_database(metrics_out):
-    global METRICS_OUT, SUMMARY
+    @type summary_file: string
+    @param summary_file: summary file name
+    """
-    METRICS_OUT = metrics_out
-    SUMMARY = os.path.join(METRICS_OUT, SUMMARY_FILE)
+    summary_file = os.path.join(metrics_out, summary_file)
-    conn = sqlite3.connect(DBNAME)
+    conn = get_database_conn()
     # Create the tables.
     _create_table(conn, summary_tbl_name, summary_schema)
@@ -111,17 +112,11 @@ def create_database(metrics_out):
     conn.commit()
-    freshen_database()
-
-    return
-
+    update_databases(summary_file)
-def update_database():
+def update_databases(summary_file=None):
     """
-    Create the database.
-
-    rtype: sqlite3.Connection
-    return: A connection object for the database that has been created.
+    Updates all three databases information.
     This operation operates as a single transaction. Therefore, the
     database can be read by other requests while it is being
@@ -129,16 +124,19 @@ def update_database():
     database. Once this function completes, all future reads will be
     from the "updated" database.
     """
-    global DB_CREATION_TIME
- logging.info("Updating database.") + if DB_CREATION_TIME >= os.stat(summary_file).st_mtime: + return + + logging.info("Updating database")
     # It seems that the default isolation_level is probably OK for
     # all of this to be done in a single transaction.
-    conn = sqlite3.connect(DBNAME, isolation_level='IMMEDIATE')
+    conn = get_database_conn()
     # First delete all records.
+    logging.info("Deleting data from databases")
     CURSOR = conn.cursor()
     CURSOR.execute('DELETE FROM %s' % summary_tbl_name)
     CURSOR.execute('DELETE FROM %s' % flags_tbl_name)
@@ -153,68 +151,74 @@ def update_database():
     # field in the flags/addresses table. Here we can avoid all those
     # selects, because the rowid attribute of the cursor is set to that
     # id field right after we execute the (individual) insert statements.
-    fields = ('type', 'nickname', 'fingerprint', 'running',
-        'time_published', 'OR_port', 'dir_port', 'consensus_weight',
-        'country_code', 'hostname', 'time_lookup')
-    insert_stmt = ('insert into %s (%s) values (%s)' %
-        (summary_tbl_name, ','.join(fields), ','.join(['?']*len(fields))))
-    with open(SUMMARY) as f:
+    summary_fields = ('type', 'nickname', 'fingerprint', 'running',
+        'time_published', 'OR_port', 'dir_port', 'consensus_weight',
+        'country_code', 'hostname', 'time_lookup')
+    addresses_fields = ('id_of_row', 'address')
+    flags_fields = ('id_of_row', 'flag')
+
+    insert_stmt = 'insert into %s (%s) values (%s)'
+
+    # create insertion statement for summary table
+    summary_insert_stmt = (insert_stmt % (summary_tbl_name, ','.join(summary_fields),
+        ','.join(['?']*len(summary_fields))))
+
+    # create insertion statement for addresses table
+    addresses_insert_stmt = (insert_stmt % (addresses_tbl_name, ','.join(addresses_fields),
+        ','.join(['?']*len(addresses_fields))))
+
+    # create insertion statement for flags table
+    flags_insert_stmt = (insert_stmt % (flags_tbl_name, ','.join(flags_fields),
+        ','.join(['?']*len(flags_fields))))
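For reference, each prepared string is just the shared template expanded with a table name, a column list, and a matching run of ? placeholders. A shortened sketch of the expansion:

    summary_fields = ('type', 'nickname', 'fingerprint')
    insert_stmt = 'insert into %s (%s) values (%s)'

    # Expands to: insert into summary (type,nickname,fingerprint) values (?,?,?)
    summary_insert_stmt = insert_stmt % ('summary', ','.join(summary_fields),
                                         ','.join(['?'] * len(summary_fields)))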
+    if not summary_file:
+        # raise Exception?
+        return
+
+    with open(summary_file) as f:
         for line in f.readlines():
-            router = Router(line)
-
-            router_type = 'r' if router.is_relay else 'b'
-
-            # parser.Router.consensus_weight is a string???
-            router_tuple = (router_type, router.nickname, router.fingerprint, router.is_running, router.time_published, router.orport, router.dirport, int(router.consensus_weight), router.country_code, router.hostname, router.time_of_lookup)
+            router = Router()
+            router.parse(line)
+
+            router_tuple = (router.type, router.nickname, router.fingerprint,
+                router.running, router.time_published, router.orport,
+                router.dirport, router.consensus_weight, router.country_code,
+                router.hostname, router.time_of_lookup)
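A possible follow-up (not part of this commit): f.readlines() reads the whole summary file into memory, while iterating the file object streams it line by line with the same loop body:

    from pyonionoo.parser import Router

    with open(summary_file) as f:
        for line in f:              # streams lines; no intermediate list
            router = Router()
            router.parse(line)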
             # TODO: Determine whether sqlite3 optimizes by remembering
             # this insert command and not parsing it every time it sees
             # it. This is mentioned in PEP 249, but we aren't sure where
             # the PEP says that implementations might optimize in this way,
             # or might allow users to optimize in this way.
-            CURSOR.execute(insert_stmt, router_tuple)
+            CURSOR.execute(summary_insert_stmt, router_tuple)
             id_num = CURSOR.lastrowid
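cursor.lastrowid is set right after each INSERT, which is what lets the child tables reference the summary row without a follow-up SELECT. A self-contained sketch of the parent/child pattern (sample values are made up):

    import sqlite3

    conn = sqlite3.connect(':memory:')
    cur = conn.cursor()
    cur.execute('CREATE TABLE summary (nickname text)')
    cur.execute('CREATE TABLE addresses (id_of_row integer, address text)')

    cur.execute('INSERT INTO summary (nickname) VALUES (?)', ('moria1',))
    id_num = cur.lastrowid  # rowid of the summary row just inserted

    cur.execute('INSERT INTO addresses (id_of_row, address) VALUES (?,?)',
                (id_num, '10.0.0.1'))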
-            address_info = (id_num, router.address)
-            CURSOR.execute('INSERT INTO addresses (id_of_row, address) VALUES (?,?)', address_info)
+            address_values = (id_num, router.address)
+            CURSOR.execute(addresses_insert_stmt, address_values)
-            flags = router.flags
-            for flag in flags:
-                flag_info = (id_num, flag)
-                CURSOR.execute('INSERT INTO flags (id_of_row, flag) VALUES (?,?)', flag_info)
+            for flag in router.flags:
+                flag_values = (id_num, flag)
+                CURSOR.execute(flags_insert_stmt, flag_values)
     conn.commit()
-
+    logging.info("Table updated")
     DB_CREATION_TIME = time.time()
-    return conn
-
-def freshen_database():
-    global FRESHEN_TIMER
-
-    if DB_CREATION_TIME < os.stat(SUMMARY).st_mtime:
-        update_database()
-
-    FRESHEN_TIMER = threading.Timer(
-        DB_UPDATE_INTERVAL, freshen_database)
+    FRESHEN_TIMER = threading.Timer(DB_UPDATE_INTERVAL, update_databases, summary_file)
     FRESHEN_TIMER.start()
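Two things worth double-checking in the new freshen logic. First, threading.Timer's third parameter is an args sequence that is splatted into the callback when the timer fires, so passing summary_file bare would call update_databases() with one argument per character; it needs to be wrapped in a list. Second, with the global DB_CREATION_TIME declaration gone, the assignment above appears to create a local that never advances the module-level timestamp. A minimal sketch of the intended Timer usage (interval and path are illustrative):

    import threading

    DB_UPDATE_INTERVAL = 300  # illustrative interval, in seconds

    def update_databases(summary_file=None):
        pass  # stand-in for the function defined above

    # Timer calls update_databases(*args) when it fires, so the argument
    # must be wrapped in a sequence; a bare string would be unpacked
    # character by character.
    FRESHEN_TIMER = threading.Timer(DB_UPDATE_INTERVAL, update_databases,
                                    args=['/tmp/summary'])
    FRESHEN_TIMER.start()
    FRESHEN_TIMER.cancel()  # what cancel_freshen() below does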
 def cancel_freshen():
     FRESHEN_TIMER.cancel()
-def get_database():
+def get_database_conn():
     conn = sqlite3.connect(DBNAME)
-    conn.row_factory = sqlite3.Row
     return conn
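Without a row_factory, rows come back as plain tuples with positional access; sqlite3.Row would have allowed access by column name. That is fine now that get_summary_routers() copies columns onto Router attributes. The difference, in a standalone sketch:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    conn.execute('CREATE TABLE summary (nickname text)')
    conn.execute("INSERT INTO summary VALUES ('moria1')")

    row = conn.execute('SELECT nickname FROM summary').fetchone()
    print row[0]              # default factory: plain tuple, by position

    conn.row_factory = sqlite3.Row
    row = conn.execute('SELECT nickname FROM summary').fetchone()
    print row['nickname']     # sqlite3.Row: by column name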
-def query_summary_tbl(
-        running_filter=None, type_filter=None, hex_fingerprint_filter=None,
-        country_filter=None, search_filter=None,
-        order_field=None, order_asc=True, offset_value=None, limit_value=None,
-        fields=('fingerprint',)):
-
-    conn = get_database()
+def query_summary_tbl(running_filter=None, type_filter=None, hex_fingerprint_filter=None,
+                      country_filter=None, search_filter=None, order_field=None,
+                      order_asc=True, offset_value=None, limit_value=None,
+                      fields=('fingerprint',)):
+    conn = get_database_conn()
     # Build up a WHERE clause based on the request parameters. We only
     # consider the case in which the client specifies 'search' or
@@ -228,86 +232,82 @@ def query_summary_tbl(
         # Actually, this is a moderately painful parameter to implement.
         # Testing for an IP address probably means using regular expressions.
-        # SQLite doesn't suppor them without a user-defined function.
+        # SQLite doesn't support them without a user-defined function.
         # Matching against a Python RE is easy to do, but then we have
         # to have a where clause that matches against the beginning of a
         # field value, and SQLite doesn't appear to support such a search
         # (unless, of course, you want to write a user-defined match()
         # function).
-        pass
     else:
-        if running_filter != None:
+        if running_filter:
             clauses.append("running = %s" % int(running_filter))
-        if type_filter != None:
+        if type_filter:
             clauses.append("type = '%s'" % type_filter)
-        if hex_fingerprint_filter != None:
+        if hex_fingerprint_filter:
             clauses.append("fingerprint = '%s'" % hex_fingerprint_filter)
-        if country_filter != None:
+        if country_filter:
             clauses.append("country = '%s'" % country_filter)
         where_clause = ('WHERE %s' % ' and '.join(clauses)) if clauses else ''
     # Construct the ORDER, LIMIT, and OFFSET clauses.
     order_clause = ''
-    if order_field != None:
-        order_clause = 'ORDER BY %s %s' % (order_field,
-            'ASC' if order_asc else 'DESC')
+    if order_field:
+        order_clause = 'ORDER BY %s %s' % (order_field,
+                                           'ASC' if order_asc else 'DESC')
     limit_clause = ''
-    if limit_value != None:
+    if limit_value:
         limit_clause = 'LIMIT %s' % limit_value
     offset_clause = ''
-    if offset_value != None:
+    if offset_value:
         offset_clause = 'OFFSET %s' % offset_value
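One behavioral wrinkle in the switch from `!= None` to bare truth tests, here and in the WHERE clauses above: a falsy but meaningful value is now treated as absent. A small illustration (whether the handlers ever pass such values is a separate question):

    clauses = []
    running_filter = False        # caller asks for non-running routers

    if running_filter != None:    # old test: clause is built
        clauses.append("running = %s" % int(running_filter))
    # clauses == ["running = 0"]

    clauses = []
    if running_filter:            # new test: falsy filter silently dropped
        clauses.append("running = %s" % int(running_filter))
    # clauses == []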
     cursor = conn.cursor()
-    cursor.execute('SELECT %s FROM summary %s %s %s %s' % (','.join(fields),
-        where_clause, order_clause, limit_clause, offset_clause))
+    cursor.execute('SELECT %s FROM summary %s %s %s %s' %
+                   (','.join(fields), where_clause, order_clause, limit_clause,
+                    offset_clause))
     return cursor.fetchall()
-
-def get_summary_routers(
-        running_filter=None, type_filter=None, hex_fingerprint_filter=None,
-        country_filter=None, search_filter=None,
-        order_field=None, order_asc=True, offset_value=None, limit_value=None):
+def get_summary_routers(running_filter=None, type_filter=None, hex_fingerprint_filter=None,
+                        country_filter=None, search_filter=None, order_field=None,
+                        order_asc=True, offset_value=None, limit_value=None):
     """
     Get summary document according to request parameters.
     @rtype: tuple.
     @return: tuple of form (relays, bridges, relays_time, bridges_time), where
-        * relays (bridges) is a list of sqlite3.Row, each of which consists of the
-          various attributes. The important part is that each element of
-          relays (bridges) can be treated as a dictionary, with keys
-          the same as the database fields.
-        * relays_time (bridges_time) is a datetime object with the most
-          recent timestamp of the relay descriptors in relays.
+        * relays/bridges is a list of Router objects
+        * relays_time/bridges_time is a datetime object with the most
+          recent timestamp of the relay descriptors in relays.
     """
     # Timestamps of most recent relay/bridge in the returned set.
     relay_timestamp = datetime.datetime(1900, 1, 1, 1, 0)
     bridge_timestamp = datetime.datetime(1900, 1, 1, 1, 0)
-    filtered_relays, filtered_bridges = [], []
-    fields = ('type', 'nickname', 'fingerprint', 'running', 'country_code',
+    relays, bridges = [], []
+    fields = ('type', 'nickname', 'fingerprint', 'running', 'country_code',
         'time_published', 'consensus_weight')
-    for row in query_summary_tbl(
-            running_filter, type_filter, hex_fingerprint_filter,
-            country_filter, search_filter,
-            order_field, order_asc, offset_value, limit_value, fields):
+    for row in query_summary_tbl(running_filter, type_filter, hex_fingerprint_filter,
+                                 country_filter, search_filter,order_field, order_asc,
+                                 offset_value, limit_value, fields):
-        # Set the destination list for this router.
-        dest = filtered_relays if row[0] == 'r' else filtered_bridges
-        dest.append(row)
+        current_router = datetime.datetime.strptime(row[5], "%Y-%m-%d %H:%M:%S")
+        router = Router()
+
+        # This is magic
+        map(lambda (attr, value): setattr(router, attr, value), zip(fields, row))
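The "magic" is Python 2 tuple parameter unpacking in the lambda: each (column name, value) pair from zip(fields, row) is copied onto the Router instance. An equivalent explicit loop (which also survives Python 3, where that lambda form was removed):

    for attr, value in zip(fields, row):
        setattr(router, attr, value)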
-        current_router = datetime.datetime.strptime(row[5], "%Y-%m-%d %H:%M:%S")
         if row[0] == 'r':
+            relays.append(router)
             if current_router > relay_timestamp:
                 relay_timestamp = current_router
         if row[0] == 'b':
+            bridges.append(router)
             if current_router > bridge_timestamp:
                 bridge_timestamp = current_router
-    total_routers = (filtered_relays, filtered_bridges, relay_timestamp, bridge_timestamp)
+    total_routers = (relays, bridges, relay_timestamp, bridge_timestamp)
     return total_routers
-
diff --git a/pyonionoo/handlers/summary.py b/pyonionoo/handlers/summary.py
index 7260c77..ca12a0f 100644
--- a/pyonionoo/handlers/summary.py
+++ b/pyonionoo/handlers/summary.py
@@ -35,13 +35,12 @@ class SummaryHandler(cyclone.web.RequestHandler):
         relays, bridges = [], []
         filtered_relays, filtered_bridges, relay_timestamp, bridge_timestamp = routers
-        for (src, dest) in ((filtered_relays, relays),
-                (filtered_bridges, bridges)):
+        for (src, dest) in ((filtered_relays, relays), (filtered_bridges, bridges)):
             for router in src:
                 dest.append({
-                    'n' : router['nickname'],
-                    'f' : router['fingerprint'],
-                    'r' : bool(router['running'])
+                    'n' : router.nickname,
+                    'f' : router.fingerprint,
+                    'r' : bool(router.running)
                 })
         # response is a dict, so the order is not maintained. should the
diff --git a/pyonionoo/parser.py b/pyonionoo/parser.py
index e7f27e8..5105782 100644
--- a/pyonionoo/parser.py
+++ b/pyonionoo/parser.py
@@ -2,7 +2,7 @@ import re
 import datetime
 class Router:
-    def __init__(self, raw_content):
+    def __init__(self):
         self.nickname = None
         self.fingerprint = None
         self.address = None
@@ -13,21 +13,21 @@ class Router:
         self.orport = None
         self.dirport = None
         self.flags = None
-        self.is_running = False
+        self.running = False
         self.consensus_weight = None
         self.country_code = None
         self.hostname = None
         self.time_of_lookup = None
-        self.is_relay = None
-        self._parse(raw_content)
+        self.type = None
-    def _parse(self, raw_content):
+    def parse(self, raw_content):
         values = raw_content.split()
         if len(values) < 9:
             #raise Exception
             raise ValueError("Invalid router!")
-        if values[0] == "r":
-            self.is_relay = True
+        if values[0] == "r": self.type = "r"
+        else: self.type = "b"
+
         self.nickname = values[1]
         self.fingerprint = values[2]
@@ -51,8 +51,8 @@ class Router:
         self.flags = values[8].split(',')
         for flag in self.flags:
             if flag == "Running":
-                self.is_running = True
-        self.consensus_weight = values[9]
+                self.running = True
+        self.consensus_weight = int(values[9])
         self.country_code = values[10]
         if values[11] != "null" :
             self.hostname = values[11]
         self.time_of_lookup = int(values[12])
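For orientation, Router.parse() consumes one whitespace-separated summary line. Only the indices used above are certain; the meaning of fields 3-7 is inferred from the attribute names, so this sample line is illustrative only:

    from pyonionoo.parser import Router

    router = Router()
    # Made-up line; inferred layout: type, nickname, fingerprint, address,
    # published date, published time, ORPort, DirPort, flags, consensus
    # weight, country code, hostname, time of lookup.
    router.parse("r moria1 ABCDEF0123456789 10.0.0.1 "
                 "2012-09-20 12:00:00 9101 9131 "
                 "Fast,Running,Valid 20 us null 1348142460")
    assert router.type == 'r'
    assert router.running
    assert router.consensus_weight == 20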
diff --git a/pyonionoo/web.py b/pyonionoo/web.py
index 72ae9b1..b83cbcd 100644
--- a/pyonionoo/web.py
+++ b/pyonionoo/web.py
@@ -36,7 +36,7 @@ class Application(cyclone.web.Application):
         if not settings['metrics_out']:
             raise ValueError

-        database.create_database(settings['metrics_out'])
+        database.bootstrap_database(settings['metrics_out'], settings['summary_file'])
         cyclone.web.Application.__init__(self, handlers, **settings)