commit 75d34050654175e9f952d98966476c173d40cd2a
Author: Sathyanarayanan Gunasekaran <gsathya.ceg@gmail.com>
Date: Fri Sep 21 19:11:55 2012 +0530
Refactor database.py
This is a pretty big commit with lots of refactoring:
- Remove the METRICS_OUT, SUMMARY_FILE and SUMMARY globals; read these
settings from the config file instead
- Remove freshen_database(); its logic is now part of update_databases()
- Rename get_database() to get_database_conn() and remove the unused
conn.row_factory assignment
- Rename update_database() to update_databases()
- Add more logging info
- Build the INSERT statements for all three tables from a single
template (see the sketch after this list)
- get_summary_routers() now returns relays/bridges as Router objects
instead of sqlite3.Row objects
- Refactor code
* Remove unnecessary return statements and whitespace
* Make multiline statements more Pythonic
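For reference, a minimal sketch of the single-template INSERT construction
now used in update_databases(), with table and field names taken from the
diff below:

    insert_stmt = 'insert into %s (%s) values (%s)'
    flags_fields = ('id_of_row', 'flag')
    flags_insert_stmt = insert_stmt % ('flags', ','.join(flags_fields),
                                       ','.join(['?'] * len(flags_fields)))
    # -> "insert into flags (id_of_row,flag) values (?,?)"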
---
pyonionoo.conf | 2 +-
pyonionoo/config.py | 4 +-
pyonionoo/database.py | 202 ++++++++++++++++++++--------------------
pyonionoo/handlers/summary.py | 9 +-
pyonionoo/parser.py | 18 ++--
pyonionoo/web.py | 2 +-
6 files changed, 118 insertions(+), 119 deletions(-)
diff --git a/pyonionoo.conf b/pyonionoo.conf
index 6e93de1..d3bbf60 100644
--- a/pyonionoo.conf
+++ b/pyonionoo.conf
@@ -30,4 +30,4 @@ debug = no
[metrics]
out_dir = /tmp
-
+summary_file = summary
diff --git a/pyonionoo/config.py b/pyonionoo/config.py
index 1de03b0..ed8001b 100644
--- a/pyonionoo/config.py
+++ b/pyonionoo/config.py
@@ -61,6 +61,6 @@ def parse_config(filename):
else:
settings["mysql_settings"] = None
- settings['metrics_out'] = xget(cfg.get, 'metrics', 'out_dir', None)
-
+ settings['metrics_out'] = xget(cfg.get, 'metrics', 'out_dir', '/tmp')
+ settings['summary_file'] = xget(cfg.get, 'metrics', 'summary_file', 'summary')
return settings
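For context, the two xget() calls above rely on a get-with-default helper
defined earlier in config.py; a minimal sketch of that behavior (the real
helper may differ):

    import ConfigParser  # Python 2 module name

    def xget(getter, section, option, default=None):
        # Return the option's value, or the default when the section or
        # option is missing from the config file.
        try:
            return getter(section, option)
        except (ConfigParser.NoSectionError, ConfigParser.NoOptionError):
            return default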
diff --git a/pyonionoo/database.py b/pyonionoo/database.py
index 8dd369f..1bc555b 100644
--- a/pyonionoo/database.py
+++ b/pyonionoo/database.py
@@ -7,14 +7,6 @@ import time
from pyonionoo.parser import Router
-# Metrics out/ directory.
-METRICS_OUT = None
-
-# Summary document that will be read into an SQLite database. This should
-# probably be defined in a configuration file somewhere.
-SUMMARY_FILE = 'summary'
-SUMMARY = None
-
# Name of the SQLite database. This should be defined in a configuration file
# somewhere. And it should be ':memory:', not a file. BUT: it seems that
# (1) sqlite3.Connection objects are not thread-safe, and (2) if different
@@ -90,19 +82,28 @@ def _create_table(conn, tbl_name, schema):
cursor = conn.cursor()
+ logging.info("Creating table %s" % (tbl_name))
# Can only use placeholders in select statements?
cursor.execute('DROP TABLE IF EXISTS %s' % tbl_name)
cursor.execute('CREATE TABLE %s (%s)' % (tbl_name, schema))
+ logging.info("Created table %s" % (tbl_name))
+
+def bootstrap_database(metrics_out, summary_file):
+ """
+ Bootstraps the database creation process:
+ * Creates the 3 tables - summary, flags, addresses
+ * Updates the tables
- return
+ @type metrics_out: string
+ @param metrics_out: path to metrics data dir
-def create_database(metrics_out):
- global METRICS_OUT, SUMMARY
+ @type summary_file: string
+ @param summary_file: summary file name
+ """
- METRICS_OUT = metrics_out
- SUMMARY = os.path.join(METRICS_OUT, SUMMARY_FILE)
+ summary_file = os.path.join(metrics_out, summary_file)
- conn = sqlite3.connect(DBNAME)
+ conn = get_database_conn()
# Create the tables.
_create_table(conn, summary_tbl_name, summary_schema)
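On the placeholder question in _create_table(): DB-API parameter binding
substitutes values only, never identifiers such as table or column names,
which is why the DDL is built with % interpolation. A minimal standalone
sketch:

    import sqlite3

    conn = sqlite3.connect(':memory:')
    cursor = conn.cursor()
    # Identifiers (table/column names) cannot be bound as parameters...
    cursor.execute('CREATE TABLE %s (%s)' % ('summary', 'fingerprint text'))
    # ...but values can, in any statement, not just SELECTs:
    cursor.execute('INSERT INTO summary (fingerprint) VALUES (?)', ('ABCD',))
    conn.commit()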
@@ -111,17 +112,11 @@ def create_database(metrics_out):
conn.commit()
- freshen_database()
-
- return
-
+ update_databases(summary_file)
-def update_database():
+def update_databases(summary_file=None):
"""
- Create the database.
-
- rtype: sqlite3.Connection
- return: A connection object for the database that has been created.
+ Updates the information in all three tables.
This operation operates as a single transaction. Therefore,
the database can be read by other requests while it is being
@@ -129,16 +124,19 @@ def update_database():
database. Once this function completes, all future reads will be
from the "updated" database.
"""
-
global DB_CREATION_TIME
- logging.info("Updating database.")
+ if DB_CREATION_TIME >= os.stat(summary_file).st_mtime:
+ return
+
+ logging.info("Updating database")
# It seems that the default isolation_level is probably OK for
# all of this to be done in a single transaction.
- conn = sqlite3.connect(DBNAME, isolation_level='IMMEDIATE')
+ conn = get_database_conn()
# First delete all records.
+ logging.info("Deleting data from databases")
CURSOR = conn.cursor()
CURSOR.execute('DELETE FROM %s' % summary_tbl_name)
CURSOR.execute('DELETE FROM %s' % flags_tbl_name)
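The early return above is the freshness check that freshen_database() used
to perform: skip the rebuild unless the summary file has changed since the
last one. A minimal sketch of the pattern, assuming DB_CREATION_TIME holds
a time.time() value:

    import os

    DB_CREATION_TIME = 0  # epoch seconds of the last successful rebuild

    def is_stale(summary_file):
        # True if the summary file was modified after the last rebuild.
        return os.stat(summary_file).st_mtime > DB_CREATION_TIME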
@@ -153,68 +151,74 @@ def update_database():
# field in the flags/addresses table. Here we can avoid all those
# selects, because the rowid attribute of the cursor is set to that
# id field right after we execute the (individual) insert statements.
- fields = ('type', 'nickname', 'fingerprint', 'running',
- 'time_published', 'OR_port', 'dir_port', 'consensus_weight',
- 'country_code', 'hostname', 'time_lookup')
- insert_stmt = ('insert into %s (%s) values (%s)' %
- (summary_tbl_name, ','.join(fields), ','.join(['?']*len(fields))))
- with open(SUMMARY) as f:
+ summary_fields = ('type', 'nickname', 'fingerprint', 'running',
+ 'time_published', 'OR_port', 'dir_port', 'consensus_weight',
+ 'country_code', 'hostname', 'time_lookup')
+ addresses_fields = ('id_of_row', 'address')
+ flags_fields = ('id_of_row', 'flag')
+
+ insert_stmt = 'insert into %s (%s) values (%s)'
+
+ # create insertion statement for summary table
+ summary_insert_stmt = (insert_stmt % (summary_tbl_name, ','.join(summary_fields),
+ ','.join(['?']*len(summary_fields))))
+
+ # create insertion statement for addresses table
+ addresses_insert_stmt = (insert_stmt % (addresses_tbl_name, ','.join(addresses_fields),
+ ','.join(['?']*len(addresses_fields))))
+
+ # create insertion statement for flags table
+ flags_insert_stmt = (insert_stmt % (flags_tbl_name, ','.join(flags_fields),
+ ','.join(['?']*len(flags_fields))))
+ if not summary_file:
+ # raise Exception?
+ return
+
+ with open(summary_file) as f:
for line in f.readlines():
- router = Router(line)
-
- router_type = 'r' if router.is_relay else 'b'
-
- # parser.Router.consensus_weight is a string???
- router_tuple = (router_type, router.nickname, router.fingerprint, router.is_running, router.time_published, router.orport, router.dirport, int(router.consensus_weight), router.country_code, router.hostname, router.time_of_lookup)
+ router = Router()
+ router.parse(line)
+
+ router_tuple = (router.type, router.nickname, router.fingerprint,
+ router.running, router.time_published, router.orport,
+ router.dirport, router.consensus_weight, router.country_code,
+ router.hostname, router.time_of_lookup)
# TODO: Determine whether sqlite3 optimizes by remembering
# this insert command and not parsing it every time it sees
# it. This is mentioned in PEP 249, but we aren't sure where
# the PEP says that implementations might optimize in this way,
# or might allow users to optimize in this way.
- CURSOR.execute(insert_stmt, router_tuple)
+ CURSOR.execute(summary_insert_stmt, router_tuple)
id_num = CURSOR.lastrowid
- address_info = (id_num, router.address)
- CURSOR.execute('INSERT INTO addresses (id_of_row, address) VALUES (?,?)', address_info)
+ address_values = (id_num, router.address)
+ CURSOR.execute(addresses_insert_stmt, address_values)
- flags = router.flags
- for flag in flags:
- flag_info = (id_num, flag)
- CURSOR.execute('INSERT INTO flags (id_of_row, flag) VALUES (?,?)', flag_info)
+ for flag in router.flags:
+ flag_values = (id_num, flag)
+ CURSOR.execute(flags_insert_stmt, flag_values)
conn.commit()
-
+ logging.info("Table updated")
DB_CREATION_TIME = time.time()
- return conn
-
-def freshen_database():
- global FRESHEN_TIMER
-
- if DB_CREATION_TIME < os.stat(SUMMARY).st_mtime:
- update_database()
-
- FRESHEN_TIMER = threading.Timer(
- DB_UPDATE_INTERVAL, freshen_database)
+ FRESHEN_TIMER = threading.Timer(DB_UPDATE_INTERVAL, update_databases, summary_file)
FRESHEN_TIMER.start()
def cancel_freshen():
FRESHEN_TIMER.cancel()
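For reference, threading.Timer invokes its callback as
function(*args, **kwargs) once the interval elapses, so a single argument
is conventionally wrapped in a list (a bare string would be unpacked
character by character). A minimal sketch of the rescheduling loop, with
an illustrative interval:

    import threading

    DB_UPDATE_INTERVAL = 300  # seconds; illustrative value

    def update_databases(summary_file=None):
        # ... rebuild the tables from summary_file ...
        # Reschedule ourselves; Timer calls update_databases(*args).
        timer = threading.Timer(DB_UPDATE_INTERVAL, update_databases,
                                args=[summary_file])
        timer.start()

    # A pending run can be stopped with timer.cancel(), which is what
    # cancel_freshen() does.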
-def get_database():
+def get_database_conn():
conn = sqlite3.connect(DBNAME)
- conn.row_factory = sqlite3.Row
return conn
-def query_summary_tbl(
- running_filter=None, type_filter=None, hex_fingerprint_filter=None,
- country_filter=None, search_filter=None,
- order_field=None, order_asc=True, offset_value=None, limit_value=None,
- fields=('fingerprint',)):
-
- conn = get_database()
+def query_summary_tbl(running_filter=None, type_filter=None, hex_fingerprint_filter=None,
+ country_filter=None, search_filter=None, order_field=None,
+ order_asc=True, offset_value=None, limit_value=None,
+ fields=('fingerprint',)):
+ conn = get_database_conn()
# Build up a WHERE clause based on the request parameters. We only
# consider the case in which the client specifies 'search' or
@@ -228,86 +232,82 @@ def query_summary_tbl(
# Actually, this is a moderately painful parameter to implement.
# Testing for an IP address probably means using regular expressions.
- # SQLite doesn't suppor them without a user-defined function.
+ # SQLite doesn't support them without a user-defined function.
# Matching against a Python RE is easy to do, but then we have
# to have a where clause that matches against the beginning of a
# field value, and SQLite doesn't appear to support such a search
# (unless, of course, you want to write a user-defined match()
# function).
-
pass
else:
- if running_filter != None:
+ if running_filter:
clauses.append("running = %s" % int(running_filter))
- if type_filter != None:
+ if type_filter:
clauses.append("type = '%s'" % type_filter)
- if hex_fingerprint_filter != None:
+ if hex_fingerprint_filter:
clauses.append("fingerprint = '%s'" % hex_fingerprint_filter)
- if country_filter != None:
+ if country_filter:
clauses.append("country = '%s'" % country_filter)
where_clause = ('WHERE %s' % ' and '.join(clauses)) if clauses else ''
# Construct the ORDER, LIMIT, and OFFSET clauses.
order_clause = ''
- if order_field != None:
- order_clause = 'ORDER BY %s %s' % (order_field,
- 'ASC' if order_asc else 'DESC')
+ if order_field:
+ order_clause = 'ORDER BY %s %s' % (order_field,
+ 'ASC' if order_asc else 'DESC')
limit_clause = ''
- if limit_value != None:
+ if limit_value:
limit_clause = 'LIMIT %s' % limit_value
offset_clause = ''
- if offset_value != None:
+ if offset_value:
offset_clause = 'OFFSET %s' % offset_value
cursor = conn.cursor()
- cursor.execute('SELECT %s FROM summary %s %s %s %s' % (','.join(fields),
- where_clause, order_clause, limit_clause, offset_clause))
+ cursor.execute('SELECT %s FROM summary %s %s %s %s' %
+ (','.join(fields), where_clause, order_clause, limit_clause,
+ offset_clause))
return cursor.fetchall()
-
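A condensed sketch of how query_summary_tbl() assembles its SQL from the
request parameters (note that the switch from != None tests to truthiness
means falsy values such as 0 or '' are now treated as absent filters):

    clauses = []
    running_filter, type_filter = True, 'r'   # illustrative inputs
    if running_filter:
        clauses.append("running = %s" % int(running_filter))
    if type_filter:
        clauses.append("type = '%s'" % type_filter)
    where_clause = ('WHERE %s' % ' and '.join(clauses)) if clauses else ''
    # -> "WHERE running = 1 and type = 'r'"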
-def get_summary_routers(
- running_filter=None, type_filter=None, hex_fingerprint_filter=None,
- country_filter=None, search_filter=None,
- order_field=None, order_asc=True, offset_value=None, limit_value=None):
+def get_summary_routers(running_filter=None, type_filter=None, hex_fingerprint_filter=None,
+ country_filter=None, search_filter=None, order_field=None,
+ order_asc=True, offset_value=None, limit_value=None):
"""
Get summary document according to request parameters.
@rtype: tuple.
@return: tuple of form (relays, bridges, relays_time, bridges_time), where
- * relays (bridges) is a list of sqlite3.Row, each of which consists of the
- various attributes. The important part is that each element of
- relays (bridges) can be treated as a dictionary, with keys
- the same as the database fields.
- * relays_time (bridges_time) is a datetime object with the most
- recent timestamp of the relay descriptors in relays.
+ * relays/bridges is a list of Router objects
+ * relays_time/bridges_time is a datetime object with the most
+ recent timestamp of the relay descriptors in relays.
"""
# Timestamps of most recent relay/bridge in the returned set.
relay_timestamp = datetime.datetime(1900, 1, 1, 1, 0)
bridge_timestamp = datetime.datetime(1900, 1, 1, 1, 0)
- filtered_relays, filtered_bridges = [], []
- fields = ('type', 'nickname', 'fingerprint', 'running', 'country_code',
+ relays, bridges = [], []
+ fields = ('type', 'nickname', 'fingerprint', 'running', 'country_code',
'time_published', 'consensus_weight')
- for row in query_summary_tbl(
- running_filter, type_filter, hex_fingerprint_filter,
- country_filter, search_filter,
- order_field, order_asc, offset_value, limit_value, fields):
+ for row in query_summary_tbl(running_filter, type_filter, hex_fingerprint_filter,
+ country_filter, search_filter,order_field, order_asc,
+ offset_value, limit_value, fields):
- # Set the destination list for this router.
- dest = filtered_relays if row[0] == 'r' else filtered_bridges
- dest.append(row)
+ current_router = datetime.datetime.strptime(row[5], "%Y-%m-%d %H:%M:%S")
+ router = Router()
+
+ # This is magic
+ map(lambda (attr, value): setattr(router, attr, value), zip(fields, row))
- current_router = datetime.datetime.strptime(row[5], "%Y-%m-%d %H:%M:%S")
if row[0] == 'r':
+ relays.append(router)
if current_router > relay_timestamp:
relay_timestamp = current_router
if row[0] == 'b':
+ bridges.append(router)
if current_router > bridge_timestamp:
bridge_timestamp = current_router
- total_routers = (filtered_relays, filtered_bridges, relay_timestamp, bridge_timestamp)
+ total_routers = (relays, bridges, relay_timestamp, bridge_timestamp)
return total_routers
-
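The map/lambda line above relies on Python 2 tuple-parameter unpacking; an
equivalent explicit form of the same attribute copy:

    # Copy each selected column onto the matching Router attribute.
    for attr, value in zip(fields, row):
        setattr(router, attr, value)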
diff --git a/pyonionoo/handlers/summary.py b/pyonionoo/handlers/summary.py
index 7260c77..ca12a0f 100644
--- a/pyonionoo/handlers/summary.py
+++ b/pyonionoo/handlers/summary.py
@@ -35,13 +35,12 @@ class SummaryHandler(cyclone.web.RequestHandler):
relays, bridges = [], []
filtered_relays, filtered_bridges, relay_timestamp, bridge_timestamp = routers
- for (src, dest) in ((filtered_relays, relays),
- (filtered_bridges, bridges)):
+ for (src, dest) in ((filtered_relays, relays), (filtered_bridges, bridges)):
for router in src:
dest.append({
- 'n' : router['nickname'],
- 'f' : router['fingerprint'],
- 'r' : bool(router['running'])
+ 'n' : router.nickname,
+ 'f' : router.fingerprint,
+ 'r' : bool(router.running)
})
# response is a dict, so the order is not maintained. should the
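With Router objects, the handler now reads attributes instead of indexing
sqlite3.Row by key; each appended entry is shaped like (values
illustrative):

    {'n': 'ExampleRelay', 'f': 'A1B2C3D4', 'r': True}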
diff --git a/pyonionoo/parser.py b/pyonionoo/parser.py
index e7f27e8..5105782 100644
--- a/pyonionoo/parser.py
+++ b/pyonionoo/parser.py
@@ -2,7 +2,7 @@ import re
import datetime
class Router:
- def __init__(self, raw_content):
+ def __init__(self):
self.nickname = None
self.fingerprint = None
self.address = None
@@ -13,21 +13,21 @@ class Router:
self.orport = None
self.dirport = None
self.flags = None
- self.is_running = False
+ self.running = False
self.consensus_weight = None
self.country_code = None
self.hostname = None
self.time_of_lookup = None
- self.is_relay = None
- self._parse(raw_content)
+ self.type = None
- def _parse(self, raw_content):
+ def parse(self, raw_content):
values = raw_content.split()
if len(values) < 9:
#raise Exception
raise ValueError("Invalid router!")
- if values[0] == "r":
- self.is_relay = True
+ if values[0] == "r": self.type = "r"
+ else: self.type = "b"
+
self.nickname = values[1]
self.fingerprint = values[2]
@@ -51,8 +51,8 @@ class Router:
self.flags = values[8].split(',')
for flag in self.flags:
if flag == "Running":
- self.is_running = True
- self.consensus_weight = values[9]
+ self.running = True
+ self.consensus_weight = int(values[9])
self.country_code = values[10]
if values[11] != "null" : self.hostname = values[11]
self.time_of_lookup = int(values[12])
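With the constructor change, parsing becomes a separate step; the new
calling convention, as used in update_databases():

    router = Router()
    router.parse(line)  # raises ValueError on a malformed line
    if router.type == 'r' and router.running:
        pass  # relay-specific handling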
diff --git a/pyonionoo/web.py b/pyonionoo/web.py
index 72ae9b1..b83cbcd 100644
--- a/pyonionoo/web.py
+++ b/pyonionoo/web.py
@@ -36,7 +36,7 @@ class Application(cyclone.web.Application):
if not settings['metrics_out']:
raise ValueError
- database.create_database(settings['metrics_out'])
+ database.bootstrap_database(settings['metrics_out'], settings['summary_file'])
cyclone.web.Application.__init__(self, handlers, **settings)