commit 58cf97dd1bddc1caa00731867ea3300b3a08d141
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Tue Jan 21 18:27:39 2014 +0100
Move plain Java classes to modules/legacy/ subdirectory.
---
build.xml | 71 --
config.template | 47 -
db/tordir.sql | 1005 ------------------
modules/legacy/build.xml | 55 +
modules/legacy/config.template | 47 +
modules/legacy/db/tordir.sql | 1005 ++++++++++++++++++
.../org/torproject/ernie/cron/Configuration.java | 163 +++
.../src/org/torproject/ernie/cron/LockFile.java | 52 +
.../ernie/cron/LoggingConfiguration.java | 94 ++
.../legacy/src/org/torproject/ernie/cron/Main.java | 100 ++
.../cron/RelayDescriptorDatabaseImporter.java | 1077 ++++++++++++++++++++
.../cron/network/ConsensusStatsFileHandler.java | 380 +++++++
.../cron/performance/PerformanceStatsImporter.java | 271 +++++
.../ernie/cron/performance/TorperfProcessor.java | 374 +++++++
run-web.sh | 3 +
run.sh | 5 -
shared/bin/01-rsync-descriptors.sh | 3 +
shared/bin/10-legacy.sh | 10 +
shared/bin/99-copy-stats-files.sh | 4 +
src/org/torproject/ernie/cron/Configuration.java | 163 ---
src/org/torproject/ernie/cron/LockFile.java | 52 -
.../ernie/cron/LoggingConfiguration.java | 94 --
src/org/torproject/ernie/cron/Main.java | 100 --
.../cron/RelayDescriptorDatabaseImporter.java | 1077 --------------------
.../cron/network/ConsensusStatsFileHandler.java | 380 -------
.../cron/performance/PerformanceStatsImporter.java | 271 -----
.../ernie/cron/performance/TorperfProcessor.java | 374 -------
27 files changed, 3638 insertions(+), 3639 deletions(-)
diff --git a/build.xml b/build.xml
deleted file mode 100644
index 393e2d8..0000000
--- a/build.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-<project default="run" name="metrics-web" basedir=".">
-
- <!-- Define build paths. -->
- <property name="sources" value="src"/>
- <property name="classes" value="classes"/>
- <property name="config" value="etc"/>
- <property name="warfile" value="ernie.war"/>
- <path id="classpath">
- <pathelement path="${classes}"/>
- <fileset dir="/usr/share/java">
- <include name="commons-codec.jar"/>
- <include name="commons-compress.jar"/>
- <include name="postgresql-jdbc3.jar"/>
- <include name="junit4.jar"/>
- <include name="servlet-api-3.0.jar"/>
- <include name="commons-lang.jar"/>
- </fileset>
- <fileset dir="deps/metrics-lib">
- <include name="descriptor.jar"/>
- </fileset>
- <fileset dir="lib">
- <include name="REngine.jar"/>
- <include name="RserveEngine.jar"/>
- </fileset>
- </path>
-
- <target name="init">
- <copy file="config.template" tofile="config"/>
- <mkdir dir="${classes}"/>
- </target>
- <target name="metrics-lib">
- <ant dir="deps/metrics-lib"/>
- </target>
-
- <!-- Compile all plain Java classes. -->
- <target name="compile" depends="metrics-lib,init">
- <javac destdir="${classes}"
- srcdir="${sources}"
- source="1.5"
- target="1.5"
- debug="true"
- deprecation="true"
- optimize="false"
- failonerror="true"
- includeantruntime="false">
- <classpath refid="classpath"/>
- </javac>
- </target>
-
- <!-- Prepare data for being displayed on the website. -->
- <target name="run" depends="compile">
- <java fork="true"
- maxmemory="1024m"
- classname="org.torproject.ernie.cron.Main">
- <classpath refid="classpath"/>
- </java>
- </target>
-
- <!-- Run unit tests. -->
- <target name="test" depends="compile">
- <junit haltonfailure="true" printsummary="off">
- <classpath refid="classpath"/>
- <formatter type="plain" usefile="false"/>
- <batchtest>
- <fileset dir="${classes}"
- includes="**/*Test.class"/>
- </batchtest>
- </junit>
- </target>
-</project>
-
diff --git a/config.template b/config.template
deleted file mode 100644
index 8f0789b..0000000
--- a/config.template
+++ /dev/null
@@ -1,47 +0,0 @@
-## Import directory archives from disk, if available
-#ImportDirectoryArchives 0
-#
-## Relative path to directory to import directory archives from
-#DirectoryArchivesDirectory in/relay-descriptors/
-#
-## Keep a history of imported directory archive files to know which files
-## have been imported before. This history can be useful when importing
-## from a changing source to avoid importing descriptors over and over
-## again, but it can be confusing to users who don't know about it.
-#KeepDirectoryArchiveImportHistory 0
-#
-## Import sanitized bridges from disk, if available
-#ImportSanitizedBridges 0
-#
-## Relative path to directory to import sanitized bridges from
-#SanitizedBridgesDirectory in/bridge-descriptors/
-#
-## Keep a history of imported sanitized bridge descriptors. This history
-## can be useful when importing from a changing data source to avoid
-## importing descriptors more than once, but it can be confusing to users
-## who don't know about it.
-#KeepSanitizedBridgesImportHistory 0
-#
-## Write relay descriptors to the database
-#WriteRelayDescriptorDatabase 0
-#
-## JDBC string for relay descriptor database
-#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password
-#
-## Write relay descriptors to raw text files for importing them into the
-## database using PostgreSQL's \copy command
-#WriteRelayDescriptorsRawFiles 0
-#
-## Relative path to directory to write raw text files; note that existing
-## files will be overwritten!
-#RelayDescriptorRawFilesDirectory pg-import/
-#
-## Write bridge stats to disk
-#WriteBridgeStats 0
-#
-## Import torperf data, if available, and write stats to disk
-#ImportWriteTorperfStats 0
-#
-## Relative path to directory to import torperf results from
-#TorperfDirectory in/torperf/
-#
diff --git a/db/tordir.sql b/db/tordir.sql
deleted file mode 100644
index cd2ed6a..0000000
--- a/db/tordir.sql
+++ /dev/null
@@ -1,1005 +0,0 @@
--- Copyright 2010 The Tor Project
--- See LICENSE for licensing information
-
-CREATE LANGUAGE plpgsql;
-
--- TABLE descriptor
--- Contains all of the descriptors published by routers.
-CREATE TABLE descriptor (
- descriptor CHARACTER(40) NOT NULL,
- nickname CHARACTER VARYING(19) NOT NULL,
- address CHARACTER VARYING(15) NOT NULL,
- orport INTEGER NOT NULL,
- dirport INTEGER NOT NULL,
- fingerprint CHARACTER(40) NOT NULL,
- bandwidthavg BIGINT NOT NULL,
- bandwidthburst BIGINT NOT NULL,
- bandwidthobserved BIGINT NOT NULL,
- platform CHARACTER VARYING(256),
- published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- uptime BIGINT,
- extrainfo CHARACTER(40),
- CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor)
-);
-
--- Contains bandwidth histories reported by relays in extra-info
--- descriptors. Each row contains the reported bandwidth in 15-minute
--- intervals for each relay and date.
-CREATE TABLE bwhist (
- fingerprint CHARACTER(40) NOT NULL,
- date DATE NOT NULL,
- read BIGINT[],
- read_sum BIGINT,
- written BIGINT[],
- written_sum BIGINT,
- dirread BIGINT[],
- dirread_sum BIGINT,
- dirwritten BIGINT[],
- dirwritten_sum BIGINT,
- CONSTRAINT bwhist_pkey PRIMARY KEY (fingerprint, date)
-);
-
-CREATE INDEX bwhist_date ON bwhist (date);
-
--- TABLE statusentry
--- Contains all of the consensus entries published by the directories.
--- Each statusentry references a valid descriptor.
-CREATE TABLE statusentry (
- validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- nickname CHARACTER VARYING(19) NOT NULL,
- fingerprint CHARACTER(40) NOT NULL,
- descriptor CHARACTER(40) NOT NULL,
- published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- address CHARACTER VARYING(15) NOT NULL,
- orport INTEGER NOT NULL,
- dirport INTEGER NOT NULL,
- isauthority BOOLEAN DEFAULT FALSE NOT NULL,
- isbadexit BOOLEAN DEFAULT FALSE NOT NULL,
- isbaddirectory BOOLEAN DEFAULT FALSE NOT NULL,
- isexit BOOLEAN DEFAULT FALSE NOT NULL,
- isfast BOOLEAN DEFAULT FALSE NOT NULL,
- isguard BOOLEAN DEFAULT FALSE NOT NULL,
- ishsdir BOOLEAN DEFAULT FALSE NOT NULL,
- isnamed BOOLEAN DEFAULT FALSE NOT NULL,
- isstable BOOLEAN DEFAULT FALSE NOT NULL,
- isrunning BOOLEAN DEFAULT FALSE NOT NULL,
- isunnamed BOOLEAN DEFAULT FALSE NOT NULL,
- isvalid BOOLEAN DEFAULT FALSE NOT NULL,
- isv2dir BOOLEAN DEFAULT FALSE NOT NULL,
- isv3dir BOOLEAN DEFAULT FALSE NOT NULL,
- version CHARACTER VARYING(50),
- bandwidth BIGINT,
- ports TEXT,
- rawdesc BYTEA NOT NULL
-);
-
-CREATE OR REPLACE FUNCTION statusentry_insert_trigger()
-RETURNS TRIGGER AS $$
-
-DECLARE
- tablename TEXT;
- selectresult TEXT;
- nextmonth TIMESTAMP WITHOUT TIME ZONE;
- v_year INTEGER;
- v_month INTEGER;
- n_year INTEGER;
- n_month INTEGER;
-
-BEGIN
- v_year := extract(YEAR FROM NEW.validafter);
- v_month := extract(MONTH FROM NEW.validafter);
- tablename := 'statusentry_y' || v_year || 'm' ||
- TO_CHAR(NEW.validafter, 'mm');
- EXECUTE 'SELECT relname FROM pg_class WHERE relname = '''|| tablename ||
- '''' INTO selectresult;
- IF selectresult IS NULL THEN
- nextmonth := new.validafter + interval '1 month';
- n_year := extract(YEAR FROM nextmonth);
- n_month := extract(MONTH FROM nextmonth);
- EXECUTE 'CREATE TABLE ' || tablename ||
- ' ( CHECK ( validafter >= ''' || v_year || '-' ||
- TO_CHAR(NEW.validafter, 'mm') || '-01 00:00:00'' ' ||
- 'AND validafter < ''' || n_year || '-' ||
- TO_CHAR(nextmonth, 'mm') ||
- '-01 00:00:00'') ) INHERITS (statusentry)';
- EXECUTE 'ALTER TABLE ' || tablename || ' ADD CONSTRAINT ' ||
- tablename || '_pkey PRIMARY KEY (validafter, fingerprint)';
- EXECUTE 'CREATE INDEX ' || tablename || '_address ON ' ||
- tablename || ' (address)';
- EXECUTE 'CREATE INDEX ' || tablename || '_fingerprint ON ' ||
- tablename || ' (fingerprint)';
- EXECUTE 'CREATE INDEX ' || tablename || '_nickname ON ' ||
- tablename || ' (LOWER(nickname))';
- EXECUTE 'CREATE INDEX ' || tablename || '_validafter ON ' ||
- tablename || ' (validafter)';
- EXECUTE 'CREATE INDEX ' || tablename || '_descriptor ON ' ||
- tablename || ' (descriptor)';
- EXECUTE 'CREATE INDEX ' || tablename || '_validafter_date ON ' ||
- tablename || ' (DATE(validafter))';
- END IF;
- EXECUTE 'INSERT INTO ' || tablename || ' SELECT ($1).*' USING NEW;
- RETURN NULL;
-END;
-$$ LANGUAGE plpgsql;
-
-CREATE TRIGGER insert_statusentry_trigger
- BEFORE INSERT ON statusentry
- FOR EACH ROW EXECUTE PROCEDURE statusentry_insert_trigger();
-
--- TABLE consensus
--- Contains all of the consensuses published by the directories.
-CREATE TABLE consensus (
- validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- CONSTRAINT consensus_pkey PRIMARY KEY (validafter)
-);
-
--- TABLE connbidirect
--- Contain conn-bi-direct stats strings
-CREATE TABLE connbidirect (
- source CHARACTER(40) NOT NULL,
- statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- seconds INTEGER NOT NULL,
- belownum BIGINT NOT NULL,
- readnum BIGINT NOT NULL,
- writenum BIGINT NOT NULL,
- bothnum BIGINT NOT NULL,
- CONSTRAINT connbidirect_pkey PRIMARY KEY (source, statsend)
-);
-
--- TABLE network_size
-CREATE TABLE network_size (
- date DATE NOT NULL,
- avg_running INTEGER NOT NULL,
- avg_exit INTEGER NOT NULL,
- avg_guard INTEGER NOT NULL,
- avg_fast INTEGER NOT NULL,
- avg_stable INTEGER NOT NULL,
- avg_authority INTEGER NOT NULL,
- avg_badexit INTEGER NOT NULL,
- avg_baddirectory INTEGER NOT NULL,
- avg_hsdir INTEGER NOT NULL,
- avg_named INTEGER NOT NULL,
- avg_unnamed INTEGER NOT NULL,
- avg_valid INTEGER NOT NULL,
- avg_v2dir INTEGER NOT NULL,
- avg_v3dir INTEGER NOT NULL,
- CONSTRAINT network_size_pkey PRIMARY KEY(date)
-);
-
--- TABLE relay_countries
-CREATE TABLE relay_countries (
- date DATE NOT NULL,
- country CHARACTER(2) NOT NULL,
- relays INTEGER NOT NULL,
- CONSTRAINT relay_countries_pkey PRIMARY KEY(date, country)
-);
-
--- TABLE relay_platforms
-CREATE TABLE relay_platforms (
- date DATE NOT NULL,
- avg_linux INTEGER NOT NULL,
- avg_darwin INTEGER NOT NULL,
- avg_bsd INTEGER NOT NULL,
- avg_windows INTEGER NOT NULL,
- avg_other INTEGER NOT NULL,
- CONSTRAINT relay_platforms_pkey PRIMARY KEY(date)
-);
-
--- TABLE relay_versions
-CREATE TABLE relay_versions (
- date DATE NOT NULL,
- version CHARACTER(5) NOT NULL,
- relays INTEGER NOT NULL,
- CONSTRAINT relay_versions_pkey PRIMARY KEY(date, version)
-);
-
--- TABLE total_bandwidth
--- Contains information for the whole network's total bandwidth which is
--- used in the bandwidth graphs.
-CREATE TABLE total_bandwidth (
- date DATE NOT NULL,
- bwavg BIGINT NOT NULL,
- bwburst BIGINT NOT NULL,
- bwobserved BIGINT NOT NULL,
- bwadvertised BIGINT NOT NULL,
- CONSTRAINT total_bandwidth_pkey PRIMARY KEY(date)
-);
-
--- TABLE total_bwhist
--- Contains the total number of read/written and the number of dir bytes
--- read/written by all relays in the network on a given day. The dir bytes
--- are an estimate based on the subset of relays that count dir bytes.
-CREATE TABLE total_bwhist (
- date DATE NOT NULL,
- read BIGINT,
- written BIGINT,
- CONSTRAINT total_bwhist_pkey PRIMARY KEY(date)
-);
-
--- TABLE bandwidth_flags
-CREATE TABLE bandwidth_flags (
- date DATE NOT NULL,
- isexit BOOLEAN NOT NULL,
- isguard BOOLEAN NOT NULL,
- bwadvertised BIGINT NOT NULL,
- CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard)
-);
-
--- TABLE bwhist_flags
-CREATE TABLE bwhist_flags (
- date DATE NOT NULL,
- isexit BOOLEAN NOT NULL,
- isguard BOOLEAN NOT NULL,
- read BIGINT,
- written BIGINT,
- CONSTRAINT bwhist_flags_pkey PRIMARY KEY(date, isexit, isguard)
-);
-
--- TABLE user_stats
--- Aggregate statistics on directory requests and byte histories that we
--- use to estimate user numbers.
-CREATE TABLE user_stats (
- date DATE NOT NULL,
- country CHARACTER(2) NOT NULL,
- r BIGINT,
- dw BIGINT,
- dr BIGINT,
- drw BIGINT,
- drr BIGINT,
- bw BIGINT,
- br BIGINT,
- bwd BIGINT,
- brd BIGINT,
- bwr BIGINT,
- brr BIGINT,
- bwdr BIGINT,
- brdr BIGINT,
- bwp BIGINT,
- brp BIGINT,
- bwn BIGINT,
- brn BIGINT,
- CONSTRAINT user_stats_pkey PRIMARY KEY(date, country)
-);
-
--- TABLE relay_statuses_per_day
--- A helper table which is commonly used to update the tables above in the
--- refresh_* functions.
-CREATE TABLE relay_statuses_per_day (
- date DATE NOT NULL,
- count INTEGER NOT NULL,
- CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date)
-);
-
--- Dates to be included in the next refresh run.
-CREATE TABLE scheduled_updates (
- id SERIAL,
- date DATE NOT NULL
-);
-
--- Dates in the current refresh run. When starting a refresh run, we copy
--- the rows from scheduled_updates here in order to delete just those
--- lines after the refresh run. Otherwise we might forget scheduled dates
--- that have been added during a refresh run. If this happens we're going
--- to update these dates in the next refresh run.
-CREATE TABLE updates (
- id INTEGER,
- date DATE
-);
-
--- FUNCTION refresh_relay_statuses_per_day()
--- Updates helper table which is used to refresh the aggregate tables.
-CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
-RETURNS INTEGER AS $$
- BEGIN
- DELETE FROM relay_statuses_per_day
- WHERE date IN (SELECT date FROM updates);
- INSERT INTO relay_statuses_per_day (date, count)
- SELECT DATE(validafter) AS date, COUNT(*) AS count
- FROM consensus
- WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates)
- AND DATE(validafter) <= (SELECT MAX(date) FROM updates)
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY DATE(validafter);
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
-CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$
- SELECT SUM($1[i])::bigint
- FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i);
-$$ LANGUAGE SQL;
-
-CREATE OR REPLACE FUNCTION insert_bwhist(
- insert_fingerprint CHARACTER(40), insert_date DATE,
- insert_read BIGINT[], insert_written BIGINT[],
- insert_dirread BIGINT[], insert_dirwritten BIGINT[])
- RETURNS INTEGER AS $$
- BEGIN
- IF (SELECT COUNT(*) FROM bwhist
- WHERE fingerprint = insert_fingerprint AND date = insert_date) = 0
- THEN
- INSERT INTO bwhist (fingerprint, date, read, written, dirread,
- dirwritten)
- VALUES (insert_fingerprint, insert_date, insert_read, insert_written,
- insert_dirread, insert_dirwritten);
- ELSE
- BEGIN
- UPDATE bwhist
- SET read[array_lower(insert_read, 1):
- array_upper(insert_read, 1)] = insert_read,
- written[array_lower(insert_written, 1):
- array_upper(insert_written, 1)] = insert_written,
- dirread[array_lower(insert_dirread, 1):
- array_upper(insert_dirread, 1)] = insert_dirread,
- dirwritten[array_lower(insert_dirwritten, 1):
- array_upper(insert_dirwritten, 1)] = insert_dirwritten
- WHERE fingerprint = insert_fingerprint AND date = insert_date;
- -- Updating twice is an ugly workaround for PostgreSQL bug 5840
- UPDATE bwhist
- SET read[array_lower(insert_read, 1):
- array_upper(insert_read, 1)] = insert_read,
- written[array_lower(insert_written, 1):
- array_upper(insert_written, 1)] = insert_written,
- dirread[array_lower(insert_dirread, 1):
- array_upper(insert_dirread, 1)] = insert_dirread,
- dirwritten[array_lower(insert_dirwritten, 1):
- array_upper(insert_dirwritten, 1)] = insert_dirwritten
- WHERE fingerprint = insert_fingerprint AND date = insert_date;
- END;
- END IF;
- UPDATE bwhist
- SET read_sum = array_sum(read),
- written_sum = array_sum(written),
- dirread_sum = array_sum(dirread),
- dirwritten_sum = array_sum(dirwritten)
- WHERE fingerprint = insert_fingerprint AND date = insert_date;
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- refresh_* functions
--- The following functions keep their corresponding aggregate tables
--- up-to-date. They should be called every time ERNIE is run, or when new
--- data is finished being added to the descriptor or statusentry tables.
--- They find what new data has been entered or updated based on the
--- updates table.
-
--- FUNCTION refresh_network_size()
-CREATE OR REPLACE FUNCTION refresh_network_size() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM network_size
- WHERE date IN (SELECT date FROM updates);
-
- EXECUTE '
- INSERT INTO network_size
- (date, avg_running, avg_exit, avg_guard, avg_fast, avg_stable,
- avg_authority, avg_badexit, avg_baddirectory, avg_hsdir,
- avg_named, avg_unnamed, avg_valid, avg_v2dir, avg_v3dir)
- SELECT date,
- isrunning / count AS avg_running,
- isexit / count AS avg_exit,
- isguard / count AS avg_guard,
- isfast / count AS avg_fast,
- isstable / count AS avg_stable,
- isauthority / count as avg_authority,
- isbadexit / count as avg_badexit,
- isbaddirectory / count as avg_baddirectory,
- ishsdir / count as avg_hsdir,
- isnamed / count as avg_named,
- isunnamed / count as avg_unnamed,
- isvalid / count as avg_valid,
- isv2dir / count as avg_v2dir,
- isv3dir / count as avg_v3dir
- FROM (
- SELECT DATE(validafter) AS date,
- COUNT(*) AS isrunning,
- COUNT(NULLIF(isexit, FALSE)) AS isexit,
- COUNT(NULLIF(isguard, FALSE)) AS isguard,
- COUNT(NULLIF(isfast, FALSE)) AS isfast,
- COUNT(NULLIF(isstable, FALSE)) AS isstable,
- COUNT(NULLIF(isauthority, FALSE)) AS isauthority,
- COUNT(NULLIF(isbadexit, FALSE)) AS isbadexit,
- COUNT(NULLIF(isbaddirectory, FALSE)) AS isbaddirectory,
- COUNT(NULLIF(ishsdir, FALSE)) AS ishsdir,
- COUNT(NULLIF(isnamed, FALSE)) AS isnamed,
- COUNT(NULLIF(isunnamed, FALSE)) AS isunnamed,
- COUNT(NULLIF(isvalid, FALSE)) AS isvalid,
- COUNT(NULLIF(isv2dir, FALSE)) AS isv2dir,
- COUNT(NULLIF(isv3dir, FALSE)) AS isv3dir
- FROM statusentry
- WHERE isrunning = TRUE
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY DATE(validafter)
- ) b
- NATURAL JOIN relay_statuses_per_day';
-
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- FUNCTION refresh_relay_platforms()
-CREATE OR REPLACE FUNCTION refresh_relay_platforms() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM relay_platforms
- WHERE date IN (SELECT date FROM updates);
-
- EXECUTE '
- INSERT INTO relay_platforms
- (date, avg_linux, avg_darwin, avg_bsd, avg_windows, avg_other)
- SELECT date,
- linux / count AS avg_linux,
- darwin / count AS avg_darwin,
- bsd / count AS avg_bsd,
- windows / count AS avg_windows,
- other / count AS avg_other
- FROM (
- SELECT DATE(validafter) AS date,
- SUM(CASE WHEN platform LIKE ''%Linux%'' THEN 1 ELSE 0 END)
- AS linux,
- SUM(CASE WHEN platform LIKE ''%Darwin%'' THEN 1 ELSE 0 END)
- AS darwin,
- SUM(CASE WHEN platform LIKE ''%BSD%'' THEN 1 ELSE 0 END)
- AS bsd,
- SUM(CASE WHEN platform LIKE ''%Windows%'' THEN 1 ELSE 0 END)
- AS windows,
- SUM(CASE WHEN platform NOT LIKE ''%Windows%''
- AND platform NOT LIKE ''%Darwin%''
- AND platform NOT LIKE ''%BSD%''
- AND platform NOT LIKE ''%Linux%'' THEN 1 ELSE 0 END)
- AS other
- FROM descriptor
- RIGHT JOIN statusentry
- ON statusentry.descriptor = descriptor.descriptor
- WHERE isrunning = TRUE
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY DATE(validafter)
- ) b
- NATURAL JOIN relay_statuses_per_day';
-
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- FUNCTION refresh_relay_versions()
-CREATE OR REPLACE FUNCTION refresh_relay_versions() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM relay_versions
- WHERE date IN (SELECT date FROM updates);
-
- EXECUTE '
- INSERT INTO relay_versions
- (date, version, relays)
- SELECT date, version, relays / count AS relays
- FROM (
- SELECT DATE(validafter), SUBSTRING(platform, 5, 5) AS version,
- COUNT(*) AS relays
- FROM descriptor RIGHT JOIN statusentry
- ON descriptor.descriptor = statusentry.descriptor
- WHERE isrunning = TRUE
- AND platform IS NOT NULL
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY 1, 2
- ) b
- NATURAL JOIN relay_statuses_per_day';
-
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- FUNCTION refresh_total_bandwidth()
--- This keeps the table total_bandwidth up-to-date when necessary.
-CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM total_bandwidth
- WHERE date IN (SELECT date FROM updates);
-
- EXECUTE '
- INSERT INTO total_bandwidth
- (bwavg, bwburst, bwobserved, bwadvertised, date)
- SELECT (SUM(bandwidthavg)
- / relay_statuses_per_day.count)::BIGINT AS bwavg,
- (SUM(bandwidthburst)
- / relay_statuses_per_day.count)::BIGINT AS bwburst,
- (SUM(bandwidthobserved)
- / relay_statuses_per_day.count)::BIGINT AS bwobserved,
- (SUM(LEAST(bandwidthavg, bandwidthobserved))
- / relay_statuses_per_day.count)::BIGINT AS bwadvertised,
- DATE(validafter)
- FROM descriptor RIGHT JOIN statusentry
- ON descriptor.descriptor = statusentry.descriptor
- JOIN relay_statuses_per_day
- ON DATE(validafter) = relay_statuses_per_day.date
- WHERE isrunning = TRUE
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- AND relay_statuses_per_day.date >= ''' || min_date || '''
- AND relay_statuses_per_day.date < ''' || max_date || '''
- AND DATE(relay_statuses_per_day.date) IN
- (SELECT date FROM updates)
- GROUP BY DATE(validafter), relay_statuses_per_day.count';
-
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
-CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$
- BEGIN
- DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates);
- INSERT INTO total_bwhist (date, read, written)
- SELECT date, SUM(read_sum) AS read, SUM(written_sum) AS written
- FROM bwhist
- WHERE date >= (SELECT MIN(date) FROM updates)
- AND date <= (SELECT MAX(date) FROM updates)
- AND date IN (SELECT date FROM updates)
- GROUP BY date;
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
-CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates);
- EXECUTE '
- INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised)
- SELECT DATE(validafter) AS date,
- BOOL_OR(isexit) AS isexit,
- BOOL_OR(isguard) AS isguard,
- (SUM(LEAST(bandwidthavg, bandwidthobserved))
- / relay_statuses_per_day.count)::BIGINT AS bwadvertised
- FROM descriptor RIGHT JOIN statusentry
- ON descriptor.descriptor = statusentry.descriptor
- JOIN relay_statuses_per_day
- ON DATE(validafter) = relay_statuses_per_day.date
- WHERE isrunning = TRUE
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- AND relay_statuses_per_day.date >= ''' || min_date || '''
- AND relay_statuses_per_day.date < ''' || max_date || '''
- AND DATE(relay_statuses_per_day.date) IN
- (SELECT date FROM updates)
- GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count';
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
-CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM bwhist_flags WHERE date IN (SELECT date FROM updates);
- EXECUTE '
- INSERT INTO bwhist_flags (date, isexit, isguard, read, written)
- SELECT a.date, isexit, isguard, SUM(read_sum) as read,
- SUM(written_sum) AS written
- FROM
- (SELECT DATE(validafter) AS date,
- fingerprint,
- BOOL_OR(isexit) AS isexit,
- BOOL_OR(isguard) AS isguard
- FROM statusentry
- WHERE isrunning = TRUE
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY 1, 2) a
- JOIN bwhist
- ON a.date = bwhist.date
- AND a.fingerprint = bwhist.fingerprint
- GROUP BY 1, 2, 3';
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- FUNCTION refresh_user_stats()
--- This function refreshes our user statistics by weighting reported
--- directory request statistics of directory mirrors with bandwidth
--- histories.
-CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- -- Start by deleting user statistics of the dates we're about to
- -- regenerate.
- DELETE FROM user_stats WHERE date IN (SELECT date FROM updates);
- -- Now insert new user statistics.
- EXECUTE '
- INSERT INTO user_stats (date, country, r, dw, dr, drw, drr, bw, br, bwd,
- brd, bwr, brr, bwdr, brdr, bwp, brp, bwn, brn)
- SELECT
- -- We want to learn about total requests by date and country.
- dirreq_stats_by_country.date AS date,
- dirreq_stats_by_country.country AS country,
- dirreq_stats_by_country.r AS r,
- -- In order to weight the reported directory requests, we are
- -- counting bytes of relays (except directory authorities)
- -- matching certain criteria: whether or not they are reporting
- -- directory requests, whether or not they are reporting
- -- directory bytes, and whether their directory port is open or
- -- closed.
- SUM(CASE WHEN authority IS NOT NULL
- THEN NULL ELSE dirwritten END) AS dw,
- SUM(CASE WHEN authority IS NOT NULL
- THEN NULL ELSE dirread END) AS dr,
- SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
- THEN NULL ELSE dirwritten END) AS dwr,
- SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
- THEN NULL ELSE dirread END) AS drr,
- SUM(CASE WHEN authority IS NOT NULL
- THEN NULL ELSE written END) AS bw,
- SUM(CASE WHEN authority IS NOT NULL
- THEN NULL ELSE read END) AS br,
- SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL
- THEN NULL ELSE written END) AS bwd,
- SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL
- THEN NULL ELSE read END) AS brd,
- SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
- THEN NULL ELSE written END) AS bwr,
- SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
- THEN NULL ELSE read END) AS brr,
- SUM(CASE WHEN dirwritten = 0 OR requests IS NULL
- OR authority IS NOT NULL THEN NULL ELSE written END) AS bwdr,
- SUM(CASE WHEN dirwritten = 0 OR requests IS NULL
- OR authority IS NOT NULL THEN NULL ELSE read END) AS brdr,
- SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL
- THEN NULL ELSE written END) AS bwp,
- SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL
- THEN NULL ELSE read END) AS brp,
- SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL
- THEN NULL ELSE written END) AS bwn,
- SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL
- THEN NULL ELSE read END) AS brn
- FROM (
- -- The first sub-select tells us the total number of directory
- -- requests per country reported by all directory mirrors.
- SELECT dirreq_stats_by_date.date AS date, country, SUM(requests) AS r
- FROM (
- SELECT fingerprint, date, country, SUM(requests) AS requests
- FROM (
- -- There are two selects here, because in most cases the directory
- -- request statistics cover two calendar dates.
- SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date,
- country, FLOOR(requests * (CASE
- WHEN EXTRACT(EPOCH FROM DATE(statsend)) >
- EXTRACT(EPOCH FROM statsend) - seconds
- THEN EXTRACT(EPOCH FROM statsend) -
- EXTRACT(EPOCH FROM DATE(statsend))
- ELSE seconds END) / seconds) AS requests
- FROM dirreq_stats
- UNION
- SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date,
- country, FLOOR(requests *
- (EXTRACT(EPOCH FROM DATE(statsend)) -
- EXTRACT(EPOCH FROM statsend) + seconds)
- / seconds) AS requests
- FROM dirreq_stats
- WHERE EXTRACT(EPOCH FROM DATE(statsend)) -
- EXTRACT(EPOCH FROM statsend) + seconds > 0
- ) dirreq_stats_split
- GROUP BY 1, 2, 3
- ) dirreq_stats_by_date
- -- We are only interested in requests by directory mirrors, not
- -- directory authorities, so we exclude all relays with the Authority
- -- flag.
- RIGHT JOIN (
- SELECT fingerprint, DATE(validafter) AS date
- FROM statusentry
- WHERE validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- AND isauthority IS FALSE
- GROUP BY 1, 2
- ) statusentry_dirmirrors
- ON dirreq_stats_by_date.fingerprint =
- statusentry_dirmirrors.fingerprint
- AND dirreq_stats_by_date.date = statusentry_dirmirrors.date
- GROUP BY 1, 2
- ) dirreq_stats_by_country
- LEFT JOIN (
- -- In the next step, we expand the result by bandwidth histories of
- -- all relays.
- SELECT fingerprint, date, read_sum AS read, written_sum AS written,
- dirread_sum AS dirread, dirwritten_sum AS dirwritten
- FROM bwhist
- WHERE date >= ''' || min_date || '''
- AND date < ''' || max_date || '''
- AND date IN (SELECT date FROM updates)
- ) bwhist_by_relay
- ON dirreq_stats_by_country.date = bwhist_by_relay.date
- LEFT JOIN (
- -- For each relay, tell how often it had an open directory port and
- -- how often it had the Authority flag assigned on a given date.
- SELECT fingerprint, DATE(validafter) AS date,
- SUM(CASE WHEN dirport > 0 THEN 1 ELSE NULL END) AS opendirport,
- SUM(CASE WHEN isauthority IS TRUE THEN 1 ELSE NULL END) AS authority
- FROM statusentry
- WHERE validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY 1, 2
- ) statusentry_by_relay
- ON bwhist_by_relay.fingerprint = statusentry_by_relay.fingerprint
- AND bwhist_by_relay.date = statusentry_by_relay.date
- LEFT JOIN (
- -- For each relay, tell if it has reported directory request
- -- statistics on a given date. Again, we have to take into account
- -- that statistics intervals cover more than one calendar date in most
- -- cases. The exact number of requests is not relevant here, but only
- -- whether the relay reported directory requests or not.
- SELECT fingerprint, date, 1 AS requests
- FROM (
- SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date
- FROM dirreq_stats
- WHERE DATE(statsend) >= ''' || min_date || '''
- AND DATE(statsend) < ''' || max_date || '''
- AND DATE(statsend) IN (SELECT date FROM updates)
- AND country = ''zy''
- UNION
- SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date
- FROM dirreq_stats
- WHERE DATE(statsend) - 1 >= ''' || min_date || '''
- AND DATE(statsend) - 1 < ''' || max_date || '''
- AND DATE(statsend) IN (SELECT date FROM updates)
- AND country = ''zy''
- AND EXTRACT(EPOCH FROM DATE(statsend)) -
- EXTRACT(EPOCH FROM statsend) + seconds > 0
- ) dirreq_stats_split
- GROUP BY 1, 2
- ) dirreq_stats_by_relay
- ON bwhist_by_relay.fingerprint = dirreq_stats_by_relay.fingerprint
- AND bwhist_by_relay.date = dirreq_stats_by_relay.date
- WHERE dirreq_stats_by_country.country IS NOT NULL
- -- Group by date, country, and total reported directory requests,
- -- summing up the bandwidth histories.
- GROUP BY 1, 2, 3';
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- non-relay statistics
--- The following tables contain pre-aggregated statistics that are not
--- based on relay descriptors or that are not yet derived from the relay
--- descriptors in the database.
-
--- TABLE bridge_network_size
--- Contains average number of running bridges.
-CREATE TABLE bridge_network_size (
- "date" DATE NOT NULL,
- avg_running INTEGER NOT NULL,
- avg_running_ec2 INTEGER NOT NULL,
- CONSTRAINT bridge_network_size_pkey PRIMARY KEY(date)
-);
-
--- TABLE dirreq_stats
--- Contains daily users by country.
-CREATE TABLE dirreq_stats (
- source CHARACTER(40) NOT NULL,
- statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- seconds INTEGER NOT NULL,
- country CHARACTER(2) NOT NULL,
- requests INTEGER NOT NULL,
- CONSTRAINT dirreq_stats_pkey
- PRIMARY KEY (source, statsend, seconds, country)
-);
-
--- TABLE torperf_stats
--- Quantiles and medians of daily torperf results.
-CREATE TABLE torperf_stats (
- "date" DATE NOT NULL,
- source CHARACTER VARYING(32) NOT NULL,
- q1 INTEGER NOT NULL,
- md INTEGER NOT NULL,
- q3 INTEGER NOT NULL,
- timeouts INTEGER NOT NULL,
- failures INTEGER NOT NULL,
- requests INTEGER NOT NULL,
- CONSTRAINT torperf_stats_pkey PRIMARY KEY("date", source)
-);
-
--- Refresh all statistics in the database.
-CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
- BEGIN
- RAISE NOTICE '% Starting refresh run.', timeofday();
- RAISE NOTICE '% Deleting old dates from updates.', timeofday();
- DELETE FROM updates;
- RAISE NOTICE '% Copying scheduled dates.', timeofday();
- INSERT INTO updates SELECT * FROM scheduled_updates;
- RAISE NOTICE '% Refreshing relay statuses per day.', timeofday();
- PERFORM refresh_relay_statuses_per_day();
- RAISE NOTICE '% Refreshing network size.', timeofday();
- PERFORM refresh_network_size();
- RAISE NOTICE '% Refreshing relay platforms.', timeofday();
- PERFORM refresh_relay_platforms();
- RAISE NOTICE '% Refreshing relay versions.', timeofday();
- PERFORM refresh_relay_versions();
- RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday();
- PERFORM refresh_total_bandwidth();
- RAISE NOTICE '% Refreshing relay bandwidth history.', timeofday();
- PERFORM refresh_total_bwhist();
- RAISE NOTICE '% Refreshing total relay bandwidth by flags.', timeofday();
- PERFORM refresh_bandwidth_flags();
- RAISE NOTICE '% Refreshing bandwidth history by flags.', timeofday();
- PERFORM refresh_bwhist_flags();
- RAISE NOTICE '% Refreshing user statistics.', timeofday();
- PERFORM refresh_user_stats();
- RAISE NOTICE '% Deleting processed dates.', timeofday();
- DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates);
- RAISE NOTICE '% Terminating refresh run.', timeofday();
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
--- View for exporting server statistics.
-CREATE VIEW stats_servers AS
- (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
- NULL AS platform, TRUE AS ec2bridge, NULL AS relays,
- avg_running_ec2 AS bridges FROM bridge_network_size
- WHERE date < current_date - 1)
-UNION ALL
- (SELECT COALESCE(network_size.date, bridge_network_size.date) AS date,
- NULL AS flag, NULL AS country, NULL AS version, NULL AS platform,
- NULL AS ec2bridge, network_size.avg_running AS relays,
- bridge_network_size.avg_running AS bridges FROM network_size
- FULL OUTER JOIN bridge_network_size
- ON network_size.date = bridge_network_size.date
- WHERE COALESCE(network_size.date, bridge_network_size.date) <
- current_date - 1)
-UNION ALL
- (SELECT date, 'Exit' AS flag, NULL AS country, NULL AS version,
- NULL AS platform, NULL AS ec2bridge, avg_exit AS relays,
- NULL AS bridges FROM network_size WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, 'Guard' AS flag, NULL AS country, NULL AS version,
- NULL AS platform, NULL AS ec2bridge, avg_guard AS relays,
- NULL AS bridges FROM network_size WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, 'Fast' AS flag, NULL AS country, NULL AS version,
- NULL AS platform, NULL AS ec2bridge, avg_fast AS relays,
- NULL AS bridges FROM network_size WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, 'Stable' AS flag, NULL AS country, NULL AS version,
- NULL AS platform, NULL AS ec2bridge, avg_stable AS relays,
- NULL AS bridges FROM network_size WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, 'HSDir' AS flag, NULL AS country, NULL AS version,
- NULL AS platform, NULL AS ec2bridge, avg_hsdir AS relays,
- NULL AS bridges FROM network_size WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, CASE WHEN country != 'zz' THEN country
- ELSE '??' END AS country, NULL AS version, NULL AS platform,
- NULL AS ec2bridge, relays, NULL AS bridges FROM relay_countries
- WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, NULL AS country, version, NULL AS platform,
- NULL AS ec2bridge, relays, NULL AS bridges FROM relay_versions
- WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
- 'Linux' AS platform, NULL AS ec2bridge, avg_linux AS relays,
- NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
- 'Darwin' AS platform, NULL AS ec2bridge, avg_darwin AS relays,
- NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
- 'FreeBSD' AS platform, NULL AS ec2bridge, avg_bsd AS relays,
- NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
- 'Windows' AS platform, NULL AS ec2bridge, avg_windows AS relays,
- NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
-UNION ALL
- (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
- 'Other' AS platform, NULL AS ec2bridge, avg_other AS relays,
- NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
-ORDER BY 1, 2, 3, 4, 5, 6;
-
--- View for exporting bandwidth statistics.
-CREATE VIEW stats_bandwidth AS
- (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date,
- COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit,
- COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard,
- bandwidth_flags.bwadvertised AS advbw,
- CASE WHEN bwhist_flags.read IS NOT NULL
- THEN bwhist_flags.read / 86400 END AS bwread,
- CASE WHEN bwhist_flags.written IS NOT NULL
- THEN bwhist_flags.written / 86400 END AS bwwrite,
- NULL AS dirread, NULL AS dirwrite
- FROM bandwidth_flags FULL OUTER JOIN bwhist_flags
- ON bandwidth_flags.date = bwhist_flags.date
- AND bandwidth_flags.isexit = bwhist_flags.isexit
- AND bandwidth_flags.isguard = bwhist_flags.isguard
- WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) <
- current_date - 3)
-UNION ALL
- (SELECT COALESCE(total_bandwidth.date, total_bwhist.date, u.date)
- AS date, NULL AS isexit, NULL AS isguard,
- total_bandwidth.bwadvertised AS advbw,
- CASE WHEN total_bwhist.read IS NOT NULL
- THEN total_bwhist.read / 86400 END AS bwread,
- CASE WHEN total_bwhist.written IS NOT NULL
- THEN total_bwhist.written / 86400 END AS bwwrite,
- CASE WHEN u.date IS NOT NULL
- THEN FLOOR(CAST(u.dr AS NUMERIC) * CAST(u.brp AS NUMERIC) /
- CAST(u.brd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirread,
- CASE WHEN u.date IS NOT NULL
- THEN FLOOR(CAST(u.dw AS NUMERIC) * CAST(u.bwp AS NUMERIC) /
- CAST(u.bwd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirwrite
- FROM total_bandwidth FULL OUTER JOIN total_bwhist
- ON total_bandwidth.date = total_bwhist.date
- FULL OUTER JOIN (SELECT * FROM user_stats WHERE country = 'zy'
- AND bwp / bwd <= 3) u
- ON COALESCE(total_bandwidth.date, total_bwhist.date) = u.date
- WHERE COALESCE(total_bandwidth.date, total_bwhist.date, u.date) <
- current_date - 3)
-ORDER BY 1, 2, 3;
-
--- View for exporting torperf statistics.
-CREATE VIEW stats_torperf AS
-SELECT date, CASE WHEN source LIKE '%-50kb' THEN 50 * 1024
- WHEN source LIKE '%-1mb' THEN 1024 * 1024
- WHEN source LIKE '%-5mb' THEN 5 * 1024 * 1024 END AS size,
- CASE WHEN source NOT LIKE 'all-%'
- THEN split_part(source, '-', 1) END AS source, q1, md, q3, timeouts,
- failures, requests FROM torperf_stats WHERE date < current_date - 1
- ORDER BY 1, 2, 3;
-
--- View for exporting connbidirect statistics.
-CREATE VIEW stats_connbidirect AS
-SELECT DATE(statsend) AS date, source, belownum AS below, readnum AS read,
- writenum AS write, bothnum AS "both" FROM connbidirect
- WHERE DATE(statsend) < current_date - 1 ORDER BY 1, 2;
-
diff --git a/modules/legacy/build.xml b/modules/legacy/build.xml
new file mode 100644
index 0000000..bf0b65d
--- /dev/null
+++ b/modules/legacy/build.xml
@@ -0,0 +1,55 @@
+<project default="run" name="metrics-web" basedir="."> <!-- Legacy cron module; the default target compiles and runs the importer. -->
+
+ <!-- Define build paths. -->
+ <property name="sources" value="src"/>
+ <property name="classes" value="classes"/>
+ <property name="config" value="etc"/> <!-- NOTE(review): not referenced by any target in this file. -->
+ <property name="warfile" value="ernie.war"/> <!-- NOTE(review): not referenced by any target in this file. -->
+ <path id="classpath">
+ <pathelement path="${classes}"/>
+ <fileset dir="/usr/share/java"> <!-- System-installed jars; assumes a Debian-style layout, confirm on the build host. -->
+ <include name="commons-codec.jar"/>
+ <include name="commons-compress.jar"/>
+ <include name="postgresql-jdbc3.jar"/>
+ <include name="junit4.jar"/>
+ <include name="servlet-api-3.0.jar"/>
+ <include name="commons-lang.jar"/>
+ </fileset>
+ <fileset dir="../../deps/metrics-lib"> <!-- descriptor.jar is presumably produced by the metrics-lib target below. -->
+ <include name="descriptor.jar"/>
+ </fileset>
+ </path>
+
+ <target name="init"> <!-- Creates config from config.template and the classes directory. -->
+ <copy file="config.template" tofile="config"/> <!-- Ant copy only overwrites config when config.template is newer. -->
+ <mkdir dir="${classes}"/>
+ </target>
+ <target name="metrics-lib"> <!-- Builds the metrics-lib dependency using its default target. -->
+ <ant dir="../../deps/metrics-lib"/>
+ </target>
+
+ <!-- Compile all plain Java classes. -->
+ <target name="compile" depends="metrics-lib,init"> <!-- NOTE(review): javac source/target 1.5 below is rejected by JDK 9 and later. -->
+ <javac destdir="${classes}"
+ srcdir="${sources}"
+ source="1.5"
+ target="1.5"
+ debug="true"
+ deprecation="true"
+ optimize="false"
+ failonerror="true"
+ includeantruntime="false">
+ <classpath refid="classpath"/>
+ </javac>
+ </target>
+
+ <!-- Prepare data for being displayed on the website. -->
+ <target name="run" depends="compile"> <!-- Forks a JVM with a 1024m heap limit running org.torproject.ernie.cron.Main. -->
+ <java fork="true"
+ maxmemory="1024m"
+ classname="org.torproject.ernie.cron.Main">
+ <classpath refid="classpath"/>
+ </java>
+ </target>
+</project>
+
diff --git a/modules/legacy/config.template b/modules/legacy/config.template
new file mode 100644
index 0000000..8f0789b
--- /dev/null
+++ b/modules/legacy/config.template
@@ -0,0 +1,47 @@
+## Import directory archives from disk, if available
+#ImportDirectoryArchives 0
+#
+## Relative path to directory to import directory archives from
+#DirectoryArchivesDirectory in/relay-descriptors/
+#
+## Keep a history of imported directory archive files to know which files
+## have been imported before. This history can be useful when importing
+## from a changing source to avoid importing descriptors over and over
+## again, but it can be confusing to users who don't know about it.
+#KeepDirectoryArchiveImportHistory 0
+#
+## Import sanitized bridges from disk, if available
+#ImportSanitizedBridges 0
+#
+## Relative path to directory to import sanitized bridges from
+#SanitizedBridgesDirectory in/bridge-descriptors/
+#
+## Keep a history of imported sanitized bridge descriptors. This history
+## can be useful when importing from a changing data source to avoid
+## importing descriptors more than once, but it can be confusing to users
+## who don't know about it.
+#KeepSanitizedBridgesImportHistory 0
+#
+## Write relay descriptors to the database
+#WriteRelayDescriptorDatabase 0
+#
+## JDBC string for relay descriptor database
+#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password
+#
+## Write relay descriptors to raw text files for importing them into the
+## database using PostgreSQL's \copy command
+#WriteRelayDescriptorsRawFiles 0
+#
+## Relative path to directory to write raw text files; note that existing
+## files will be overwritten!
+#RelayDescriptorRawFilesDirectory pg-import/
+#
+## Write bridge stats to disk
+#WriteBridgeStats 0
+#
+## Import torperf data, if available, and write stats to disk
+#ImportWriteTorperfStats 0
+#
+## Relative path to directory to import torperf results from
+#TorperfDirectory in/torperf/
+#
diff --git a/modules/legacy/db/tordir.sql b/modules/legacy/db/tordir.sql
new file mode 100644
index 0000000..cd2ed6a
--- /dev/null
+++ b/modules/legacy/db/tordir.sql
@@ -0,0 +1,1005 @@
+-- Copyright 2010 The Tor Project
+-- See LICENSE for licensing information
+
+CREATE LANGUAGE plpgsql; -- Required by the plpgsql functions below. NOTE(review): fails with "already exists" on PostgreSQL 9.0+, which installs plpgsql by default.
+
+-- TABLE descriptor
+-- Contains server descriptors published by relays, one row per descriptor.
+CREATE TABLE descriptor (
+ descriptor CHARACTER(40) NOT NULL, -- 40-char identifier; statusentry.descriptor joins on it
+ nickname CHARACTER VARYING(19) NOT NULL,
+ address CHARACTER VARYING(15) NOT NULL, -- 15 chars fit a dotted-quad IPv4 address, not IPv6
+ orport INTEGER NOT NULL,
+ dirport INTEGER NOT NULL,
+ fingerprint CHARACTER(40) NOT NULL,
+ bandwidthavg BIGINT NOT NULL,
+ bandwidthburst BIGINT NOT NULL,
+ bandwidthobserved BIGINT NOT NULL, -- LEAST(bandwidthavg, bandwidthobserved) is the advertised bandwidth
+ platform CHARACTER VARYING(256), -- nullable; matched with LIKE/SUBSTRING in refresh_relay_*()
+ published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ uptime BIGINT,
+ extrainfo CHARACTER(40), -- presumably the extra-info descriptor identifier; verify
+ CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor)
+);
+
+-- Contains bandwidth histories reported by relays in extra-info
+-- descriptors. Each row contains the reported bandwidth in 15-minute
+-- intervals for each relay and date; *_sum columns are set by insert_bwhist().
+CREATE TABLE bwhist (
+ fingerprint CHARACTER(40) NOT NULL,
+ date DATE NOT NULL,
+ read BIGINT[],
+ read_sum BIGINT, -- array_sum(read)
+ written BIGINT[],
+ written_sum BIGINT, -- array_sum(written)
+ dirread BIGINT[],
+ dirread_sum BIGINT, -- array_sum(dirread)
+ dirwritten BIGINT[],
+ dirwritten_sum BIGINT, -- array_sum(dirwritten)
+ CONSTRAINT bwhist_pkey PRIMARY KEY (fingerprint, date)
+);
+
+CREATE INDEX bwhist_date ON bwhist (date); -- date-only filters, e.g. in refresh_total_bwhist()
+
+-- TABLE statusentry
+-- Contains all of the consensus entries published by the directories.
+-- Each entry references a descriptor; no foreign key enforces this.
+CREATE TABLE statusentry (
+ validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL, -- also selects the monthly child table
+ nickname CHARACTER VARYING(19) NOT NULL,
+ fingerprint CHARACTER(40) NOT NULL,
+ descriptor CHARACTER(40) NOT NULL, -- joined to descriptor.descriptor
+ published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ address CHARACTER VARYING(15) NOT NULL,
+ orport INTEGER NOT NULL,
+ dirport INTEGER NOT NULL,
+ isauthority BOOLEAN DEFAULT FALSE NOT NULL,
+ isbadexit BOOLEAN DEFAULT FALSE NOT NULL,
+ isbaddirectory BOOLEAN DEFAULT FALSE NOT NULL,
+ isexit BOOLEAN DEFAULT FALSE NOT NULL,
+ isfast BOOLEAN DEFAULT FALSE NOT NULL,
+ isguard BOOLEAN DEFAULT FALSE NOT NULL,
+ ishsdir BOOLEAN DEFAULT FALSE NOT NULL,
+ isnamed BOOLEAN DEFAULT FALSE NOT NULL,
+ isstable BOOLEAN DEFAULT FALSE NOT NULL,
+ isrunning BOOLEAN DEFAULT FALSE NOT NULL,
+ isunnamed BOOLEAN DEFAULT FALSE NOT NULL,
+ isvalid BOOLEAN DEFAULT FALSE NOT NULL,
+ isv2dir BOOLEAN DEFAULT FALSE NOT NULL,
+ isv3dir BOOLEAN DEFAULT FALSE NOT NULL,
+ version CHARACTER VARYING(50),
+ bandwidth BIGINT,
+ ports TEXT,
+ rawdesc BYTEA NOT NULL
+); -- no primary key here; each monthly child table gets PRIMARY KEY (validafter, fingerprint)
+
+CREATE OR REPLACE FUNCTION statusentry_insert_trigger() -- routes each new statusentry row into a per-month child table
+RETURNS TRIGGER AS $$
+
+DECLARE
+ tablename TEXT;
+ selectresult TEXT;
+ nextmonth TIMESTAMP WITHOUT TIME ZONE;
+ v_year INTEGER;
+ v_month INTEGER;
+ n_year INTEGER;
+ n_month INTEGER;
+
+BEGIN
+ v_year := extract(YEAR FROM NEW.validafter);
+ v_month := extract(MONTH FROM NEW.validafter); -- NOTE(review): assigned but never read; TO_CHAR with mm is used instead
+ tablename := 'statusentry_y' || v_year || 'm' || -- e.g. statusentry_y2014m01
+ TO_CHAR(NEW.validafter, 'mm');
+ EXECUTE 'SELECT relname FROM pg_class WHERE relname = '''|| tablename ||
+ '''' INTO selectresult; -- NULL when the child table does not exist yet
+ IF selectresult IS NULL THEN -- NOTE(review): not race-safe; concurrent first inserts for a month may both try CREATE TABLE
+ nextmonth := new.validafter + interval '1 month';
+ n_year := extract(YEAR FROM nextmonth);
+ n_month := extract(MONTH FROM nextmonth); -- NOTE(review): assigned but never read
+ EXECUTE 'CREATE TABLE ' || tablename || -- CHECK limits validafter to [first of month, first of next month)
+ ' ( CHECK ( validafter >= ''' || v_year || '-' ||
+ TO_CHAR(NEW.validafter, 'mm') || '-01 00:00:00'' ' ||
+ 'AND validafter < ''' || n_year || '-' ||
+ TO_CHAR(nextmonth, 'mm') ||
+ '-01 00:00:00'') ) INHERITS (statusentry)';
+ EXECUTE 'ALTER TABLE ' || tablename || ' ADD CONSTRAINT ' ||
+ tablename || '_pkey PRIMARY KEY (validafter, fingerprint)';
+ EXECUTE 'CREATE INDEX ' || tablename || '_address ON ' || -- secondary indexes on the new child table
+ tablename || ' (address)';
+ EXECUTE 'CREATE INDEX ' || tablename || '_fingerprint ON ' ||
+ tablename || ' (fingerprint)';
+ EXECUTE 'CREATE INDEX ' || tablename || '_nickname ON ' ||
+ tablename || ' (LOWER(nickname))';
+ EXECUTE 'CREATE INDEX ' || tablename || '_validafter ON ' ||
+ tablename || ' (validafter)';
+ EXECUTE 'CREATE INDEX ' || tablename || '_descriptor ON ' ||
+ tablename || ' (descriptor)';
+ EXECUTE 'CREATE INDEX ' || tablename || '_validafter_date ON ' ||
+ tablename || ' (DATE(validafter))';
+ END IF;
+ EXECUTE 'INSERT INTO ' || tablename || ' SELECT ($1).*' USING NEW; -- copy the whole NEW row into the child table
+ RETURN NULL; -- a BEFORE row trigger returning NULL skips the insert into the parent table
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER insert_statusentry_trigger -- fires for every row inserted into the parent statusentry table
+ BEFORE INSERT ON statusentry
+ FOR EACH ROW EXECUTE PROCEDURE statusentry_insert_trigger();
+
+-- TABLE consensus
+-- Contains one row per consensus published by the directories, keyed by valid-after time.
+CREATE TABLE consensus (
+ validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ CONSTRAINT consensus_pkey PRIMARY KEY (validafter)
+); -- counted per day by refresh_relay_statuses_per_day()
+
+-- TABLE connbidirect
+-- Contains conn-bi-direct statistics, one row per source and statistics end time.
+CREATE TABLE connbidirect (
+ source CHARACTER(40) NOT NULL,
+ statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ seconds INTEGER NOT NULL, -- not part of the primary key
+ belownum BIGINT NOT NULL,
+ readnum BIGINT NOT NULL,
+ writenum BIGINT NOT NULL,
+ bothnum BIGINT NOT NULL,
+ CONSTRAINT connbidirect_pkey PRIMARY KEY (source, statsend)
+);
+
+-- TABLE network_size: daily average number of running relays, in total and per flag (refresh_network_size)
+CREATE TABLE network_size (
+ date DATE NOT NULL,
+ avg_running INTEGER NOT NULL,
+ avg_exit INTEGER NOT NULL,
+ avg_guard INTEGER NOT NULL,
+ avg_fast INTEGER NOT NULL,
+ avg_stable INTEGER NOT NULL,
+ avg_authority INTEGER NOT NULL,
+ avg_badexit INTEGER NOT NULL,
+ avg_baddirectory INTEGER NOT NULL,
+ avg_hsdir INTEGER NOT NULL,
+ avg_named INTEGER NOT NULL,
+ avg_unnamed INTEGER NOT NULL,
+ avg_valid INTEGER NOT NULL,
+ avg_v2dir INTEGER NOT NULL,
+ avg_v3dir INTEGER NOT NULL,
+ CONSTRAINT network_size_pkey PRIMARY KEY(date)
+);
+
+-- TABLE relay_countries: number of relays per date and two-letter country code
+CREATE TABLE relay_countries (
+ date DATE NOT NULL,
+ country CHARACTER(2) NOT NULL,
+ relays INTEGER NOT NULL,
+ CONSTRAINT relay_countries_pkey PRIMARY KEY(date, country)
+);
+
+-- TABLE relay_platforms: daily average number of running relays per platform (refresh_relay_platforms)
+CREATE TABLE relay_platforms (
+ date DATE NOT NULL,
+ avg_linux INTEGER NOT NULL,
+ avg_darwin INTEGER NOT NULL,
+ avg_bsd INTEGER NOT NULL,
+ avg_windows INTEGER NOT NULL,
+ avg_other INTEGER NOT NULL,
+ CONSTRAINT relay_platforms_pkey PRIMARY KEY(date)
+);
+
+-- TABLE relay_versions: daily average number of running relays per version string (refresh_relay_versions)
+CREATE TABLE relay_versions (
+ date DATE NOT NULL,
+ version CHARACTER(5) NOT NULL, -- SUBSTRING(platform, 5, 5) in refresh_relay_versions()
+ relays INTEGER NOT NULL,
+ CONSTRAINT relay_versions_pkey PRIMARY KEY(date, version)
+);
+
+-- TABLE total_bandwidth
+-- Contains the whole network's daily average total bandwidth (sums over running
+-- relays divided by that day's consensus count), used in the bandwidth graphs.
+CREATE TABLE total_bandwidth (
+ date DATE NOT NULL,
+ bwavg BIGINT NOT NULL,
+ bwburst BIGINT NOT NULL,
+ bwobserved BIGINT NOT NULL,
+ bwadvertised BIGINT NOT NULL, -- based on LEAST(bandwidthavg, bandwidthobserved) per relay
+ CONSTRAINT total_bandwidth_pkey PRIMARY KEY(date)
+);
+
+-- TABLE total_bwhist
+-- Contains the total number of bytes read and written by all relays in the
+-- network on a given day, summed from bwhist.read_sum and bwhist.written_sum.
+CREATE TABLE total_bwhist (
+ date DATE NOT NULL,
+ read BIGINT,
+ written BIGINT,
+ CONSTRAINT total_bwhist_pkey PRIMARY KEY(date)
+);
+
+-- TABLE bandwidth_flags: daily average advertised bandwidth per Exit/Guard flag combination (refresh_bandwidth_flags)
+CREATE TABLE bandwidth_flags (
+ date DATE NOT NULL,
+ isexit BOOLEAN NOT NULL,
+ isguard BOOLEAN NOT NULL,
+ bwadvertised BIGINT NOT NULL,
+ CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard)
+);
+
+-- TABLE bwhist_flags: bytes read/written per date and Exit/Guard flag combination (refresh_bwhist_flags)
+CREATE TABLE bwhist_flags (
+ date DATE NOT NULL,
+ isexit BOOLEAN NOT NULL,
+ isguard BOOLEAN NOT NULL,
+ read BIGINT,
+ written BIGINT,
+ CONSTRAINT bwhist_flags_pkey PRIMARY KEY(date, isexit, isguard)
+);
+
+-- TABLE user_stats
+-- Aggregate statistics on directory requests and byte histories that we
+-- use to estimate user numbers.
+CREATE TABLE user_stats ( -- NOTE(review): column abbreviations are not explained here; see refresh_user_stats()
+ date DATE NOT NULL,
+ country CHARACTER(2) NOT NULL,
+ r BIGINT,
+ dw BIGINT,
+ dr BIGINT,
+ drw BIGINT,
+ drr BIGINT,
+ bw BIGINT,
+ br BIGINT,
+ bwd BIGINT,
+ brd BIGINT,
+ bwr BIGINT,
+ brr BIGINT,
+ bwdr BIGINT,
+ brdr BIGINT,
+ bwp BIGINT,
+ brp BIGINT,
+ bwn BIGINT,
+ brn BIGINT,
+ CONSTRAINT user_stats_pkey PRIMARY KEY(date, country)
+);
+
+-- TABLE relay_statuses_per_day
+-- Number of consensuses per date, filled by refresh_relay_statuses_per_day();
+-- the refresh_* functions divide daily sums by this count to get averages.
+CREATE TABLE relay_statuses_per_day (
+ date DATE NOT NULL,
+ count INTEGER NOT NULL,
+ CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date)
+);
+
+-- Dates to be included in the next refresh run.
+CREATE TABLE scheduled_updates (
+ id SERIAL, -- lets a refresh run delete exactly the rows it copied into updates
+ date DATE NOT NULL -- no unique constraint, so a date may be scheduled more than once
+);
+
+-- Dates in the current refresh run. When starting a refresh run, we copy
+-- the rows from scheduled_updates here so that only those rows are
+-- deleted after the refresh run. Otherwise we might lose dates that were
+-- scheduled during a refresh run. Such dates stay in scheduled_updates
+-- and are processed in the next refresh run.
+CREATE TABLE updates (
+ id INTEGER,
+ date DATE
+);
+
+-- FUNCTION refresh_relay_statuses_per_day()
+-- Recomputes the number of consensuses per date for all dates in updates.
+CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
+RETURNS INTEGER AS $$
+ BEGIN
+ DELETE FROM relay_statuses_per_day
+ WHERE date IN (SELECT date FROM updates); -- drop rows for the dates being recomputed
+ INSERT INTO relay_statuses_per_day (date, count)
+ SELECT DATE(validafter) AS date, COUNT(*) AS count
+ FROM consensus
+ WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates)
+ AND DATE(validafter) <= (SELECT MAX(date) FROM updates)
+ AND DATE(validafter) IN (SELECT date FROM updates)
+ GROUP BY DATE(validafter); -- inserts nothing when updates is empty
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$
+ SELECT SUM($1[i])::bigint
+ FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i);
+$$ LANGUAGE SQL; -- sums dimension 1 of a BIGINT array; NULL elements are skipped, NULL or empty input yields NULL
+
+CREATE OR REPLACE FUNCTION insert_bwhist( -- inserts or merges one bwhist row, then recomputes its *_sum columns
+ insert_fingerprint CHARACTER(40), insert_date DATE,
+ insert_read BIGINT[], insert_written BIGINT[],
+ insert_dirread BIGINT[], insert_dirwritten BIGINT[])
+ RETURNS INTEGER AS $$
+ BEGIN
+ IF (SELECT COUNT(*) FROM bwhist -- NOTE(review): check-then-insert is not atomic; concurrent callers may hit bwhist_pkey
+ WHERE fingerprint = insert_fingerprint AND date = insert_date) = 0
+ THEN
+ INSERT INTO bwhist (fingerprint, date, read, written, dirread,
+ dirwritten)
+ VALUES (insert_fingerprint, insert_date, insert_read, insert_written,
+ insert_dirread, insert_dirwritten);
+ ELSE -- row exists: overwrite only the array slices covered by the new arrays
+ BEGIN
+ UPDATE bwhist
+ SET read[array_lower(insert_read, 1):
+ array_upper(insert_read, 1)] = insert_read,
+ written[array_lower(insert_written, 1):
+ array_upper(insert_written, 1)] = insert_written,
+ dirread[array_lower(insert_dirread, 1):
+ array_upper(insert_dirread, 1)] = insert_dirread,
+ dirwritten[array_lower(insert_dirwritten, 1):
+ array_upper(insert_dirwritten, 1)] = insert_dirwritten
+ WHERE fingerprint = insert_fingerprint AND date = insert_date;
+ -- Updating twice is an ugly workaround for PostgreSQL bug 5840
+ UPDATE bwhist
+ SET read[array_lower(insert_read, 1):
+ array_upper(insert_read, 1)] = insert_read,
+ written[array_lower(insert_written, 1):
+ array_upper(insert_written, 1)] = insert_written,
+ dirread[array_lower(insert_dirread, 1):
+ array_upper(insert_dirread, 1)] = insert_dirread,
+ dirwritten[array_lower(insert_dirwritten, 1):
+ array_upper(insert_dirwritten, 1)] = insert_dirwritten
+ WHERE fingerprint = insert_fingerprint AND date = insert_date;
+ END;
+ END IF;
+ UPDATE bwhist -- recompute the *_sum columns from the stored arrays
+ SET read_sum = array_sum(read),
+ written_sum = array_sum(written),
+ dirread_sum = array_sum(dirread),
+ dirwritten_sum = array_sum(dirwritten)
+ WHERE fingerprint = insert_fingerprint AND date = insert_date;
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+-- refresh_* functions
+-- The following functions keep their corresponding aggregate tables
+-- up-to-date. They should be called every time ERNIE is run, or after new
+-- data has been added to the descriptor or statusentry tables. They
+-- recompute only the dates listed in the updates table: rows for those
+-- dates are deleted from the aggregate table and then re-inserted.
+
+-- FUNCTION refresh_network_size()
+CREATE OR REPLACE FUNCTION refresh_network_size() RETURNS INTEGER AS $$
+ DECLARE
+ min_date TIMESTAMP WITHOUT TIME ZONE;
+ max_date TIMESTAMP WITHOUT TIME ZONE;
+ BEGIN
+ -- Recomputes daily averages of running relays, in total and per flag, for dates in updates.
+ min_date := (SELECT MIN(date) FROM updates); -- NOTE(review): NULL if updates is empty; the EXECUTE string then becomes NULL and raises an error
+ max_date := (SELECT MAX(date) + 1 FROM updates); -- exclusive upper bound
+
+ DELETE FROM network_size
+ WHERE date IN (SELECT date FROM updates);
+ -- Date bounds are inlined as literals, presumably so the planner can exclude statusentry child tables.
+ EXECUTE '
+ INSERT INTO network_size
+ (date, avg_running, avg_exit, avg_guard, avg_fast, avg_stable,
+ avg_authority, avg_badexit, avg_baddirectory, avg_hsdir,
+ avg_named, avg_unnamed, avg_valid, avg_v2dir, avg_v3dir)
+ SELECT date,
+ isrunning / count AS avg_running,
+ isexit / count AS avg_exit,
+ isguard / count AS avg_guard,
+ isfast / count AS avg_fast,
+ isstable / count AS avg_stable,
+ isauthority / count as avg_authority,
+ isbadexit / count as avg_badexit,
+ isbaddirectory / count as avg_baddirectory,
+ ishsdir / count as avg_hsdir,
+ isnamed / count as avg_named,
+ isunnamed / count as avg_unnamed,
+ isvalid / count as avg_valid,
+ isv2dir / count as avg_v2dir,
+ isv3dir / count as avg_v3dir
+ FROM (
+ SELECT DATE(validafter) AS date,
+ COUNT(*) AS isrunning,
+ COUNT(NULLIF(isexit, FALSE)) AS isexit,
+ COUNT(NULLIF(isguard, FALSE)) AS isguard,
+ COUNT(NULLIF(isfast, FALSE)) AS isfast,
+ COUNT(NULLIF(isstable, FALSE)) AS isstable,
+ COUNT(NULLIF(isauthority, FALSE)) AS isauthority,
+ COUNT(NULLIF(isbadexit, FALSE)) AS isbadexit,
+ COUNT(NULLIF(isbaddirectory, FALSE)) AS isbaddirectory,
+ COUNT(NULLIF(ishsdir, FALSE)) AS ishsdir,
+ COUNT(NULLIF(isnamed, FALSE)) AS isnamed,
+ COUNT(NULLIF(isunnamed, FALSE)) AS isunnamed,
+ COUNT(NULLIF(isvalid, FALSE)) AS isvalid,
+ COUNT(NULLIF(isv2dir, FALSE)) AS isv2dir,
+ COUNT(NULLIF(isv3dir, FALSE)) AS isv3dir
+ FROM statusentry
+ WHERE isrunning = TRUE
+ AND validafter >= ''' || min_date || '''
+ AND validafter < ''' || max_date || '''
+ AND DATE(validafter) IN (SELECT date FROM updates)
+ GROUP BY DATE(validafter)
+ ) b
+ NATURAL JOIN relay_statuses_per_day'; -- joins on date; integer division truncates the averages
+
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+-- FUNCTION refresh_relay_platforms()
+CREATE OR REPLACE FUNCTION refresh_relay_platforms() RETURNS INTEGER AS $$
+ DECLARE
+ min_date TIMESTAMP WITHOUT TIME ZONE;
+ max_date TIMESTAMP WITHOUT TIME ZONE;
+ BEGIN
+ -- Recomputes daily averages of running relays per platform (Linux, Darwin, BSD, Windows, other) for dates in updates.
+ min_date := (SELECT MIN(date) FROM updates); -- NOTE(review): NULL if updates is empty; the EXECUTE string then becomes NULL and raises an error
+ max_date := (SELECT MAX(date) + 1 FROM updates); -- exclusive upper bound
+
+ DELETE FROM relay_platforms
+ WHERE date IN (SELECT date FROM updates);
+
+ EXECUTE '
+ INSERT INTO relay_platforms
+ (date, avg_linux, avg_darwin, avg_bsd, avg_windows, avg_other)
+ SELECT date,
+ linux / count AS avg_linux,
+ darwin / count AS avg_darwin,
+ bsd / count AS avg_bsd,
+ windows / count AS avg_windows,
+ other / count AS avg_other
+ FROM (
+ SELECT DATE(validafter) AS date,
+ SUM(CASE WHEN platform LIKE ''%Linux%'' THEN 1 ELSE 0 END)
+ AS linux,
+ SUM(CASE WHEN platform LIKE ''%Darwin%'' THEN 1 ELSE 0 END)
+ AS darwin,
+ SUM(CASE WHEN platform LIKE ''%BSD%'' THEN 1 ELSE 0 END)
+ AS bsd,
+ SUM(CASE WHEN platform LIKE ''%Windows%'' THEN 1 ELSE 0 END)
+ AS windows,
+ SUM(CASE WHEN platform NOT LIKE ''%Windows%''
+ AND platform NOT LIKE ''%Darwin%''
+ AND platform NOT LIKE ''%BSD%''
+ AND platform NOT LIKE ''%Linux%'' THEN 1 ELSE 0 END)
+ AS other
+ FROM descriptor
+ RIGHT JOIN statusentry
+ ON statusentry.descriptor = descriptor.descriptor
+ WHERE isrunning = TRUE
+ AND validafter >= ''' || min_date || '''
+ AND validafter < ''' || max_date || '''
+ AND DATE(validafter) IN (SELECT date FROM updates)
+ GROUP BY DATE(validafter)
+ ) b
+ NATURAL JOIN relay_statuses_per_day'; -- entries without a matching descriptor have NULL platform and fall into no category
+
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+-- FUNCTION refresh_relay_versions()
+CREATE OR REPLACE FUNCTION refresh_relay_versions() RETURNS INTEGER AS $$
+ DECLARE
+ min_date TIMESTAMP WITHOUT TIME ZONE;
+ max_date TIMESTAMP WITHOUT TIME ZONE;
+ BEGIN
+ -- Recomputes daily averages of running relays per version string (chars 5-9 of platform) for dates in updates.
+ min_date := (SELECT MIN(date) FROM updates); -- NOTE(review): NULL if updates is empty; the EXECUTE string then becomes NULL and raises an error
+ max_date := (SELECT MAX(date) + 1 FROM updates); -- exclusive upper bound
+
+ DELETE FROM relay_versions
+ WHERE date IN (SELECT date FROM updates);
+ -- The unaliased DATE(validafter) below gets the column name "date", which the NATURAL JOIN relies on.
+ EXECUTE '
+ INSERT INTO relay_versions
+ (date, version, relays)
+ SELECT date, version, relays / count AS relays
+ FROM (
+ SELECT DATE(validafter), SUBSTRING(platform, 5, 5) AS version,
+ COUNT(*) AS relays
+ FROM descriptor RIGHT JOIN statusentry
+ ON descriptor.descriptor = statusentry.descriptor
+ WHERE isrunning = TRUE
+ AND platform IS NOT NULL
+ AND validafter >= ''' || min_date || '''
+ AND validafter < ''' || max_date || '''
+ AND DATE(validafter) IN (SELECT date FROM updates)
+ GROUP BY 1, 2
+ ) b
+ NATURAL JOIN relay_statuses_per_day'; -- integer division truncates the averages
+
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+-- FUNCTION refresh_total_bandwidth()
+-- This keeps the table total_bandwidth up-to-date when necessary.
+CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
+ DECLARE
+ min_date TIMESTAMP WITHOUT TIME ZONE;
+ max_date TIMESTAMP WITHOUT TIME ZONE;
+ BEGIN
+ -- Recomputes daily network bandwidth: sums over running relays divided by the consensus count of that day.
+ min_date := (SELECT MIN(date) FROM updates); -- NOTE(review): NULL if updates is empty; the EXECUTE string then becomes NULL and raises an error
+ max_date := (SELECT MAX(date) + 1 FROM updates); -- exclusive upper bound
+
+ DELETE FROM total_bandwidth
+ WHERE date IN (SELECT date FROM updates);
+
+ EXECUTE '
+ INSERT INTO total_bandwidth
+ (bwavg, bwburst, bwobserved, bwadvertised, date)
+ SELECT (SUM(bandwidthavg)
+ / relay_statuses_per_day.count)::BIGINT AS bwavg,
+ (SUM(bandwidthburst)
+ / relay_statuses_per_day.count)::BIGINT AS bwburst,
+ (SUM(bandwidthobserved)
+ / relay_statuses_per_day.count)::BIGINT AS bwobserved,
+ (SUM(LEAST(bandwidthavg, bandwidthobserved))
+ / relay_statuses_per_day.count)::BIGINT AS bwadvertised,
+ DATE(validafter)
+ FROM descriptor RIGHT JOIN statusentry
+ ON descriptor.descriptor = statusentry.descriptor
+ JOIN relay_statuses_per_day
+ ON DATE(validafter) = relay_statuses_per_day.date
+ WHERE isrunning = TRUE
+ AND validafter >= ''' || min_date || '''
+ AND validafter < ''' || max_date || '''
+ AND DATE(validafter) IN (SELECT date FROM updates)
+ AND relay_statuses_per_day.date >= ''' || min_date || '''
+ AND relay_statuses_per_day.date < ''' || max_date || '''
+ AND DATE(relay_statuses_per_day.date) IN
+ (SELECT date FROM updates)
+ GROUP BY DATE(validafter), relay_statuses_per_day.count'; -- SUM of BIGINT is NUMERIC, so the ::BIGINT casts round the averages
+
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$ -- recomputes total_bwhist for all dates in updates
+ BEGIN
+ DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates);
+ INSERT INTO total_bwhist (date, read, written)
+ SELECT date, SUM(read_sum) AS read, SUM(written_sum) AS written
+ FROM bwhist
+ WHERE date >= (SELECT MIN(date) FROM updates)
+ AND date <= (SELECT MAX(date) FROM updates)
+ AND date IN (SELECT date FROM updates)
+ GROUP BY date; -- SUM skips NULL read_sum/written_sum values
+ RETURN 1;
+ END;
+$$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$
+  DECLARE
+    min_date TIMESTAMP WITHOUT TIME ZONE;
+    max_date TIMESTAMP WITHOUT TIME ZONE;
+  BEGIN
+    -- Rebuild bandwidth_flags for the dates in updates: per date and Exit/Guard flag combination, the advertised bandwidth (LEAST of average and observed) of running relays, divided by that date's relay_statuses_per_day.count.
+    min_date := (SELECT MIN(date) FROM updates);
+    max_date := (SELECT MAX(date) + 1 FROM updates);
+    -- Half-open window [min_date, max_date); stale rows are deleted before re-inserting.
+    DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates);
+    EXECUTE '
+    INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised)
+    SELECT DATE(validafter) AS date,
+    BOOL_OR(isexit) AS isexit,
+    BOOL_OR(isguard) AS isguard,
+    (SUM(LEAST(bandwidthavg, bandwidthobserved))
+    / relay_statuses_per_day.count)::BIGINT AS bwadvertised
+    FROM descriptor RIGHT JOIN statusentry
+    ON descriptor.descriptor = statusentry.descriptor
+    JOIN relay_statuses_per_day
+    ON DATE(validafter) = relay_statuses_per_day.date
+    WHERE isrunning = TRUE
+    AND validafter >= ''' || min_date || '''
+    AND validafter < ''' || max_date || '''
+    AND DATE(validafter) IN (SELECT date FROM updates)
+    AND relay_statuses_per_day.date >= ''' || min_date || '''
+    AND relay_statuses_per_day.date < ''' || max_date || '''
+    AND DATE(relay_statuses_per_day.date) IN
+    (SELECT date FROM updates)
+    GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count';
+    RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$
+  DECLARE
+    min_date TIMESTAMP WITHOUT TIME ZONE;
+    max_date TIMESTAMP WITHOUT TIME ZONE;
+  BEGIN
+    -- Rebuild bwhist_flags for the dates in updates: bwhist read/written sums per date, split by whether a relay had the Exit or Guard flag in any of its running status entries on that date.
+    min_date := (SELECT MIN(date) FROM updates);
+    max_date := (SELECT MAX(date) + 1 FROM updates);
+    -- Only statusentry is restricted to [min_date, max_date); bwhist rows are matched via the join on date and fingerprint.
+    DELETE FROM bwhist_flags WHERE date IN (SELECT date FROM updates);
+    EXECUTE '
+    INSERT INTO bwhist_flags (date, isexit, isguard, read, written)
+    SELECT a.date, isexit, isguard, SUM(read_sum) as read,
+    SUM(written_sum) AS written
+    FROM
+      (SELECT DATE(validafter) AS date,
+      fingerprint,
+      BOOL_OR(isexit) AS isexit,
+      BOOL_OR(isguard) AS isguard
+      FROM statusentry
+      WHERE isrunning = TRUE
+      AND validafter >= ''' || min_date || '''
+      AND validafter < ''' || max_date || '''
+      AND DATE(validafter) IN (SELECT date FROM updates)
+      GROUP BY 1, 2) a
+    JOIN bwhist
+    ON a.date = bwhist.date
+    AND a.fingerprint = bwhist.fingerprint
+    GROUP BY 1, 2, 3';
+    RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
+-- FUNCTION refresh_user_stats()
+-- This function refreshes our user statistics for all dates in updates by
+-- weighting the directory requests reported by directory mirrors with
+-- bandwidth histories; directory authorities are left out of the weights.
+CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$
+  DECLARE
+    min_date TIMESTAMP WITHOUT TIME ZONE;
+    max_date TIMESTAMP WITHOUT TIME ZONE;
+  BEGIN
+    -- Half-open window [min_date, max_date) spanning all dates in updates.
+    min_date := (SELECT MIN(date) FROM updates);
+    max_date := (SELECT MAX(date) + 1 FROM updates);
+    -- NOTE(review): in the dirreq_stats_by_relay sub-select, the second UNION branch emits DATE(statsend) - 1 but filters on DATE(statsend) IN updates; confirm this is intended.
+    -- Start by deleting user statistics of the dates we're about to
+    -- regenerate.
+    DELETE FROM user_stats WHERE date IN (SELECT date FROM updates);
+    -- Now insert new user statistics. The SELECT alias dwr ends up in column drw because INSERT maps columns by position.
+    EXECUTE '
+    INSERT INTO user_stats (date, country, r, dw, dr, drw, drr, bw, br, bwd,
+    brd, bwr, brr, bwdr, brdr, bwp, brp, bwn, brn)
+    SELECT
+      -- We want to learn about total requests by date and country.
+      dirreq_stats_by_country.date AS date,
+      dirreq_stats_by_country.country AS country,
+      dirreq_stats_by_country.r AS r,
+      -- In order to weight the reported directory requests, we are
+      -- counting bytes of relays (except directory authorities)
+      -- matching certain criteria: whether or not they are reporting
+      -- directory requests, whether or not they are reporting
+      -- directory bytes, and whether their directory port is open or
+      -- closed.
+      SUM(CASE WHEN authority IS NOT NULL
+        THEN NULL ELSE dirwritten END) AS dw,
+      SUM(CASE WHEN authority IS NOT NULL
+        THEN NULL ELSE dirread END) AS dr,
+      SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
+        THEN NULL ELSE dirwritten END) AS dwr,
+      SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
+        THEN NULL ELSE dirread END) AS drr,
+      SUM(CASE WHEN authority IS NOT NULL
+        THEN NULL ELSE written END) AS bw,
+      SUM(CASE WHEN authority IS NOT NULL
+        THEN NULL ELSE read END) AS br,
+      SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL
+        THEN NULL ELSE written END) AS bwd,
+      SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL
+        THEN NULL ELSE read END) AS brd,
+      SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
+        THEN NULL ELSE written END) AS bwr,
+      SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
+        THEN NULL ELSE read END) AS brr,
+      SUM(CASE WHEN dirwritten = 0 OR requests IS NULL
+        OR authority IS NOT NULL THEN NULL ELSE written END) AS bwdr,
+      SUM(CASE WHEN dirwritten = 0 OR requests IS NULL
+        OR authority IS NOT NULL THEN NULL ELSE read END) AS brdr,
+      SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL
+        THEN NULL ELSE written END) AS bwp,
+      SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL
+        THEN NULL ELSE read END) AS brp,
+      SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL
+        THEN NULL ELSE written END) AS bwn,
+      SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL
+        THEN NULL ELSE read END) AS brn
+    FROM (
+      -- The first sub-select tells us the total number of directory
+      -- requests per country reported by all directory mirrors.
+      SELECT dirreq_stats_by_date.date AS date, country, SUM(requests) AS r
+      FROM (
+        SELECT fingerprint, date, country, SUM(requests) AS requests
+        FROM (
+          -- There are two selects here, because in most cases the directory
+          -- request statistics cover two calendar dates.
+          SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date,
+            country, FLOOR(requests * (CASE
+            WHEN EXTRACT(EPOCH FROM DATE(statsend)) >
+            EXTRACT(EPOCH FROM statsend) - seconds
+            THEN EXTRACT(EPOCH FROM statsend) -
+            EXTRACT(EPOCH FROM DATE(statsend))
+            ELSE seconds END) / seconds) AS requests
+          FROM dirreq_stats
+          UNION
+          SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date,
+            country, FLOOR(requests *
+            (EXTRACT(EPOCH FROM DATE(statsend)) -
+            EXTRACT(EPOCH FROM statsend) + seconds)
+            / seconds) AS requests
+          FROM dirreq_stats
+          WHERE EXTRACT(EPOCH FROM DATE(statsend)) -
+          EXTRACT(EPOCH FROM statsend) + seconds > 0
+        ) dirreq_stats_split
+        GROUP BY 1, 2, 3
+      ) dirreq_stats_by_date
+      -- We are only interested in requests by directory mirrors, not
+      -- directory authorities, so we exclude all relays with the Authority
+      -- flag.
+      RIGHT JOIN (
+        SELECT fingerprint, DATE(validafter) AS date
+        FROM statusentry
+        WHERE validafter >= ''' || min_date || '''
+        AND validafter < ''' || max_date || '''
+        AND DATE(validafter) IN (SELECT date FROM updates)
+        AND isauthority IS FALSE
+        GROUP BY 1, 2
+      ) statusentry_dirmirrors
+      ON dirreq_stats_by_date.fingerprint =
+         statusentry_dirmirrors.fingerprint
+      AND dirreq_stats_by_date.date = statusentry_dirmirrors.date
+      GROUP BY 1, 2
+    ) dirreq_stats_by_country
+    LEFT JOIN (
+      -- In the next step, we expand the result by bandwidth histories of
+      -- all relays.
+      SELECT fingerprint, date, read_sum AS read, written_sum AS written,
+        dirread_sum AS dirread, dirwritten_sum AS dirwritten
+      FROM bwhist
+      WHERE date >= ''' || min_date || '''
+      AND date < ''' || max_date || '''
+      AND date IN (SELECT date FROM updates)
+    ) bwhist_by_relay
+    ON dirreq_stats_by_country.date = bwhist_by_relay.date
+    LEFT JOIN (
+      -- For each relay, tell how often it had an open directory port and
+      -- how often it had the Authority flag assigned on a given date.
+      SELECT fingerprint, DATE(validafter) AS date,
+        SUM(CASE WHEN dirport > 0 THEN 1 ELSE NULL END) AS opendirport,
+        SUM(CASE WHEN isauthority IS TRUE THEN 1 ELSE NULL END) AS authority
+      FROM statusentry
+      WHERE validafter >= ''' || min_date || '''
+      AND validafter < ''' || max_date || '''
+      AND DATE(validafter) IN (SELECT date FROM updates)
+      GROUP BY 1, 2
+    ) statusentry_by_relay
+    ON bwhist_by_relay.fingerprint = statusentry_by_relay.fingerprint
+    AND bwhist_by_relay.date = statusentry_by_relay.date
+    LEFT JOIN (
+      -- For each relay, tell if it has reported directory request
+      -- statistics on a given date. Again, we have to take into account
+      -- that statistics intervals cover more than one calendar date in most
+      -- cases. The exact number of requests is not relevant here, but only
+      -- whether the relay reported directory requests or not.
+      SELECT fingerprint, date, 1 AS requests
+      FROM (
+        SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date
+        FROM dirreq_stats
+        WHERE DATE(statsend) >= ''' || min_date || '''
+        AND DATE(statsend) < ''' || max_date || '''
+        AND DATE(statsend) IN (SELECT date FROM updates)
+        AND country = ''zy''
+        UNION
+        SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date
+        FROM dirreq_stats
+        WHERE DATE(statsend) - 1 >= ''' || min_date || '''
+        AND DATE(statsend) - 1 < ''' || max_date || '''
+        AND DATE(statsend) IN (SELECT date FROM updates)
+        AND country = ''zy''
+        AND EXTRACT(EPOCH FROM DATE(statsend)) -
+        EXTRACT(EPOCH FROM statsend) + seconds > 0
+      ) dirreq_stats_split
+      GROUP BY 1, 2
+    ) dirreq_stats_by_relay
+    ON bwhist_by_relay.fingerprint = dirreq_stats_by_relay.fingerprint
+    AND bwhist_by_relay.date = dirreq_stats_by_relay.date
+    WHERE dirreq_stats_by_country.country IS NOT NULL
+    -- Group by date, country, and total reported directory requests,
+    -- summing up the bandwidth histories.
+    GROUP BY 1, 2, 3';
+    RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
+-- non-relay statistics
+-- The following tables contain pre-aggregated statistics that are not
+-- based on relay descriptors or that are not yet derived from the relay
+-- descriptors in the database.
+
+-- TABLE bridge_network_size
+-- Average number of running bridges (avg_running) and of running EC2 bridges (avg_running_ec2) per date.
+CREATE TABLE bridge_network_size (
+  "date" DATE NOT NULL,
+  avg_running INTEGER NOT NULL,
+  avg_running_ec2 INTEGER NOT NULL,
+  CONSTRAINT bridge_network_size_pkey PRIMARY KEY(date)
+);
+
+-- TABLE dirreq_stats
+-- Directory requests per country as reported by relay source (its fingerprint) for the statistics interval of the given length in seconds that ends at statsend.
+CREATE TABLE dirreq_stats (
+  source CHARACTER(40) NOT NULL,
+  statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  seconds INTEGER NOT NULL,
+  country CHARACTER(2) NOT NULL,
+  requests INTEGER NOT NULL,
+  CONSTRAINT dirreq_stats_pkey
+  PRIMARY KEY (source, statsend, seconds, country)
+);
+
+-- TABLE torperf_stats
+-- Daily torperf quantiles per source (q1, md, q3, with md being the median) plus numbers of timeouts, failures, and requests.
+CREATE TABLE torperf_stats (
+  "date" DATE NOT NULL,
+  source CHARACTER VARYING(32) NOT NULL,
+  q1 INTEGER NOT NULL,
+  md INTEGER NOT NULL,
+  q3 INTEGER NOT NULL,
+  timeouts INTEGER NOT NULL,
+  failures INTEGER NOT NULL,
+  requests INTEGER NOT NULL,
+  CONSTRAINT torperf_stats_pkey PRIMARY KEY("date", source)
+);
+
+-- Refresh all statistics in the database for the dates queued in scheduled_updates.
+CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
+  BEGIN
+    RAISE NOTICE '% Starting refresh run.', timeofday();
+    RAISE NOTICE '% Deleting old dates from updates.', timeofday();
+    DELETE FROM updates;
+    RAISE NOTICE '% Copying scheduled dates.', timeofday();
+    INSERT INTO updates SELECT * FROM scheduled_updates; -- updates is the date list the refresh_* functions work on
+    RAISE NOTICE '% Refreshing relay statuses per day.', timeofday();
+    PERFORM refresh_relay_statuses_per_day(); -- runs first: later refreshes join relay_statuses_per_day
+    RAISE NOTICE '% Refreshing network size.', timeofday();
+    PERFORM refresh_network_size();
+    RAISE NOTICE '% Refreshing relay platforms.', timeofday();
+    PERFORM refresh_relay_platforms();
+    RAISE NOTICE '% Refreshing relay versions.', timeofday();
+    PERFORM refresh_relay_versions();
+    RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday();
+    PERFORM refresh_total_bandwidth();
+    RAISE NOTICE '% Refreshing relay bandwidth history.', timeofday();
+    PERFORM refresh_total_bwhist();
+    RAISE NOTICE '% Refreshing total relay bandwidth by flags.', timeofday();
+    PERFORM refresh_bandwidth_flags();
+    RAISE NOTICE '% Refreshing bandwidth history by flags.', timeofday();
+    PERFORM refresh_bwhist_flags();
+    RAISE NOTICE '% Refreshing user statistics.', timeofday();
+    PERFORM refresh_user_stats();
+    RAISE NOTICE '% Deleting processed dates.', timeofday();
+    DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates); -- removes only the rows copied into updates above
+    RAISE NOTICE '% Terminating refresh run.', timeofday();
+    RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
+-- View for exporting server statistics: average relay and bridge numbers per date, overall and by flag, country, version, and platform; today and yesterday are excluded.
+CREATE VIEW stats_servers AS
+  (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
+  NULL AS platform, TRUE AS ec2bridge, NULL AS relays,
+  avg_running_ec2 AS bridges FROM bridge_network_size
+  WHERE date < current_date - 1)
+UNION ALL
+  (SELECT COALESCE(network_size.date, bridge_network_size.date) AS date,
+  NULL AS flag, NULL AS country, NULL AS version, NULL AS platform,
+  NULL AS ec2bridge, network_size.avg_running AS relays,
+  bridge_network_size.avg_running AS bridges FROM network_size
+  FULL OUTER JOIN bridge_network_size
+  ON network_size.date = bridge_network_size.date
+  WHERE COALESCE(network_size.date, bridge_network_size.date) <
+  current_date - 1)
+UNION ALL
+  (SELECT date, 'Exit' AS flag, NULL AS country, NULL AS version,
+  NULL AS platform, NULL AS ec2bridge, avg_exit AS relays,
+  NULL AS bridges FROM network_size WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, 'Guard' AS flag, NULL AS country, NULL AS version,
+  NULL AS platform, NULL AS ec2bridge, avg_guard AS relays,
+  NULL AS bridges FROM network_size WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, 'Fast' AS flag, NULL AS country, NULL AS version,
+  NULL AS platform, NULL AS ec2bridge, avg_fast AS relays,
+  NULL AS bridges FROM network_size WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, 'Stable' AS flag, NULL AS country, NULL AS version,
+  NULL AS platform, NULL AS ec2bridge, avg_stable AS relays,
+  NULL AS bridges FROM network_size WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, 'HSDir' AS flag, NULL AS country, NULL AS version,
+  NULL AS platform, NULL AS ec2bridge, avg_hsdir AS relays,
+  NULL AS bridges FROM network_size WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, CASE WHEN country != 'zz' THEN country
+  ELSE '??' END AS country, NULL AS version, NULL AS platform, -- country code 'zz' is exported as '??'
+  NULL AS ec2bridge, relays, NULL AS bridges FROM relay_countries
+  WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, NULL AS country, version, NULL AS platform,
+  NULL AS ec2bridge, relays, NULL AS bridges FROM relay_versions
+  WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
+  'Linux' AS platform, NULL AS ec2bridge, avg_linux AS relays,
+  NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
+  'Darwin' AS platform, NULL AS ec2bridge, avg_darwin AS relays,
+  NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
+  'FreeBSD' AS platform, NULL AS ec2bridge, avg_bsd AS relays,
+  NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
+  'Windows' AS platform, NULL AS ec2bridge, avg_windows AS relays,
+  NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
+UNION ALL
+  (SELECT date, NULL AS flag, NULL AS country, NULL AS version,
+  'Other' AS platform, NULL AS ec2bridge, avg_other AS relays,
+  NULL AS bridges FROM relay_platforms WHERE date < current_date - 1)
+ORDER BY 1, 2, 3, 4, 5, 6;
+
+-- View for exporting bandwidth statistics per date, overall and by Exit/Guard flags; daily totals are divided by 86400 to obtain per-second values, and the last four days (date >= current_date - 3) are excluded.
+CREATE VIEW stats_bandwidth AS
+  (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date,
+  COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit,
+  COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard,
+  bandwidth_flags.bwadvertised AS advbw,
+  CASE WHEN bwhist_flags.read IS NOT NULL
+  THEN bwhist_flags.read / 86400 END AS bwread,
+  CASE WHEN bwhist_flags.written IS NOT NULL
+  THEN bwhist_flags.written / 86400 END AS bwwrite,
+  NULL AS dirread, NULL AS dirwrite
+  FROM bandwidth_flags FULL OUTER JOIN bwhist_flags
+  ON bandwidth_flags.date = bwhist_flags.date
+  AND bandwidth_flags.isexit = bwhist_flags.isexit
+  AND bandwidth_flags.isguard = bwhist_flags.isguard
+  WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) <
+  current_date - 3)
+UNION ALL
+  (SELECT COALESCE(total_bandwidth.date, total_bwhist.date, u.date)
+  AS date, NULL AS isexit, NULL AS isguard,
+  total_bandwidth.bwadvertised AS advbw,
+  CASE WHEN total_bwhist.read IS NOT NULL
+  THEN total_bwhist.read / 86400 END AS bwread,
+  CASE WHEN total_bwhist.written IS NOT NULL
+  THEN total_bwhist.written / 86400 END AS bwwrite,
+  CASE WHEN u.date IS NOT NULL
+  THEN FLOOR(CAST(u.dr AS NUMERIC) * CAST(u.brp AS NUMERIC) /
+  CAST(u.brd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirread,
+  CASE WHEN u.date IS NOT NULL
+  THEN FLOOR(CAST(u.dw AS NUMERIC) * CAST(u.bwp AS NUMERIC) /
+  CAST(u.bwd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirwrite
+  FROM total_bandwidth FULL OUTER JOIN total_bwhist
+  ON total_bandwidth.date = total_bwhist.date
+  FULL OUTER JOIN (SELECT * FROM user_stats WHERE country = 'zy'
+  AND bwp / bwd <= 3) u -- NOTE(review): a 'zy' row with bwd = 0 (here) or brd = 0 (in dirread) raises division by zero; confirm that cannot happen
+  ON COALESCE(total_bandwidth.date, total_bwhist.date) = u.date
+  WHERE COALESCE(total_bandwidth.date, total_bwhist.date, u.date) <
+  current_date - 3)
+ORDER BY 1, 2, 3;
+
+-- View for exporting torperf statistics: size in bytes derived from the source suffix (-50kb, -1mb, -5mb), source reduced to the part before the first '-' (NULL for 'all-' sources); today and yesterday are excluded.
+CREATE VIEW stats_torperf AS
+SELECT date, CASE WHEN source LIKE '%-50kb' THEN 50 * 1024
+  WHEN source LIKE '%-1mb' THEN 1024 * 1024
+  WHEN source LIKE '%-5mb' THEN 5 * 1024 * 1024 END AS size,
+  CASE WHEN source NOT LIKE 'all-%'
+  THEN split_part(source, '-', 1) END AS source, q1, md, q3, timeouts,
+  failures, requests FROM torperf_stats WHERE date < current_date - 1
+  ORDER BY 1, 2, 3;
+
+-- View for exporting connbidirect statistics by statistics end date and source; today and yesterday are excluded.
+CREATE VIEW stats_connbidirect AS
+SELECT DATE(statsend) AS date, source, belownum AS below, readnum AS read,
+  writenum AS write, bothnum AS "both" FROM connbidirect
+  WHERE DATE(statsend) < current_date - 1 ORDER BY 1, 2;
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/Configuration.java b/modules/legacy/src/org/torproject/ernie/cron/Configuration.java
new file mode 100644
index 0000000..878e882
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/Configuration.java
@@ -0,0 +1,213 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Initialize configuration with hard-coded defaults, overwrite with
+ * configuration in config file, if exists, and answer Main.java about our
+ * configuration.
+ *
+ * <p>The config file "config" in the working directory contains one
+ * setting per line in the form "Key value", with key and value separated
+ * by a single space. Lines starting with "#" and empty lines are ignored.
+ * Boolean settings take an integer value: 0 means false, any other integer
+ * means true. An unrecognized key, a missing value, or a non-integer
+ * boolean value is logged and makes the program exit with status 1.</p>
+ */
+public class Configuration {
+  private boolean importDirectoryArchives = false;
+  private String directoryArchivesDirectory = "in/relay-descriptors/";
+  private boolean keepDirectoryArchiveImportHistory = false;
+  private boolean importSanitizedBridges = false;
+  private String sanitizedBridgesDirectory = "in/bridge-descriptors/";
+  private boolean keepSanitizedBridgesImportHistory = false;
+  private boolean writeRelayDescriptorDatabase = false;
+  private String relayDescriptorDatabaseJdbc =
+      "jdbc:postgresql://localhost/tordir?user=metrics&password=password";
+  private boolean writeRelayDescriptorsRawFiles = false;
+  private String relayDescriptorRawFilesDirectory = "pg-import/";
+  private boolean writeBridgeStats = false;
+  private boolean importWriteTorperfStats = false;
+  private String torperfDirectory = "in/torperf/";
+  private String exoneraTorDatabaseJdbc = "jdbc:postgresql:"
+      + "//localhost/exonerator?user=metrics&password=password";
+  private String exoneraTorImportDirectory = "exonerator-import/";
+
+  /**
+   * Reads the config file "config", if present, and overrides the
+   * defaults above with the settings found there. Keys are matched by
+   * prefix, in the order of the checks below.
+   */
+  public Configuration() {
+
+    /* Initialize logger. */
+    Logger logger = Logger.getLogger(Configuration.class.getName());
+
+    /* Read config file, if present. */
+    File configFile = new File("config");
+    if (!configFile.exists()) {
+      logger.warning("Could not find config file.");
+      return;
+    }
+    String line = null;
+    try {
+      BufferedReader br = new BufferedReader(new FileReader(configFile));
+      while ((line = br.readLine()) != null) {
+        if (line.startsWith("#") || line.length() < 1) {
+          continue;
+        } else if (line.startsWith("ImportDirectoryArchives")) {
+          this.importDirectoryArchives = parseFlag(line);
+        } else if (line.startsWith("DirectoryArchivesDirectory")) {
+          this.directoryArchivesDirectory = parseValue(line);
+        } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) {
+          this.keepDirectoryArchiveImportHistory = parseFlag(line);
+        } else if (line.startsWith("ImportSanitizedBridges")) {
+          this.importSanitizedBridges = parseFlag(line);
+        } else if (line.startsWith("SanitizedBridgesDirectory")) {
+          this.sanitizedBridgesDirectory = parseValue(line);
+        } else if (line.startsWith("KeepSanitizedBridgesImportHistory")) {
+          this.keepSanitizedBridgesImportHistory = parseFlag(line);
+        } else if (line.startsWith("WriteRelayDescriptorDatabase")) {
+          this.writeRelayDescriptorDatabase = parseFlag(line);
+        } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
+          this.relayDescriptorDatabaseJdbc = parseValue(line);
+        } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) {
+          this.writeRelayDescriptorsRawFiles = parseFlag(line);
+        } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) {
+          this.relayDescriptorRawFilesDirectory = parseValue(line);
+        } else if (line.startsWith("WriteBridgeStats")) {
+          this.writeBridgeStats = parseFlag(line);
+        } else if (line.startsWith("ImportWriteTorperfStats")) {
+          this.importWriteTorperfStats = parseFlag(line);
+        } else if (line.startsWith("TorperfDirectory")) {
+          this.torperfDirectory = parseValue(line);
+        } else if (line.startsWith("ExoneraTorDatabaseJdbc")) {
+          this.exoneraTorDatabaseJdbc = parseValue(line);
+        } else if (line.startsWith("ExoneraTorImportDirectory")) {
+          this.exoneraTorImportDirectory = parseValue(line);
+        } else {
+          logger.severe("Configuration file contains unrecognized "
+              + "configuration key in line '" + line + "'! Exiting!");
+          System.exit(1);
+        }
+      }
+      br.close();
+    } catch (ArrayIndexOutOfBoundsException e) {
+      logger.severe("Configuration file contains configuration key "
+          + "without value in line '" + line + "'. Exiting!");
+      System.exit(1);
+    } catch (NumberFormatException e) {
+      logger.severe("Configuration file contains illegal value in line '"
+          + line + "' with legal values being 0 or 1. Exiting!");
+      System.exit(1);
+    } catch (IOException e) {
+      logger.log(Level.SEVERE, "Unknown problem while reading config "
+          + "file! Exiting!", e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Returns the value part of a "Key value" line, that is, the text
+   * between the first and the second space.
+   *
+   * @throws ArrayIndexOutOfBoundsException if the line has no value
+   */
+  private static String parseValue(String line) {
+    return line.split(" ")[1];
+  }
+
+  /**
+   * Returns the boolean value of a "Key value" line: false for 0 and true
+   * for any other integer.
+   *
+   * @throws ArrayIndexOutOfBoundsException if the line has no value
+   * @throws NumberFormatException if the value is not an integer
+   */
+  private static boolean parseFlag(String line) {
+    return Integer.parseInt(parseValue(line)) != 0;
+  }
+
+  /** Returns the ImportDirectoryArchives setting (default: false). */
+  public boolean getImportDirectoryArchives() {
+    return this.importDirectoryArchives;
+  }
+
+  /** Returns the DirectoryArchivesDirectory setting. */
+  public String getDirectoryArchivesDirectory() {
+    return this.directoryArchivesDirectory;
+  }
+
+  /** Returns the KeepDirectoryArchiveImportHistory setting. */
+  public boolean getKeepDirectoryArchiveImportHistory() {
+    return this.keepDirectoryArchiveImportHistory;
+  }
+
+  /** Returns the WriteRelayDescriptorDatabase setting (default: false). */
+  public boolean getWriteRelayDescriptorDatabase() {
+    return this.writeRelayDescriptorDatabase;
+  }
+
+  /** Returns the ImportSanitizedBridges setting (default: false). */
+  public boolean getImportSanitizedBridges() {
+    return this.importSanitizedBridges;
+  }
+
+  /** Returns the SanitizedBridgesDirectory setting. */
+  public String getSanitizedBridgesDirectory() {
+    return this.sanitizedBridgesDirectory;
+  }
+
+  /** Returns the KeepSanitizedBridgesImportHistory setting. */
+  public boolean getKeepSanitizedBridgesImportHistory() {
+    return this.keepSanitizedBridgesImportHistory;
+  }
+
+  /** Returns the RelayDescriptorDatabaseJDBC connection string. */
+  public String getRelayDescriptorDatabaseJDBC() {
+    return this.relayDescriptorDatabaseJdbc;
+  }
+
+  /** Returns the WriteRelayDescriptorsRawFiles setting (default: false). */
+  public boolean getWriteRelayDescriptorsRawFiles() {
+    return this.writeRelayDescriptorsRawFiles;
+  }
+
+  /** Returns the RelayDescriptorRawFilesDirectory setting. */
+  public String getRelayDescriptorRawFilesDirectory() {
+    return this.relayDescriptorRawFilesDirectory;
+  }
+
+  /** Returns the WriteBridgeStats setting (default: false). */
+  public boolean getWriteBridgeStats() {
+    return this.writeBridgeStats;
+  }
+
+  /** Returns the ImportWriteTorperfStats setting (default: false). */
+  public boolean getImportWriteTorperfStats() {
+    return this.importWriteTorperfStats;
+  }
+
+  /** Returns the TorperfDirectory setting. */
+  public String getTorperfDirectory() {
+    return this.torperfDirectory;
+  }
+
+  /** Returns the ExoneraTorDatabaseJdbc connection string. */
+  public String getExoneraTorDatabaseJdbc() {
+    return this.exoneraTorDatabaseJdbc;
+  }
+
+  /** Returns the ExoneraTorImportDirectory setting. */
+  public String getExoneraTorImportDirectory() {
+    return this.exoneraTorImportDirectory;
+  }
+}
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/LockFile.java b/modules/legacy/src/org/torproject/ernie/cron/LockFile.java
new file mode 100644
index 0000000..4de8da0
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/LockFile.java
@@ -0,0 +1,104 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * File-based lock that keeps two runs from overlapping.
+ *
+ * <p>The file "lock" in the working directory holds the time, in
+ * milliseconds since the epoch, at which the run holding the lock
+ * started. A lock that is 55 minutes or older is taken over.</p>
+ */
+public class LockFile {
+
+  /** Age in milliseconds after which an existing lock is taken over. */
+  private static final long STALE_LOCK_MILLIS = 55L * 60L * 1000L;
+
+  private File lockFile;
+  private Logger logger;
+
+  public LockFile() {
+    this.lockFile = new File("lock");
+    this.logger = Logger.getLogger(LockFile.class.getName());
+  }
+
+  /**
+   * Tries to acquire the lock by writing the current time to the lock
+   * file.
+   *
+   * @return true if the lock was acquired; false if a lock younger than
+   *     55 minutes exists, if the existing lock file does not contain a
+   *     valid timestamp, or if reading or writing the lock file failed
+   */
+  public boolean acquireLock() {
+    this.logger.fine("Trying to acquire lock...");
+    try {
+      if (this.lockFile.exists()) {
+        long runStarted = this.readRunStarted();
+        if (System.currentTimeMillis() - runStarted < STALE_LOCK_MILLIS) {
+          return false;
+        }
+      }
+      this.writeRunStarted(System.currentTimeMillis());
+      this.logger.fine("Acquired lock.");
+      return true;
+    } catch (NumberFormatException e) {
+      /* An empty or garbled lock file must not crash the run; refuse
+       * the lock so that an operator can inspect the file. */
+      this.logger.log(Level.WARNING, "Lock file "
+          + this.lockFile.getAbsolutePath() + " does not contain a valid "
+          + "timestamp! Not acquiring lock.", e);
+      return false;
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Caught exception while trying to "
+          + "acquire lock!", e);
+      return false;
+    }
+  }
+
+  /** Releases the lock by deleting the lock file. */
+  public void releaseLock() {
+    this.logger.fine("Releasing lock...");
+    if (this.lockFile.delete()) {
+      this.logger.fine("Released lock.");
+    } else {
+      this.logger.warning("Could not delete lock file "
+          + this.lockFile.getAbsolutePath() + "!");
+    }
+  }
+
+  /**
+   * Reads the start time stored in the lock file.
+   *
+   * @throws NumberFormatException if the first line is missing or is not
+   *     a number
+   */
+  private long readRunStarted() throws IOException {
+    BufferedReader br = new BufferedReader(new FileReader(this.lockFile));
+    try {
+      return Long.parseLong(br.readLine());
+    } finally {
+      br.close();
+    }
+  }
+
+  /** Overwrites the lock file with the given start time. */
+  private void writeRunStarted(long runStarted) throws IOException {
+    BufferedWriter bw = new BufferedWriter(new FileWriter(this.lockFile));
+    try {
+      bw.append(runStarted + "\n");
+    } finally {
+      bw.close();
+    }
+  }
+}
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/LoggingConfiguration.java b/modules/legacy/src/org/torproject/ernie/cron/LoggingConfiguration.java
new file mode 100644
index 0000000..c261d95
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/LoggingConfiguration.java
@@ -0,0 +1,121 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.FileHandler;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+/**
+ * Initialize logging configuration.
+ *
+ * Log levels used by ERNIE:
+ *
+ * - SEVERE: An event made it impossible to continue program execution.
+ * - WARNING: A potential problem occurred that requires the operator to
+ *   look after the otherwise unattended setup
+ * - INFO: Messages on INFO level are meant to help the operator in making
+ *   sure that operation works as expected.
+ * - FINE: Debug messages that are used to identify problems and which are
+ *   turned on by default.
+ * - FINER: More detailed debug messages to investigate problems in more
+ *   detail. Not turned on by default. Increase log file limit when using
+ *   FINER.
+ * - FINEST: Most detailed debug messages. Not used.
+ *
+ * Messages on WARNING or higher are written to the console, messages on
+ * FINE or higher to the rotating log files log.0 to log.4 in the working
+ * directory.
+ */
+public class LoggingConfiguration {
+
+  /* Strong reference to the "sun" logger: LogManager only keeps weak
+   * references to named loggers, so without this field the logger, and
+   * with it the OFF level set below, could be garbage collected. */
+  private static final Logger SUN_LOGGER = Logger.getLogger("sun");
+
+  public LoggingConfiguration() {
+
+    /* Remove default console handler. */
+    for (Handler h : Logger.getLogger("").getHandlers()) {
+      Logger.getLogger("").removeHandler(h);
+    }
+
+    /* Disable logging of internal Sun classes. */
+    SUN_LOGGER.setLevel(Level.OFF);
+
+    /* Set minimum log level we care about from INFO to FINER. */
+    Logger.getLogger("").setLevel(Level.FINER);
+
+    /* Create log handler that writes messages on WARNING or higher to the
+     * console. */
+    final SimpleDateFormat dateTimeFormat =
+        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    Formatter cf = new Formatter() {
+      @Override
+      public String format(LogRecord record) {
+        return formatDate(dateTimeFormat, record.getMillis()) + " "
+            + formatMessage(record) + formatThrown(record) + "\n";
+      }
+    };
+    Handler ch = new ConsoleHandler();
+    ch.setFormatter(cf);
+    ch.setLevel(Level.WARNING);
+    Logger.getLogger("").addHandler(ch);
+
+    /* Initialize own logger for this class. */
+    Logger logger = Logger.getLogger(
+        LoggingConfiguration.class.getName());
+
+    /* Create log handler that writes all messages on FINE or higher to a
+     * local file. */
+    Formatter ff = new Formatter() {
+      @Override
+      public String format(LogRecord record) {
+        return formatDate(dateTimeFormat, record.getMillis()) + " "
+            + record.getLevel() + " " + record.getSourceClassName() + " "
+            + record.getSourceMethodName() + " " + formatMessage(record)
+            + formatThrown(record) + "\n";
+      }
+    };
+    try {
+      FileHandler fh = new FileHandler("log", 5000000, 5, true);
+      fh.setFormatter(ff);
+      fh.setLevel(Level.FINE);
+      Logger.getLogger("").addHandler(fh);
+    } catch (SecurityException e) {
+      logger.log(Level.WARNING, "No permission to create log file. "
+          + "Logging to file is disabled.", e);
+    } catch (IOException e) {
+      logger.log(Level.WARNING, "Could not write to log file. Logging to "
+          + "file is disabled.", e);
+    }
+  }
+
+  /**
+   * Formats a timestamp with the shared date format. Handlers format on
+   * whichever thread logs, so the console and file formatters may use the
+   * format concurrently; SimpleDateFormat is not thread-safe, hence the
+   * lock.
+   */
+  private static String formatDate(SimpleDateFormat format, long millis) {
+    synchronized (format) {
+      return format.format(new Date(millis));
+    }
+  }
+
+  /** Returns " " followed by the record's throwable, or "" if none. */
+  private static String formatThrown(LogRecord record) {
+    return record.getThrown() != null ? " " + record.getThrown() : "";
+  }
+}
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/Main.java b/modules/legacy/src/org/torproject/ernie/cron/Main.java
new file mode 100644
index 0000000..5d561a6
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/Main.java
@@ -0,0 +1,105 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.File;
+import java.util.logging.Logger;
+
+import org.torproject.ernie.cron.network.ConsensusStatsFileHandler;
+import org.torproject.ernie.cron.performance.PerformanceStatsImporter;
+import org.torproject.ernie.cron.performance.TorperfProcessor;
+
+/**
+ * Coordinate downloading and parsing of descriptors and extraction of
+ * statistically relevant data for later processing with R.
+ */
+public class Main {
+
+  /**
+   * Runs one ERNIE cycle: reads the configuration, acquires the lock
+   * file, runs every processing step enabled in the configuration, and
+   * releases the lock again.
+   *
+   * @param args command-line arguments (not used)
+   */
+  public static void main(String[] args) {
+
+    /* Initialize logging configuration. */
+    new LoggingConfiguration();
+
+    Logger logger = Logger.getLogger(Main.class.getName());
+    logger.info("Starting ERNIE.");
+
+    // Initialize configuration
+    Configuration config = new Configuration();
+
+    // Use lock file to avoid overlapping runs
+    LockFile lf = new LockFile();
+    if (!lf.acquireLock()) {
+      logger.severe("Warning: ERNIE is already running or has not exited "
+          + "cleanly! Exiting!");
+      System.exit(1);
+    }
+
+    // Define stats directory for temporary files
+    File statsDirectory = new File("stats");
+
+    // Import relay descriptors
+    if (config.getImportDirectoryArchives()) {
+      boolean writeDatabase = config.getWriteRelayDescriptorDatabase();
+      boolean writeRawFiles = config.getWriteRelayDescriptorsRawFiles();
+      String jdbcString = writeDatabase
+          ? config.getRelayDescriptorDatabaseJDBC() : null;
+      String rawFilesDirectory = writeRawFiles
+          ? config.getRelayDescriptorRawFilesDirectory() : null;
+      File archivesDirectory =
+          new File(config.getDirectoryArchivesDirectory());
+      boolean keepImportHistory =
+          config.getKeepDirectoryArchiveImportHistory();
+
+      // The relay descriptor importer is only created when it has at
+      // least one output, so it must only be used and closed in here.
+      if (writeDatabase || writeRawFiles) {
+        RelayDescriptorDatabaseImporter rddi =
+            new RelayDescriptorDatabaseImporter(jdbcString,
+            rawFilesDirectory, archivesDirectory, statsDirectory,
+            keepImportHistory);
+        rddi.importRelayDescriptors();
+        rddi.closeConnection();
+      }
+
+      // Import conn-bi-direct statistics.
+      PerformanceStatsImporter psi = new PerformanceStatsImporter(
+          jdbcString, rawFilesDirectory, archivesDirectory,
+          statsDirectory, keepImportHistory);
+      psi.importRelayDescriptors();
+      psi.closeConnection();
+    }
+
+    // Import sanitized bridges and write updated stats files to disk,
+    // using the consensus stats file handler (used for stats on running
+    // bridges only)
+    if (config.getWriteBridgeStats()) {
+      ConsensusStatsFileHandler csfh = new ConsensusStatsFileHandler(
+          config.getRelayDescriptorDatabaseJDBC(),
+          new File(config.getSanitizedBridgesDirectory()),
+          statsDirectory, config.getKeepSanitizedBridgesImportHistory());
+      if (config.getImportSanitizedBridges()) {
+        csfh.importSanitizedBridges();
+      }
+      csfh.writeFiles();
+    }
+
+    // Import and process torperf stats
+    if (config.getImportWriteTorperfStats()) {
+      new TorperfProcessor(new File(config.getTorperfDirectory()),
+          statsDirectory, config.getRelayDescriptorDatabaseJDBC());
+    }
+
+    // Remove lock file
+    lf.releaseLock();
+
+    logger.info("Terminating ERNIE.");
+  }
+}
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/modules/legacy/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
new file mode 100644
index 0000000..a51092e
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
@@ -0,0 +1,1077 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.postgresql.util.PGbytea;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+import org.torproject.descriptor.ServerDescriptor;
+
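+/**
+ * Imports relay descriptors (network status consensuses, server
+ * descriptors, and extra-info descriptors) into the relay descriptor
+ * database and/or writes them to raw database import files.
+ */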
+
+/* TODO Split up this class and move its parts to cron.network,
+ * cron.users, and status.relaysearch packages. Requires extensive
+ * changes to the database schema though. */
+public final class RelayDescriptorDatabaseImporter {
+
+  /**
+   * How many records to commit with each database transaction.
+   */
+  private final long autoCommitCount = 500;
+
+  /**
+   * Per-table counters of records added in this execution. Whenever a
+   * counter reaches a multiple of <code>autoCommitCount</code>, the open
+   * transaction is committed (for bandwidth histories: the pending batch
+   * is executed).
+   */
+  /** Server descriptors inserted by addServerDescriptor. */
+  private int rdsCount = 0;
+  /* NOTE(review): resCount is not incremented in the code visible here;
+   * confirm whether it is still used. */
+  private int resCount = 0;
+  /** Bandwidth history rows batched by addBandwidthHistory. */
+  private int rhsCount = 0;
+  /** Network status entries inserted by addStatusEntry. */
+  private int rrsCount = 0;
+  /** Consensuses inserted by addConsensus. */
+  private int rcsCount = 0;
+  /* NOTE(review): rvsCount is not incremented in the code visible here;
+   * confirm whether it is still used. */
+  private int rvsCount = 0;
+  /** Dirreq stats rows inserted by addDirReqStats. */
+  private int rqsCount = 0;
+
+  /**
+   * Relay descriptor database connection.
+   */
+  private Connection conn;
+
+  /**
+   * Prepared statement to check whether any network status consensus
+   * entries matching a given valid-after time have been imported into the
+   * database before.
+   */
+  private PreparedStatement psSs;
+
+  /**
+   * Prepared statement to check whether a given network status consensus
+   * entry has been imported into the database before.
+   */
+  private PreparedStatement psRs;
+
+  /**
+   * Prepared statement to check whether a given server descriptor has
+   * been imported into the database before.
+   */
+  private PreparedStatement psDs;
+
+  /**
+   * Prepared statement to check whether a given network status consensus
+   * has been imported into the database before.
+   */
+  private PreparedStatement psCs;
+
+  /**
+   * Prepared statement to check whether a given dirreq stats string has
+   * been imported into the database before.
+   */
+  private PreparedStatement psQs;
+
+  /**
+   * Set of dates that have been inserted into the database for being
+   * included in the next refresh run.
+   */
+  private Set<Long> scheduledUpdates;
+
+  /**
+   * Prepared statement to insert a date into the database that shall be
+   * included in the next refresh run.
+   */
+  private PreparedStatement psU;
+
+  /**
+   * Prepared statement to insert a network status consensus entry into
+   * the database.
+   */
+  private PreparedStatement psR;
+
+  /**
+   * Prepared statement to insert a server descriptor into the database.
+   */
+  private PreparedStatement psD;
+
+  /**
+   * Callable statement to insert the bandwidth history of an extra-info
+   * descriptor into the database.
+   */
+  private CallableStatement csH;
+
+  /**
+   * Prepared statement to insert a network status consensus into the
+   * database.
+   */
+  private PreparedStatement psC;
+
+  /**
+   * Prepared statement to insert a given dirreq stats string into the
+   * database.
+   */
+  private PreparedStatement psQ;
+
+  /**
+   * Logger for this class.
+   */
+  private Logger logger;
+
+  /**
+   * Directory for writing raw import files.
+   */
+  private String rawFilesDirectory;
+
+  /**
+   * Raw import file containing status entries.
+   */
+  private BufferedWriter statusentryOut;
+
+  /**
+   * Raw import file containing server descriptors.
+   */
+  private BufferedWriter descriptorOut;
+
+  /**
+   * Raw import file containing bandwidth histories.
+   */
+  private BufferedWriter bwhistOut;
+
+  /**
+   * Raw import file containing consensuses.
+   */
+  private BufferedWriter consensusOut;
+
+  /**
+   * Raw import file containing dirreq stats.
+   */
+  private BufferedWriter dirReqOut;
+
+  /**
+   * Date format to parse timestamps (set to the UTC time zone in the
+   * constructor).
+   */
+  private SimpleDateFormat dateTimeFormat;
+
+  /**
+   * The last valid-after time for which we checked whether there have
+   * been any network status entries in the database.
+   */
+  private long lastCheckedStatusEntries;
+
+  /**
+   * Set of fingerprints that we imported for the valid-after time in
+   * <code>lastCheckedStatusEntries</code>.
+   */
+  private Set<String> insertedStatusEntries;
+
+  /**
+   * Flag that tells us whether we need to check whether a network status
+   * entry is already contained in the database or not.
+   */
+  private boolean separateStatusEntryCheckNecessary;
+
+  /**
+   * Whether records are written to the database. Set to true once the
+   * connection and statements are prepared; reset to false after the
+   * first SQL error so that no further SQL requests are made.
+   */
+  private boolean importIntoDatabase;
+
+  /**
+   * Whether records are written to raw import files in
+   * <code>rawFilesDirectory</code>. Reset to false after the first write
+   * error.
+   */
+  private boolean writeRawImportFiles;
+
+  /** Directory that importRelayDescriptors() reads descriptors from. */
+  private File archivesDirectory;
+
+  /** Directory containing the import history file. */
+  private File statsDirectory;
+
+  /**
+   * Whether to pass the import history file to the descriptor reader via
+   * setExcludeFiles() (presumably to skip files already read in earlier
+   * runs — confirm against DescriptorReader documentation).
+   */
+  private boolean keepImportHistory;
+
+ /**
+ * Initialize database importer by connecting to the database and
+ * preparing statements.
+ */
+  /**
+   * Creates an importer that writes relay descriptor contents to the
+   * database, to raw import files, to both, or to neither.
+   *
+   * @param connectionURL JDBC URL of the database, or null to skip the
+   *     database import
+   * @param rawFilesDirectory directory for raw import files, or null to
+   *     skip writing them
+   * @param archivesDirectory directory to read descriptors from
+   * @param statsDirectory directory holding the import history file
+   * @param keepImportHistory whether to use the import history file
+   * @throws IllegalArgumentException if archivesDirectory or
+   *     statsDirectory is null
+   */
+  public RelayDescriptorDatabaseImporter(String connectionURL,
+      String rawFilesDirectory, File archivesDirectory,
+      File statsDirectory, boolean keepImportHistory) {
+
+    boolean missingDirectory = archivesDirectory == null
+        || statsDirectory == null;
+    if (missingDirectory) {
+      throw new IllegalArgumentException();
+    }
+    this.archivesDirectory = archivesDirectory;
+    this.statsDirectory = statsDirectory;
+    this.keepImportHistory = keepImportHistory;
+
+    this.logger = Logger.getLogger(
+        RelayDescriptorDatabaseImporter.class.getName());
+
+    if (connectionURL != null) {
+      try {
+        this.openConnectionAndPrepareStatements(connectionURL);
+        this.scheduledUpdates = new HashSet<Long>();
+        this.importIntoDatabase = true;
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not connect to database or "
+            + "prepare statements.", e);
+      }
+      /* Fingerprints of status entries already imported for the most
+       * recently checked valid-after time. */
+      this.insertedStatusEntries = new HashSet<String>();
+    }
+
+    /* A null directory leaves raw import files disabled. */
+    this.rawFilesDirectory = rawFilesDirectory;
+    this.writeRawImportFiles = rawFilesDirectory != null;
+
+    /* All timestamps are formatted and parsed in UTC. */
+    SimpleDateFormat utcFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    utcFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    this.dateTimeFormat = utcFormat;
+  }
+
+  /**
+   * Connects to the database, turns autocommit off, and prepares all
+   * statements used by this importer.
+   */
+  private void openConnectionAndPrepareStatements(String connectionURL)
+      throws SQLException {
+    this.conn = DriverManager.getConnection(connectionURL);
+    this.conn.setAutoCommit(false);
+
+    /* Existence checks. */
+    this.psSs = this.conn.prepareStatement("SELECT COUNT(*) "
+        + "FROM statusentry WHERE validafter = ?");
+    this.psRs = this.conn.prepareStatement("SELECT COUNT(*) "
+        + "FROM statusentry WHERE validafter = ? AND "
+        + "fingerprint = ?");
+    this.psDs = this.conn.prepareStatement("SELECT COUNT(*) "
+        + "FROM descriptor WHERE descriptor = ?");
+    this.psCs = this.conn.prepareStatement("SELECT COUNT(*) "
+        + "FROM consensus WHERE validafter = ?");
+    this.psQs = this.conn.prepareStatement("SELECT COUNT(*) "
+        + "FROM dirreq_stats WHERE source = ? AND statsend = ?");
+
+    /* Inserts. */
+    this.psR = this.conn.prepareStatement("INSERT INTO statusentry "
+        + "(validafter, nickname, fingerprint, descriptor, "
+        + "published, address, orport, dirport, isauthority, "
+        + "isbadexit, isbaddirectory, isexit, isfast, isguard, "
+        + "ishsdir, isnamed, isstable, isrunning, isunnamed, "
+        + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
+        + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+        + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+    this.psD = this.conn.prepareStatement("INSERT INTO descriptor "
+        + "(descriptor, nickname, address, orport, dirport, "
+        + "fingerprint, bandwidthavg, bandwidthburst, "
+        + "bandwidthobserved, platform, published, uptime, "
+        + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+        + "?)");
+    this.csH = this.conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
+        + "?)}");
+    this.psC = this.conn.prepareStatement("INSERT INTO consensus "
+        + "(validafter) VALUES (?)");
+    this.psQ = this.conn.prepareStatement("INSERT INTO dirreq_stats "
+        + "(source, statsend, seconds, country, requests) VALUES "
+        + "(?, ?, ?, ?, ?)");
+    this.psU = this.conn.prepareStatement("INSERT INTO scheduled_updates "
+        + "(date) VALUES (?)");
+  }
+
+  /**
+   * Inserts the UTC date of the given timestamp into the
+   * scheduled_updates table, at most once per date and execution. Does
+   * nothing while the database import is disabled.
+   */
+  private void addDateToScheduledUpdates(long timestamp)
+      throws SQLException {
+    if (this.importIntoDatabase) {
+      /* Truncate to midnight UTC by formatting and re-parsing. */
+      String day = this.dateTimeFormat.format(timestamp).substring(0, 10);
+      long midnight;
+      try {
+        midnight = this.dateTimeFormat.parse(day + " 00:00:00").getTime();
+      } catch (ParseException e) {
+        this.logger.log(Level.WARNING, "Internal parsing error.", e);
+        return;
+      }
+      boolean alreadyScheduled = this.scheduledUpdates.contains(midnight);
+      if (!alreadyScheduled) {
+        this.psU.setDate(1, new java.sql.Date(midnight));
+        this.psU.execute();
+        this.scheduledUpdates.add(midnight);
+      }
+    }
+  }
+
+ /**
+ * Insert network status consensus entry into database.
+ */
+  /**
+   * Adds a network status consensus entry to the database, unless an
+   * entry with the same valid-after time and fingerprint is already
+   * there, and/or appends it to the raw statusentry import file.
+   *
+   * <p>The first SQL error disables the database import, and the first
+   * write error disables raw import files, for the rest of this
+   * execution.</p>
+   */
+  public void addStatusEntry(long validAfter, String nickname,
+      String fingerprint, String descriptor, long published,
+      String address, long orPort, long dirPort,
+      SortedSet<String> flags, String version, long bandwidth,
+      String ports, byte[] rawDescriptor) {
+    /* Flags in the column order used by both the INSERT statement
+     * (parameters 9 to 22) and the COPY statement. */
+    String[] flagNames = new String[] { "Authority", "BadExit",
+        "BadDirectory", "Exit", "Fast", "Guard", "HSDir", "Named",
+        "Stable", "Running", "Unnamed", "Valid", "V2Dir", "V3Dir" };
+
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(validAfter);
+        Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+
+        /* On the first entry of a new valid-after time, find out whether
+         * the database already has entries for it. If not, we can skip
+         * the per-entry existence check for fingerprints not seen yet. */
+        if (this.lastCheckedStatusEntries != validAfter) {
+          this.psSs.setTimestamp(1, validAfterTimestamp, utc);
+          ResultSet countResult = this.psSs.executeQuery();
+          countResult.next();
+          this.separateStatusEntryCheckNecessary =
+              countResult.getInt(1) != 0;
+          if (!this.separateStatusEntryCheckNecessary) {
+            this.insertedStatusEntries.clear();
+          }
+          countResult.close();
+          this.lastCheckedStatusEntries = validAfter;
+        }
+
+        boolean alreadyContained;
+        if (!this.separateStatusEntryCheckNecessary
+            && !this.insertedStatusEntries.contains(fingerprint)) {
+          this.insertedStatusEntries.add(fingerprint);
+          alreadyContained = false;
+        } else {
+          this.psRs.setTimestamp(1, validAfterTimestamp, utc);
+          this.psRs.setString(2, fingerprint);
+          ResultSet entryResult = this.psRs.executeQuery();
+          entryResult.next();
+          alreadyContained = entryResult.getInt(1) > 0;
+          entryResult.close();
+        }
+
+        if (!alreadyContained) {
+          this.psR.clearParameters();
+          this.psR.setTimestamp(1, validAfterTimestamp, utc);
+          this.psR.setString(2, nickname);
+          this.psR.setString(3, fingerprint);
+          this.psR.setString(4, descriptor);
+          this.psR.setTimestamp(5, new Timestamp(published), utc);
+          this.psR.setString(6, address);
+          this.psR.setLong(7, orPort);
+          this.psR.setLong(8, dirPort);
+          for (int flagIndex = 0; flagIndex < flagNames.length;
+              flagIndex++) {
+            this.psR.setBoolean(9 + flagIndex,
+                flags.contains(flagNames[flagIndex]));
+          }
+          this.psR.setString(23, version);
+          this.psR.setLong(24, bandwidth);
+          this.psR.setString(25, ports);
+          this.psR.setBytes(26, rawDescriptor);
+          this.psR.executeUpdate();
+          this.rrsCount++;
+          if (this.rrsCount % this.autoCommitCount == 0) {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "consensus entry. We won't make any further SQL requests "
+            + "in this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.statusentryOut == null) {
+          new File(this.rawFilesDirectory).mkdirs();
+          this.statusentryOut = new BufferedWriter(new FileWriter(
+              this.rawFilesDirectory + "/statusentry.sql"));
+          this.statusentryOut.write(" COPY statusentry (validafter, "
+              + "nickname, fingerprint, descriptor, published, address, "
+              + "orport, dirport, isauthority, isbadExit, "
+              + "isbaddirectory, isexit, isfast, isguard, ishsdir, "
+              + "isnamed, isstable, isrunning, isunnamed, isvalid, "
+              + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) "
+              + "FROM stdin;\n");
+        }
+        /* One tab-separated COPY row; \N marks SQL NULL. */
+        StringBuilder row = new StringBuilder();
+        row.append(this.dateTimeFormat.format(validAfter)).append("\t")
+            .append(nickname).append("\t")
+            .append(fingerprint.toLowerCase()).append("\t")
+            .append(descriptor.toLowerCase()).append("\t")
+            .append(this.dateTimeFormat.format(published)).append("\t")
+            .append(address).append("\t")
+            .append(orPort).append("\t")
+            .append(dirPort).append("\t");
+        for (String flagName : flagNames) {
+          row.append(flags.contains(flagName) ? "t" : "f").append("\t");
+        }
+        row.append(version != null ? version : "\\N").append("\t");
+        if (bandwidth >= 0) {
+          row.append(bandwidth);
+        } else {
+          row.append("\\N");
+        }
+        row.append("\t");
+        row.append(ports != null ? ports : "\\N").append("\t");
+        this.statusentryOut.write(row.toString());
+        /* COPY text format needs backslashes in the bytea escape
+         * doubled. */
+        this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus entry to raw database import file. We won't "
+            + "make any further attempts to write raw import files in "
+            + "this execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus entry to raw database import file. We won't "
+            + "make any further attempts to write raw import files in "
+            + "this execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+ /**
+ * Insert server descriptor into database.
+ */
+  /**
+   * Adds a server descriptor to the database, unless a descriptor with
+   * the same digest is already there, and/or appends it to the raw
+   * descriptor import file.
+   *
+   * <p>The platform string is reduced to its ASCII characters before it
+   * is written anywhere, because non-ASCII characters make Postgres
+   * unhappy. The same sanitized value goes to the database and the raw
+   * import file. A null platform is stored as SQL NULL (and written as
+   * <code>\N</code>) instead of aborting the import with a
+   * NullPointerException.</p>
+   */
+  public void addServerDescriptor(String descriptor, String nickname,
+      String address, int orPort, int dirPort, String relayIdentifier,
+      long bandwidthAvg, long bandwidthBurst, long bandwidthObserved,
+      String platform, long published, long uptime,
+      String extraInfoDigest) {
+    /* Strip non-ASCII characters directly from the String rather than
+     * round-tripping through getBytes(), whose result depends on the
+     * platform default charset. */
+    String asciiPlatform = platform == null ? null
+        : platform.replaceAll("[^\\p{ASCII}]", "");
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(published);
+        this.addDateToScheduledUpdates(
+            published + 24L * 60L * 60L * 1000L);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        this.psDs.setString(1, descriptor);
+        ResultSet rs = psDs.executeQuery();
+        rs.next();
+        int existingDescriptors = rs.getInt(1);
+        rs.close();
+        if (existingDescriptors == 0) {
+          this.psD.clearParameters();
+          this.psD.setString(1, descriptor);
+          this.psD.setString(2, nickname);
+          this.psD.setString(3, address);
+          this.psD.setInt(4, orPort);
+          this.psD.setInt(5, dirPort);
+          this.psD.setString(6, relayIdentifier);
+          this.psD.setLong(7, bandwidthAvg);
+          this.psD.setLong(8, bandwidthBurst);
+          this.psD.setLong(9, bandwidthObserved);
+          /* A null asciiPlatform is stored as SQL NULL. */
+          this.psD.setString(10, asciiPlatform);
+          this.psD.setTimestamp(11, new Timestamp(published), cal);
+          this.psD.setLong(12, uptime);
+          this.psD.setString(13, extraInfoDigest);
+          this.psD.executeUpdate();
+          rdsCount++;
+          if (rdsCount % autoCommitCount == 0) {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add server "
+            + "descriptor. We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.descriptorOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.descriptorOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/descriptor.sql"));
+          this.descriptorOut.write(" COPY descriptor (descriptor, "
+              + "nickname, address, orport, dirport, fingerprint, "
+              + "bandwidthavg, bandwidthburst, bandwidthobserved, "
+              + "platform, published, uptime, extrainfo) FROM stdin;\n");
+        }
+        this.descriptorOut.write(descriptor.toLowerCase() + "\t"
+            + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
+            + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
+            + bandwidthBurst + "\t" + bandwidthObserved + "\t"
+            + (asciiPlatform != null && asciiPlatform.length() > 0
+            ? asciiPlatform : "\\N")
+            + "\t" + this.dateTimeFormat.format(published) + "\t"
+            + (uptime >= 0 ? uptime : "\\N") + "\t"
+            + (extraInfoDigest != null ? extraInfoDigest : "\\N")
+            + "\n");
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write server "
+            + "descriptor to raw database import file. We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+ /**
+ * Insert extra-info descriptor into database.
+ */
+  /**
+   * Adds the bandwidth histories of an extra-info descriptor. Only the
+   * fingerprint (lower-cased here), the publication time, and the
+   * history lines are used; extraInfoDigest and nickname are ignored.
+   * Does nothing if there are no history lines.
+   */
+  public void addExtraInfoDescriptor(String extraInfoDigest,
+      String nickname, String fingerprint, long published,
+      List<String> bandwidthHistoryLines) {
+    if (bandwidthHistoryLines.isEmpty()) {
+      return;
+    }
+    String lowerCaseFingerprint = fingerprint.toLowerCase();
+    this.addBandwidthHistory(lowerCaseFingerprint, published,
+        bandwidthHistoryLines);
+  }
+
+ private static class BigIntArray implements java.sql.Array {
+
+ private final String stringValue;
+
+ public BigIntArray(long[] array, int offset) {
+ if (array == null) {
+ this.stringValue = "[-1:-1]={0}";
+ } else {
+ StringBuilder sb = new StringBuilder("[" + offset + ":"
+ + (offset + array.length - 1) + "]={");
+ for (int i = 0; i < array.length; i++) {
+ sb.append((i > 0 ? "," : "") + array[i]);
+ }
+ sb.append('}');
+ this.stringValue = sb.toString();
+ }
+ }
+
+ public String toString() {
+ return stringValue;
+ }
+
+ public String getBaseTypeName() {
+ return "int8";
+ }
+
+ /* The other methods are never called; no need to implement them. */
+ public void free() {
+ throw new UnsupportedOperationException();
+ }
+ public Object getArray() {
+ throw new UnsupportedOperationException();
+ }
+ public Object getArray(long index, int count) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getArray(long index, int count,
+ Map<String, Class<?>> map) {
+ throw new UnsupportedOperationException();
+ }
+ public Object getArray(Map<String, Class<?>> map) {
+ throw new UnsupportedOperationException();
+ }
+ public int getBaseType() {
+ throw new UnsupportedOperationException();
+ }
+ public ResultSet getResultSet() {
+ throw new UnsupportedOperationException();
+ }
+ public ResultSet getResultSet(long index, int count) {
+ throw new UnsupportedOperationException();
+ }
+ public ResultSet getResultSet(long index, int count,
+ Map<String, Class<?>> map) {
+ throw new UnsupportedOperationException();
+ }
+ public ResultSet getResultSet(Map<String, Class<?>> map) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+  /**
+   * Adds a relay's bandwidth history lines to the database and/or the
+   * raw bwhist import file.
+   *
+   * <p>Each input line is expected to consist of six space-separated
+   * parts: type, interval end date, interval end time, "(length", "s)",
+   * and a comma-separated list of values. Lines are skipped if they have
+   * a different number of parts, an interval length other than 900
+   * seconds, an end time more than 7 days away from
+   * <code>published</code>, or unparseable timestamps or numbers. The
+   * remaining lines are split at UTC day boundaries. For each date, one
+   * call to insert_bwhist is made with read, written, dirread, and
+   * dirwritten arrays.</p>
+   *
+   * @param fingerprint relay fingerprint, passed on unchanged
+   * @param published descriptor publication time in milliseconds
+   * @param bandwidthHistoryStrings bandwidth history lines
+   */
+  public void addBandwidthHistory(String fingerprint, long published,
+      List<String> bandwidthHistoryStrings) {
+
+    /* Split history lines by date and rewrite them so that the date
+     * comes first. */
+    SortedSet<String> historyLinesByDate = new TreeSet<String>();
+    for (String bandwidthHistoryString : bandwidthHistoryStrings) {
+      String[] parts = bandwidthHistoryString.split(" ");
+      if (parts.length != 6) {
+        this.logger.finer("Bandwidth history line does not have expected "
+            + "number of elements. Ignoring this line.");
+        continue;
+      }
+      long intervalLength = 0L;
+      try {
+        /* parts[3] is "(<seconds>"; drop the opening parenthesis. */
+        intervalLength = Long.parseLong(parts[3].substring(1));
+      } catch (NumberFormatException e) {
+        this.logger.fine("Bandwidth history line does not have valid "
+            + "interval length '" + parts[3] + " " + parts[4] + "'. "
+            + "Ignoring this line.");
+        continue;
+      }
+      if (intervalLength != 900L) {
+        this.logger.fine("Bandwidth history line does not consist of "
+            + "15-minute intervals. Ignoring this line.");
+        continue;
+      }
+      String type = parts[0];
+      String intervalEndTime = parts[1] + " " + parts[2];
+      long intervalEnd, dateStart;
+      try {
+        intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime();
+        dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00").
+            getTime();
+      } catch (ParseException e) {
+        this.logger.fine("Parse exception while parsing timestamp in "
+            + "bandwidth history line. Ignoring this line.");
+        continue;
+      }
+      if (Math.abs(published - intervalEnd) >
+          7L * 24L * 60L * 60L * 1000L) {
+        this.logger.fine("Extra-info descriptor publication time "
+            + dateTimeFormat.format(published) + " and last interval "
+            + "time " + intervalEndTime + " in " + type + " line differ "
+            + "by more than 7 days! Not adding this line!");
+        continue;
+      }
+      /* Walk the values backwards from the last interval. Whenever the
+       * current interval end falls before the start of the current day,
+       * or all values have been consumed (i == -1), emit the values
+       * collected so far as one line for that day. The line is keyed by
+       * the end time of its latest interval. Then move to the previous
+       * day. */
+      long currentIntervalEnd = intervalEnd;
+      StringBuilder sb = new StringBuilder();
+      String[] values = parts[5].split(",");
+      SortedSet<String> newHistoryLines = new TreeSet<String>();
+      try {
+        for (int i = values.length - 1; i >= -1; i--) {
+          if (i == -1 || currentIntervalEnd < dateStart) {
+            sb.insert(0, intervalEndTime + " " + type + " ("
+                + intervalLength + " s) ");
+            /* Remove the trailing comma after the last value. */
+            sb.setLength(sb.length() - 1);
+            String historyLine = sb.toString();
+            newHistoryLines.add(historyLine);
+            sb = new StringBuilder();
+            dateStart -= 24L * 60L * 60L * 1000L;
+            intervalEndTime = dateTimeFormat.format(currentIntervalEnd);
+          }
+          if (i == -1) {
+            break;
+          }
+          /* Parsed only to validate the value; the result is unused. */
+          Long.parseLong(values[i]);
+          sb.insert(0, values[i] + ",");
+          currentIntervalEnd -= intervalLength * 1000L;
+        }
+      } catch (NumberFormatException e) {
+        this.logger.fine("Number format exception while parsing "
+            + "bandwidth history line. Ignoring this line.");
+        continue;
+      }
+      /* Only added once the whole line validated, so a bad value drops
+       * the entire line. */
+      historyLinesByDate.addAll(newHistoryLines);
+    }
+
+    /* Add split history lines to database. The sentinel "EOL" sorts after
+     * all lines that start with a digit, so it comes last and forces a
+     * flush of the final date. */
+    String lastDate = null;
+    historyLinesByDate.add("EOL");
+    long[] readArray = null, writtenArray = null, dirreadArray = null,
+        dirwrittenArray = null;
+    int readOffset = 0, writtenOffset = 0, dirreadOffset = 0,
+        dirwrittenOffset = 0;
+    for (String historyLine : historyLinesByDate) {
+      String[] parts = historyLine.split(" ");
+      String currentDate = parts[0];
+      /* Flush the arrays collected for lastDate when the date changes or
+       * the sentinel is reached. */
+      if (lastDate != null && (historyLine.equals("EOL") ||
+          !currentDate.equals(lastDate))) {
+        BigIntArray readIntArray = new BigIntArray(readArray,
+            readOffset);
+        BigIntArray writtenIntArray = new BigIntArray(writtenArray,
+            writtenOffset);
+        BigIntArray dirreadIntArray = new BigIntArray(dirreadArray,
+            dirreadOffset);
+        BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray,
+            dirwrittenOffset);
+        if (this.importIntoDatabase) {
+          try {
+            long dateMillis = dateTimeFormat.parse(lastDate
+                + " 00:00:00").getTime();
+            this.addDateToScheduledUpdates(dateMillis);
+            this.csH.setString(1, fingerprint);
+            this.csH.setDate(2, new java.sql.Date(dateMillis));
+            this.csH.setArray(3, readIntArray);
+            this.csH.setArray(4, writtenIntArray);
+            this.csH.setArray(5, dirreadIntArray);
+            this.csH.setArray(6, dirwrittenIntArray);
+            /* Batched; executed every autoCommitCount calls here.
+             * NOTE(review): remaining batched calls are presumably
+             * executed elsewhere (e.g. closeConnection) — not visible
+             * in this method. */
+            this.csH.addBatch();
+            rhsCount++;
+            if (rhsCount % autoCommitCount == 0) {
+              this.csH.executeBatch();
+            }
+          } catch (SQLException e) {
+            this.logger.log(Level.WARNING, "Could not insert bandwidth "
+                + "history line into database. We won't make any "
+                + "further SQL requests in this execution.", e);
+            this.importIntoDatabase = false;
+          } catch (ParseException e) {
+            this.logger.log(Level.WARNING, "Could not insert bandwidth "
+                + "history line into database. We won't make any "
+                + "further SQL requests in this execution.", e);
+            this.importIntoDatabase = false;
+          }
+        }
+        if (this.writeRawImportFiles) {
+          try {
+            if (this.bwhistOut == null) {
+              new File(rawFilesDirectory).mkdirs();
+              this.bwhistOut = new BufferedWriter(new FileWriter(
+                  rawFilesDirectory + "/bwhist.sql"));
+            }
+            this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint
+                + "','" + lastDate + "','" + readIntArray.toString()
+                + "','" + writtenIntArray.toString() + "','"
+                + dirreadIntArray.toString() + "','"
+                + dirwrittenIntArray.toString() + "');\n");
+          } catch (IOException e) {
+            this.logger.log(Level.WARNING, "Could not write bandwidth "
+                + "history to raw database import file. We won't make "
+                + "any further attempts to write raw import files in "
+                + "this execution.", e);
+            this.writeRawImportFiles = false;
+          }
+        }
+        /* Offsets are left as they are; a null array ignores its
+         * offset. */
+        readArray = writtenArray = dirreadArray = dirwrittenArray = null;
+      }
+      if (historyLine.equals("EOL")) {
+        break;
+      }
+      /* Milliseconds from midnight to the end of this line's last
+       * interval. */
+      long lastIntervalTime;
+      try {
+        lastIntervalTime = dateTimeFormat.parse(parts[0] + " "
+            + parts[1]).getTime() - dateTimeFormat.parse(parts[0]
+            + " 00:00:00").getTime();
+      } catch (ParseException e) {
+        continue;
+      }
+      /* Values were already validated in the splitting loop above. */
+      String[] stringValues = parts[5].split(",");
+      long[] longValues = new long[stringValues.length];
+      for (int i = 0; i < longValues.length; i++) {
+        longValues[i] = Long.parseLong(stringValues[i]);
+      }
+
+      /* Index of the first value's 15-minute slot within the day. */
+      int offset = (int) (lastIntervalTime / (15L * 60L * 1000L))
+          - longValues.length + 1;
+      String type = parts[2];
+      /* Unknown history types are silently ignored. */
+      if (type.equals("read-history")) {
+        readArray = longValues;
+        readOffset = offset;
+      } else if (type.equals("write-history")) {
+        writtenArray = longValues;
+        writtenOffset = offset;
+      } else if (type.equals("dirreq-read-history")) {
+        dirreadArray = longValues;
+        dirreadOffset = offset;
+      } else if (type.equals("dirreq-write-history")) {
+        dirwrittenArray = longValues;
+        dirwrittenOffset = offset;
+      }
+      lastDate = currentDate;
+    }
+  }
+
+ /**
+ * Insert network status consensus into database.
+ */
+ public void addConsensus(long validAfter) {
+ if (this.importIntoDatabase) {
+ try {
+ this.addDateToScheduledUpdates(validAfter);
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ Timestamp validAfterTimestamp = new Timestamp(validAfter);
+ this.psCs.setTimestamp(1, validAfterTimestamp, cal);
+ ResultSet rs = psCs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) == 0) {
+ this.psC.clearParameters();
+ this.psC.setTimestamp(1, validAfterTimestamp, cal);
+ this.psC.executeUpdate();
+ rcsCount++;
+ if (rcsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ }
+ }
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not add network status "
+ + "consensus. We won't make any further SQL requests in "
+ + "this execution.", e);
+ this.importIntoDatabase = false;
+ }
+ }
+ if (this.writeRawImportFiles) {
+ try {
+ if (this.consensusOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.consensusOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/consensus.sql"));
+ this.consensusOut.write(" COPY consensus (validafter) "
+ + "FROM stdin;\n");
+ }
+ String validAfterString = this.dateTimeFormat.format(validAfter);
+ this.consensusOut.write(validAfterString + "\n");
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write network status "
+ + "consensus to raw database import file. We won't make "
+ + "any further attempts to write raw import files in this "
+ + "execution.", e);
+ this.writeRawImportFiles = false;
+ }
+ }
+ }
+
+ /**
+ * Adds observations on the number of directory requests by country as
+ * seen on a directory at a given date to the database.
+ */
+  /**
+   * Adds observations on the number of directory requests by country as
+   * seen on a directory at a given date to the database, unless rows for
+   * the same source and statistics end time are already there, and/or
+   * appends them to the raw dirreq_stats import file.
+   *
+   * @param source identifier of the reporting directory (callers in this
+   *     class pass the relay fingerprint)
+   * @param statsEndMillis end of the statistics interval in milliseconds
+   * @param seconds value stored in the <code>seconds</code> column
+   * @param dirReqsPerCountry request counts keyed by country code; values
+   *     must be parseable as long for the database import
+   */
+  public void addDirReqStats(String source, long statsEndMillis,
+      long seconds, Map<String, String> dirReqsPerCountry) {
+    String statsEnd = this.dateTimeFormat.format(statsEndMillis);
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(statsEndMillis);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp statsEndTimestamp = new Timestamp(statsEndMillis);
+        this.psQs.setString(1, source);
+        this.psQs.setTimestamp(2, statsEndTimestamp, cal);
+        ResultSet rs = psQs.executeQuery();
+        rs.next();
+        int existingObservations = rs.getInt(1);
+        /* Close explicitly, as addStatusEntry does, rather than relying
+         * on the next execution of psQs to close it. */
+        rs.close();
+        if (existingObservations == 0) {
+          for (Map.Entry<String, String> e :
+              dirReqsPerCountry.entrySet()) {
+            this.psQ.clearParameters();
+            this.psQ.setString(1, source);
+            this.psQ.setTimestamp(2, statsEndTimestamp, cal);
+            this.psQ.setLong(3, seconds);
+            this.psQ.setString(4, e.getKey());
+            this.psQ.setLong(5, Long.parseLong(e.getValue()));
+            this.psQ.executeUpdate();
+            rqsCount++;
+            if (rqsCount % autoCommitCount == 0) {
+              this.conn.commit();
+            }
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add dirreq stats. We "
+            + "won't make any further SQL requests in this execution.",
+            e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.dirReqOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.dirReqOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/dirreq_stats.sql"));
+          this.dirReqOut.write(" COPY dirreq_stats (source, statsend, "
+              + "seconds, country, requests) FROM stdin;\n");
+        }
+        for (Map.Entry<String, String> e :
+            dirReqsPerCountry.entrySet()) {
+          this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds
+              + "\t" + e.getKey() + "\t" + e.getValue() + "\n");
+        }
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write dirreq stats to "
+            + "raw database import file. We won't make any further "
+            + "attempts to write raw import files in this execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Reads all descriptor files below the archives directory, if that
+   * directory exists, and imports the consensuses, server descriptors,
+   * and extra-info descriptors they contain. Other descriptor types are
+   * ignored. When keepImportHistory is set, the import history file in
+   * the stats directory is passed to the reader's setExcludeFiles().
+   */
+  public void importRelayDescriptors() {
+    if (this.archivesDirectory.exists()) {
+      this.logger.fine("Importing files in directory "
+          + this.archivesDirectory + "/...");
+      DescriptorReader reader =
+          DescriptorSourceFactory.createDescriptorReader();
+      reader.addDirectory(this.archivesDirectory);
+      if (this.keepImportHistory) {
+        File historyFile = new File(this.statsDirectory,
+            "database-importer-relay-descriptor-history");
+        reader.setExcludeFiles(historyFile);
+      }
+      for (Iterator<DescriptorFile> files = reader.readDescriptors();
+          files.hasNext(); ) {
+        DescriptorFile file = files.next();
+        if (file.getDescriptors() == null) {
+          continue;
+        }
+        for (Descriptor parsed : file.getDescriptors()) {
+          this.dispatchDescriptor(parsed);
+        }
+      }
+    }
+
+    this.logger.info("Finished importing relay descriptors.");
+  }
+
+  /** Hands one parsed descriptor to the matching add method, if any. */
+  private void dispatchDescriptor(Descriptor parsed) {
+    if (parsed instanceof RelayNetworkStatusConsensus) {
+      this.addRelayNetworkStatusConsensus(
+          (RelayNetworkStatusConsensus) parsed);
+    } else if (parsed instanceof ServerDescriptor) {
+      this.addServerDescriptor((ServerDescriptor) parsed);
+    } else if (parsed instanceof ExtraInfoDescriptor) {
+      this.addExtraInfoDescriptor((ExtraInfoDescriptor) parsed);
+    }
+  }
+
+  /**
+   * Adds every status entry of a parsed consensus (with lower-cased
+   * fingerprint and descriptor digest), then the consensus itself.
+   */
+  private void addRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) {
+    long validAfterMillis = consensus.getValidAfterMillis();
+    for (NetworkStatusEntry entry :
+        consensus.getStatusEntries().values()) {
+      String lowerCaseFingerprint = entry.getFingerprint().toLowerCase();
+      String lowerCaseDigest = entry.getDescriptor().toLowerCase();
+      this.addStatusEntry(validAfterMillis, entry.getNickname(),
+          lowerCaseFingerprint, lowerCaseDigest,
+          entry.getPublishedMillis(), entry.getAddress(),
+          entry.getOrPort(), entry.getDirPort(), entry.getFlags(),
+          entry.getVersion(), entry.getBandwidth(), entry.getPortList(),
+          entry.getStatusEntryBytes());
+    }
+    this.addConsensus(validAfterMillis);
+  }
+
+  /**
+   * Unpacks a parsed server descriptor and adds it via the field-based
+   * addServerDescriptor overload.
+   */
+  private void addServerDescriptor(ServerDescriptor descriptor) {
+    String digest = descriptor.getServerDescriptorDigest();
+    String relayFingerprint = descriptor.getFingerprint();
+    this.addServerDescriptor(digest, descriptor.getNickname(),
+        descriptor.getAddress(), descriptor.getOrPort(),
+        descriptor.getDirPort(), relayFingerprint,
+        descriptor.getBandwidthRate(), descriptor.getBandwidthBurst(),
+        descriptor.getBandwidthObserved(), descriptor.getPlatform(),
+        descriptor.getPublishedMillis(), descriptor.getUptime(),
+        descriptor.getExtraInfoDigest());
+  }
+
+  private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { // dirreq-v3-reqs and bandwidth histories
+    if (descriptor.getDirreqV3Reqs() != null) {
+      int allUsers = 0; // sum over all countries, stored under "zy"
+      Map<String, String> obs = new HashMap<String, String>();
+      for (Map.Entry<String, Integer> e :
+          descriptor.getDirreqV3Reqs().entrySet()) {
+        String country = e.getKey();
+        int users = e.getValue() - 4; // NOTE(review): presumably a rounding correction; confirm
+        allUsers += users;
+        obs.put(country, "" + users);
+      }
+      obs.put("zy", "" + allUsers);
+      this.addDirReqStats(descriptor.getFingerprint(), // NOTE(review): not lowercased, unlike below
+          descriptor.getDirreqStatsEndMillis(),
+          descriptor.getDirreqStatsIntervalLength(), obs);
+    }
+    List<String> bandwidthHistoryLines = new ArrayList<String>(); // lines of the histories present
+    if (descriptor.getWriteHistory() != null) {
+      bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
+    }
+    if (descriptor.getReadHistory() != null) {
+      bandwidthHistoryLines.add(descriptor.getReadHistory().getLine());
+    }
+    if (descriptor.getDirreqWriteHistory() != null) {
+      bandwidthHistoryLines.add(
+          descriptor.getDirreqWriteHistory().getLine());
+    }
+    if (descriptor.getDirreqReadHistory() != null) {
+      bandwidthHistoryLines.add(
+          descriptor.getDirreqReadHistory().getLine());
+    }
+    this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(),
+        descriptor.getNickname(),
+        descriptor.getFingerprint().toLowerCase(),
+        descriptor.getPublishedMillis(), bandwidthHistoryLines);
+  }
+
+ /**
+ * Close the relay descriptor database connection.
+ */
+  public void closeConnection() {
+    /* Logs counts, re-adds scheduled dates, commits, and closes all. */
+    /* Log stats about imported descriptors. */
+    this.logger.info(String.format("Finished importing relay "
+        + "descriptors: %d consensuses, %d network status entries, %d "
+        + "votes, %d server descriptors, %d extra-info descriptors, %d "
+        + "bandwidth history elements, and %d dirreq stats elements",
+        rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount,
+        rqsCount));
+
+    /* Insert scheduled updates a second time, just in case the refresh
+     * run has started since inserting them the first time in which case
+     * it will miss the data inserted afterwards. We cannot, however,
+     * insert them only now, because if a Java execution fails at a random
+     * point, we might have added data, but not the corresponding dates to
+     * update statistics. */
+    if (this.importIntoDatabase) {
+      try {
+        for (long dateMillis : this.scheduledUpdates) {
+          this.psU.setDate(1, new java.sql.Date(dateMillis));
+          this.psU.execute();
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add scheduled dates "
+            + "for the next refresh run.", e);
+      }
+    }
+
+    /* Commit any stragglers before closing. */
+    if (this.conn != null) {
+      try {
+        this.csH.executeBatch(); // run statements batched on csH first
+
+        this.conn.commit();
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not commit final records to "
+            + "database", e);
+      }
+      try {
+        this.conn.close();
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not close database "
+            + "connection.", e);
+      }
+    }
+
+    /* Close raw import files. */
+    try { // NOTE(review): one IOException skips closing the remaining files
+      if (this.statusentryOut != null) {
+        this.statusentryOut.write("\\.\n");
+        this.statusentryOut.close();
+      }
+      if (this.descriptorOut != null) {
+        this.descriptorOut.write("\\.\n");
+        this.descriptorOut.close();
+      }
+      if (this.bwhistOut != null) {
+        this.bwhistOut.write("\\.\n");
+        this.bwhistOut.close();
+      }
+      if (this.consensusOut != null) {
+        this.consensusOut.write("\\.\n");
+        this.consensusOut.close();
+      }
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Could not close one or more raw "
+          + "database import files.", e);
+    }
+  }
+}
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java b/modules/legacy/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java
new file mode 100644
index 0000000..d5cae37
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java
@@ -0,0 +1,450 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron.network;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.torproject.descriptor.BridgeNetworkStatus;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.NetworkStatusEntry;
+
+/**
+ * Generates statistics on the average number of running bridges per
+ * day. Reads sanitized bridge network statuses from the bridges
+ * directory, counts running bridges (and running bridges whose nickname
+ * starts with "ec2bridge") per status, and stores these counts in the
+ * intermediate results file
+ * <code>stats/bridge-consensus-stats-raw</code>. Computes daily averages
+ * for all days for which at least half of the expected statuses are
+ * known and writes them to the <code>bridge_network_size</code>
+ * database table if a database connection URL is configured.
+ */
+public class ConsensusStatsFileHandler {
+
+  /**
+   * Intermediate results file holding the number of running bridges per
+   * bridge status.
+   */
+  private File bridgeConsensusStatsRawFile;
+
+  /**
+   * Number of running bridges in a given bridge status. Map keys are
+   * bridge status times formatted as "yyyy-MM-dd HH:mm:ss", map values
+   * are lines as read from <code>stats/bridge-consensus-stats-raw</code>.
+   */
+  private SortedMap<String, String> bridgesRaw;
+
+  /**
+   * Average number of running bridges per day. Map keys are dates
+   * formatted as "yyyy-MM-dd", map values are strings of the form
+   * ",avg_running,avg_running_ec2".
+   */
+  private SortedMap<String, String> bridgesPerDay;
+
+  /**
+   * Logger for this class.
+   */
+  private Logger logger;
+
+  /** Number of bridge statuses added in this execution. */
+  private int bridgeResultsAdded = 0;
+
+  /* Database connection string. */
+  private String connectionURL = null;
+
+  /** Format of bridge status publication times, in UTC. */
+  private SimpleDateFormat dateTimeFormat;
+
+  /** Directory containing sanitized bridge descriptors to import. */
+  private File bridgesDir;
+
+  /** Directory for import history files. */
+  private File statsDirectory;
+
+  /** Whether to exclude previously read files using a history file. */
+  private boolean keepImportHistory;
+
+  /**
+   * Initializes this class, including reading in the intermediate
+   * results file <code>stats/bridge-consensus-stats-raw</code> if it
+   * exists.
+   *
+   * @param connectionURL JDBC connection string, or <code>null</code> to
+   *     skip writing results to the database
+   * @param bridgesDir directory containing sanitized bridge descriptors
+   * @param statsDirectory directory for the import history file
+   * @param keepImportHistory whether to exclude previously read files
+   *     using an import history file
+   * @throws IllegalArgumentException if <code>bridgesDir</code> or
+   *     <code>statsDirectory</code> is <code>null</code>
+   */
+  public ConsensusStatsFileHandler(String connectionURL,
+      File bridgesDir, File statsDirectory,
+      boolean keepImportHistory) {
+
+    if (bridgesDir == null || statsDirectory == null) {
+      throw new IllegalArgumentException();
+    }
+    this.bridgesDir = bridgesDir;
+    this.statsDirectory = statsDirectory;
+    this.keepImportHistory = keepImportHistory;
+
+    /* Initialize local data structures to hold intermediate and final
+     * results. */
+    this.bridgesPerDay = new TreeMap<String, String>();
+    this.bridgesRaw = new TreeMap<String, String>();
+
+    /* Initialize file name for the intermediate results file.
+     * NOTE(review): this path is relative to the working directory and
+     * does not use statsDirectory; confirm this is intended. */
+    this.bridgeConsensusStatsRawFile = new File(
+        "stats/bridge-consensus-stats-raw");
+
+    /* Initialize database connection string. */
+    this.connectionURL = connectionURL;
+
+    this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+    /* Initialize logger. */
+    this.logger = Logger.getLogger(
+        ConsensusStatsFileHandler.class.getName());
+
+    /* Read in number of running bridges per bridge status. */
+    if (this.bridgeConsensusStatsRawFile.exists()) {
+      try {
+        this.logger.fine("Reading file "
+            + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "...");
+        BufferedReader br = new BufferedReader(new FileReader(
+            this.bridgeConsensusStatsRawFile));
+        try {
+          String line = null;
+          while ((line = br.readLine()) != null) {
+            if (line.startsWith("date")) {
+              /* Skip headers. */
+              continue;
+            }
+            String[] parts = line.split(",");
+            String dateTime = parts[0];
+            if (parts.length == 2) {
+              /* Two-column lines have no EC2 bridge count; use 0. */
+              this.bridgesRaw.put(dateTime, line + ",0");
+            } else if (parts.length == 3) {
+              this.bridgesRaw.put(dateTime, line);
+            } else {
+              /* NOTE(review): lines after this one are dropped and would
+               * be missing if writeFiles() rewrites this file. */
+              this.logger.warning("Corrupt line '" + line + "' in file "
+                  + this.bridgeConsensusStatsRawFile.getAbsolutePath()
+                  + "! Aborting to read this file!");
+              break;
+            }
+          }
+        } finally {
+          /* Close the reader even if reading fails part-way through. */
+          br.close();
+        }
+        this.logger.fine("Finished reading file "
+            + this.bridgeConsensusStatsRawFile.getAbsolutePath() + ".");
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Failed to read file "
+            + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!",
+            e);
+      }
+    }
+  }
+
+  /**
+   * Adds the intermediate results of the number of running bridges in a
+   * given bridge status to the existing observations, overwriting (and
+   * warning about) previously known numbers that differ.
+   *
+   * @param publishedMillis bridge status publication time in millis
+   * @param running number of running bridges in the status
+   * @param runningEc2Bridges number of running EC2 bridges in the status
+   */
+  public void addBridgeConsensusResults(long publishedMillis, int running,
+      int runningEc2Bridges) {
+    String published = dateTimeFormat.format(publishedMillis);
+    String line = published + "," + running + "," + runningEc2Bridges;
+    if (!this.bridgesRaw.containsKey(published)) {
+      this.logger.finer("Adding new bridge numbers: " + line);
+      this.bridgesRaw.put(published, line);
+      this.bridgeResultsAdded++;
+    } else if (!line.equals(this.bridgesRaw.get(published))) {
+      this.logger.warning("The numbers of running bridges we were just "
+          + "given (" + line + ") are different from what we learned "
+          + "before (" + this.bridgesRaw.get(published) + ")! "
+          + "Overwriting!");
+      this.bridgesRaw.put(published, line);
+    }
+  }
+
+  /**
+   * Reads all descriptors in the bridges directory, if it exists, and
+   * adds the running bridge counts of every bridge network status found.
+   */
+  public void importSanitizedBridges() {
+    if (bridgesDir.exists()) {
+      logger.fine("Importing files in directory " + bridgesDir + "/...");
+      DescriptorReader reader =
+          DescriptorSourceFactory.createDescriptorReader();
+      reader.addDirectory(bridgesDir);
+      if (keepImportHistory) {
+        reader.setExcludeFiles(new File(statsDirectory,
+            "consensus-stats-bridge-descriptor-history"));
+      }
+      Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+      while (descriptorFiles.hasNext()) {
+        DescriptorFile descriptorFile = descriptorFiles.next();
+        if (descriptorFile.getDescriptors() != null) {
+          for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+            if (descriptor instanceof BridgeNetworkStatus) {
+              this.addBridgeNetworkStatus(
+                  (BridgeNetworkStatus) descriptor);
+            }
+          }
+        }
+      }
+      logger.info("Finished importing bridge descriptors.");
+    }
+  }
+
+  /**
+   * Counts running bridges, and running bridges whose nickname starts
+   * with "ec2bridge", in the given status and records both counts.
+   */
+  private void addBridgeNetworkStatus(BridgeNetworkStatus status) {
+    int runningBridges = 0, runningEc2Bridges = 0;
+    for (NetworkStatusEntry statusEntry :
+        status.getStatusEntries().values()) {
+      if (statusEntry.getFlags().contains("Running")) {
+        runningBridges++;
+        if (statusEntry.getNickname().startsWith("ec2bridge")) {
+          runningEc2Bridges++;
+        }
+      }
+    }
+    this.addBridgeConsensusResults(status.getPublishedMillis(),
+        runningBridges, runningEc2Bridges);
+  }
+
+  /**
+   * Aggregates the raw observations on bridge numbers into daily
+   * averages, writes the raw observations to disk, and adds the daily
+   * averages to the database if a connection URL is configured.
+   */
+  public void writeFiles() {
+
+    /* Go through raw observations of numbers of running bridges in bridge
+     * statuses, calculate averages per day, and add these averages to
+     * final results. */
+    if (!this.bridgesRaw.isEmpty()) {
+      String tempDate = null;
+      int brunning = 0, brunningEc2 = 0, statuses = 0;
+      Iterator<String> it = this.bridgesRaw.values().iterator();
+      boolean haveWrittenFinalLine = false;
+      while (it.hasNext() || !haveWrittenFinalLine) {
+        String next = it.hasNext() ? it.next() : null;
+        /* Finished reading a day or even all lines? */
+        if (tempDate != null && (next == null
+            || !next.substring(0, 10).equals(tempDate))) {
+          /* Only write results if we have seen at least half of all
+           * statuses. */
+          if (statuses >= 24) {
+            String line = "," + (brunning / statuses) + ","
+                + (brunningEc2 / statuses);
+            /* Are our results new? */
+            if (!this.bridgesPerDay.containsKey(tempDate)) {
+              this.logger.finer("Adding new average bridge numbers: "
+                  + tempDate + line);
+              this.bridgesPerDay.put(tempDate, line);
+            } else if (!line.equals(this.bridgesPerDay.get(tempDate))) {
+              this.logger.finer("Replacing existing average bridge "
+                  + "numbers (" + this.bridgesPerDay.get(tempDate)
+                  + ") with new numbers: " + line);
+              this.bridgesPerDay.put(tempDate, line);
+            }
+          }
+          brunning = brunningEc2 = statuses = 0;
+          haveWrittenFinalLine = (next == null);
+        }
+        /* Sum up number of running bridges. */
+        if (next != null) {
+          tempDate = next.substring(0, 10);
+          statuses++;
+          String[] parts = next.split(",");
+          brunning += Integer.parseInt(parts[1]);
+          brunningEc2 += Integer.parseInt(parts[2]);
+        }
+      }
+    }
+
+    /* Write raw numbers of running bridges to disk. */
+    try {
+      this.logger.fine("Writing file "
+          + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "...");
+      this.bridgeConsensusStatsRawFile.getParentFile().mkdirs();
+      BufferedWriter bw = new BufferedWriter(
+          new FileWriter(this.bridgeConsensusStatsRawFile));
+      try {
+        bw.append("datetime,brunning,brunningec2\n");
+        for (String line : this.bridgesRaw.values()) {
+          bw.append(line + "\n");
+        }
+      } finally {
+        /* Close the writer even if writing fails part-way through. */
+        bw.close();
+      }
+      this.logger.fine("Finished writing file "
+          + this.bridgeConsensusStatsRawFile.getAbsolutePath() + ".");
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Failed to write file "
+          + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!",
+          e);
+    }
+
+    /* Add average number of bridges per day to the database. */
+    if (connectionURL != null) {
+      try {
+        Connection conn = DriverManager.getConnection(connectionURL);
+        try {
+          conn.setAutoCommit(false);
+          this.writeBridgesPerDay(conn);
+          conn.commit();
+        } finally {
+          /* Release the connection even if a statement above failed. */
+          conn.close();
+        }
+      } catch (SQLException e) {
+        logger.log(Level.WARNING, "Failed to add average bridge numbers "
+            + "to database.", e);
+      }
+    }
+
+    /* Write stats. */
+    StringBuilder dumpStats = new StringBuilder("Finished writing "
+        + "statistics on bridge network statuses to disk.\nAdded "
+        + this.bridgeResultsAdded + " bridge network status(es) in this "
+        + "execution.");
+    long now = System.currentTimeMillis();
+    if (this.bridgesRaw.isEmpty()) {
+      dumpStats.append("\nNo bridge status known yet.");
+    } else {
+      dumpStats.append("\nLast known bridge status was published "
+          + this.bridgesRaw.lastKey() + ".");
+      try {
+        if (now - 6L * 60L * 60L * 1000L > this.dateTimeFormat.parse(
+            this.bridgesRaw.lastKey()).getTime()) {
+          logger.warning("Last known bridge status is more than 6 hours "
+              + "old: " + this.bridgesRaw.lastKey());
+        }
+      } catch (ParseException e) {
+        /* Can't parse the timestamp? Whatever. */
+      }
+    }
+    logger.info(dumpStats.toString());
+  }
+
+  /**
+   * Updates rows in <code>bridge_network_size</code> whose averages
+   * changed and inserts rows for dates not contained in that table yet.
+   * Does not commit.
+   */
+  private void writeBridgesPerDay(Connection conn) throws SQLException {
+    Map<String, String> insertRows = new HashMap<String, String>(),
+        updateRows = new HashMap<String, String>();
+    insertRows.putAll(this.bridgesPerDay);
+
+    /* Find out which dates are already in the database and which of
+     * those have different averages than the ones we computed. */
+    Statement statement = conn.createStatement();
+    try {
+      ResultSet rs = statement.executeQuery(
+          "SELECT date, avg_running, avg_running_ec2 "
+          + "FROM bridge_network_size");
+      while (rs.next()) {
+        String date = rs.getDate(1).toString();
+        if (insertRows.containsKey(date)) {
+          String insertRow = insertRows.remove(date);
+          String[] parts = insertRow.substring(1).split(",");
+          long newAvgRunning = Long.parseLong(parts[0]);
+          long newAvgRunningEc2 = Long.parseLong(parts[1]);
+          long oldAvgRunning = rs.getLong(2);
+          long oldAvgRunningEc2 = rs.getLong(3);
+          if (newAvgRunning != oldAvgRunning ||
+              newAvgRunningEc2 != oldAvgRunningEc2) {
+            updateRows.put(date, insertRow);
+          }
+        }
+      }
+      rs.close();
+    } finally {
+      statement.close();
+    }
+
+    /* Update changed rows and insert new rows; both statements take the
+     * two averages as parameters 1 and 2 and the date as parameter 3. */
+    PreparedStatement psU = conn.prepareStatement(
+        "UPDATE bridge_network_size SET avg_running = ?, "
+        + "avg_running_ec2 = ? WHERE date = ?");
+    try {
+      this.executeForEachDay(psU, updateRows);
+    } finally {
+      psU.close();
+    }
+    PreparedStatement psI = conn.prepareStatement(
+        "INSERT INTO bridge_network_size (avg_running, "
+        + "avg_running_ec2, date) VALUES (?, ?, ?)");
+    try {
+      this.executeForEachDay(psI, insertRows);
+    } finally {
+      psI.close();
+    }
+  }
+
+  /**
+   * Executes the given statement once per map entry, binding the two
+   * averages parsed from the value (",avg_running,avg_running_ec2") to
+   * parameters 1 and 2 and the date key ("yyyy-MM-dd") to parameter 3.
+   */
+  private void executeForEachDay(PreparedStatement ps,
+      Map<String, String> rows) throws SQLException {
+    for (Map.Entry<String, String> e : rows.entrySet()) {
+      java.sql.Date date = java.sql.Date.valueOf(e.getKey());
+      String[] parts = e.getValue().substring(1).split(",");
+      long avgRunning = Long.parseLong(parts[0]);
+      long avgRunningEc2 = Long.parseLong(parts[1]);
+      ps.clearParameters();
+      ps.setLong(1, avgRunning);
+      ps.setLong(2, avgRunningEc2);
+      ps.setDate(3, date);
+      ps.executeUpdate();
+    }
+  }
+}
+
diff --git a/modules/legacy/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java b/modules/legacy/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java
new file mode 100644
index 0000000..815b37f
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java
@@ -0,0 +1,322 @@
+/* Copyright 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron.performance;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.TimeZone;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+
+/**
+ * Imports conn-bi-direct statistics from relay extra-info descriptors
+ * into the <code>connbidirect</code> database table and/or a raw
+ * database import file.
+ */
+public class PerformanceStatsImporter {
+
+  /**
+   * How many records to commit with each database transaction.
+   */
+  private final long autoCommitCount = 500;
+
+  /**
+   * Number of conn-bi-direct records inserted into the database in this
+   * execution; used to commit after every <code>autoCommitCount</code>
+   * records.
+   */
+  private int rbsCount = 0;
+
+  /**
+   * Relay descriptor database connection.
+   */
+  private Connection conn;
+
+  /**
+   * Prepared statement to check whether a given conn-bi-direct stats
+   * string has been imported into the database before.
+   */
+  private PreparedStatement psBs;
+
+  /**
+   * Prepared statement to insert a conn-bi-direct stats string into the
+   * database.
+   */
+  private PreparedStatement psB;
+
+  /**
+   * Logger for this class.
+   */
+  private Logger logger;
+
+  /**
+   * Directory for writing raw import files.
+   */
+  private String rawFilesDirectory;
+
+  /**
+   * Raw import file containing conn-bi-direct stats strings.
+   */
+  private BufferedWriter connBiDirectOut;
+
+  /**
+   * Date format to format statistics end timestamps in UTC.
+   */
+  private SimpleDateFormat dateTimeFormat;
+
+  /** Whether to import records into the database. */
+  private boolean importIntoDatabase;
+
+  /** Whether to write records to a raw database import file. */
+  private boolean writeRawImportFiles;
+
+  /** Directory containing relay descriptors to import. */
+  private File archivesDirectory;
+
+  /** Directory for import history files. */
+  private File statsDirectory;
+
+  /** Whether to exclude previously read files using a history file. */
+  private boolean keepImportHistory;
+
+  /**
+   * Initialize database importer by connecting to the database and
+   * preparing statements.
+   *
+   * @param connectionURL JDBC connection string, or <code>null</code> to
+   *     skip importing into the database
+   * @param rawFilesDirectory directory for raw import files, or
+   *     <code>null</code> to skip writing them
+   * @param archivesDirectory directory containing relay descriptors
+   * @param statsDirectory directory for the import history file
+   * @param keepImportHistory whether to exclude previously read files
+   *     using an import history file
+   * @throws IllegalArgumentException if <code>archivesDirectory</code> or
+   *     <code>statsDirectory</code> is <code>null</code>
+   */
+  public PerformanceStatsImporter(String connectionURL,
+      String rawFilesDirectory, File archivesDirectory,
+      File statsDirectory, boolean keepImportHistory) {
+
+    if (archivesDirectory == null ||
+        statsDirectory == null) {
+      throw new IllegalArgumentException();
+    }
+    this.archivesDirectory = archivesDirectory;
+    this.statsDirectory = statsDirectory;
+    this.keepImportHistory = keepImportHistory;
+
+    /* Initialize logger. */
+    this.logger = Logger.getLogger(
+        PerformanceStatsImporter.class.getName());
+
+    if (connectionURL != null) {
+      try {
+        /* Connect to database. */
+        this.conn = DriverManager.getConnection(connectionURL);
+
+        /* Turn autocommit off */
+        this.conn.setAutoCommit(false);
+
+        /* Prepare statements. */
+        this.psBs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM connbidirect WHERE source = ? AND statsend = ?");
+        this.psB = conn.prepareStatement("INSERT INTO connbidirect "
+            + "(source, statsend, seconds, belownum, readnum, writenum, "
+            + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)");
+        this.importIntoDatabase = true;
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not connect to database or "
+            + "prepare statements.", e);
+      }
+    }
+
+    /* Remember where we want to write raw import files. */
+    if (rawFilesDirectory != null) {
+      this.rawFilesDirectory = rawFilesDirectory;
+      this.writeRawImportFiles = true;
+    }
+
+    /* Initialize date format, so that we can format timestamps. */
+    this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+
+  /**
+   * Insert a conn-bi-direct stats string into the database, unless a
+   * row with the same source and statistics end already exists, and
+   * append it to the raw import file if writing raw files is enabled.
+   */
+  private void addConnBiDirect(String source, long statsEndMillis,
+      long seconds, long below, long read, long write, long both) {
+    String statsEnd = this.dateTimeFormat.format(statsEndMillis);
+    if (this.importIntoDatabase) {
+      try {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp statsEndTimestamp = new Timestamp(statsEndMillis);
+        this.psBs.setString(1, source);
+        this.psBs.setTimestamp(2, statsEndTimestamp, cal);
+        ResultSet rs = psBs.executeQuery();
+        int existingRows;
+        try {
+          rs.next();
+          existingRows = rs.getInt(1);
+        } finally {
+          /* Release the result set right away instead of keeping it
+           * open until the statement is executed again. */
+          rs.close();
+        }
+        if (existingRows == 0) {
+          this.psB.clearParameters();
+          this.psB.setString(1, source);
+          this.psB.setTimestamp(2, statsEndTimestamp, cal);
+          this.psB.setLong(3, seconds);
+          this.psB.setLong(4, below);
+          this.psB.setLong(5, read);
+          this.psB.setLong(6, write);
+          this.psB.setLong(7, both);
+          this.psB.executeUpdate();
+          rbsCount++;
+          if (rbsCount % autoCommitCount == 0) {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
+            + "stats string. We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.connBiDirectOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.connBiDirectOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/connbidirect.sql"));
+          this.connBiDirectOut.write(" COPY connbidirect (source, "
+              + "statsend, seconds, belownum, readnum, writenum, "
+              + "bothnum) FROM stdin;\n");
+        }
+        this.connBiDirectOut.write(source + "\t" + statsEnd + "\t"
+            + seconds + "\t" + below + "\t" + read + "\t" + write + "\t"
+            + both + "\n");
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
+            + "stats string to raw database import file. We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Reads all descriptors in the archives directory, if it exists, and
+   * imports conn-bi-direct statistics from extra-info descriptors.
+   */
+  public void importRelayDescriptors() {
+    if (archivesDirectory.exists()) {
+      logger.fine("Importing files in directory " + archivesDirectory
+          + "/...");
+      DescriptorReader reader =
+          DescriptorSourceFactory.createDescriptorReader();
+      reader.addDirectory(archivesDirectory);
+      if (keepImportHistory) {
+        reader.setExcludeFiles(new File(statsDirectory,
+            "performance-stats-relay-descriptor-history"));
+      }
+      Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+      while (descriptorFiles.hasNext()) {
+        DescriptorFile descriptorFile = descriptorFiles.next();
+        if (descriptorFile.getDescriptors() != null) {
+          for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+            if (descriptor instanceof ExtraInfoDescriptor) {
+              this.addExtraInfoDescriptor(
+                  (ExtraInfoDescriptor) descriptor);
+            }
+          }
+        }
+      }
+    }
+
+    logger.info("Finished importing relay descriptors.");
+  }
+
+  /**
+   * Adds the conn-bi-direct statistics of the given extra-info
+   * descriptor if its statistics end time is non-negative (presumably
+   * negative when the descriptor contains no such statistics).
+   */
+  private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
+    if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
+      this.addConnBiDirect(descriptor.getFingerprint(),
+          descriptor.getConnBiDirectStatsEndMillis(),
+          descriptor.getConnBiDirectStatsIntervalLength(),
+          descriptor.getConnBiDirectBelow(),
+          descriptor.getConnBiDirectRead(),
+          descriptor.getConnBiDirectWrite(),
+          descriptor.getConnBiDirectBoth());
+    }
+  }
+
+  /**
+   * Close the relay descriptor database connection after committing any
+   * remaining records, and finish and close the raw import file.
+   * Failures are logged, not thrown.
+   */
+  public void closeConnection() {
+
+    /* Log stats about imported descriptors. */
+    this.logger.info(String.format("Finished importing relay "
+        + "descriptors: %d conn-bi-direct stats lines", rbsCount));
+
+    /* Commit any stragglers before closing. */
+    if (this.conn != null) {
+      try {
+        this.conn.commit();
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not commit final records "
+            + "to database", e);
+      }
+      try {
+        this.conn.close();
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not close database "
+            + "connection.", e);
+      }
+    }
+
+    /* Close raw import files. */
+    if (this.connBiDirectOut != null) {
+      try {
+        try {
+          this.connBiDirectOut.write("\\.\n");
+        } finally {
+          /* Close the file even if writing the end marker fails. */
+          this.connBiDirectOut.close();
+        }
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not close one or more raw "
+            + "database import files.", e);
+      }
+    }
+  }
+}
diff --git a/modules/legacy/src/org/torproject/ernie/cron/performance/TorperfProcessor.java b/modules/legacy/src/org/torproject/ernie/cron/performance/TorperfProcessor.java
new file mode 100644
index 0000000..d7322db
--- /dev/null
+++ b/modules/legacy/src/org/torproject/ernie/cron/performance/TorperfProcessor.java
@@ -0,0 +1,458 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron.performance;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.TorperfResult;
+
+/**
+ * Import Torperf measurement results, maintain the torperf-raw and
+ * torperf-stats files in the stats directory, and optionally write the
+ * aggregated statistics to the torperf_stats database table.
+ *
+ * <p>All work happens in the constructor.  Raw observations are keyed
+ * by "source-filesize,date,time" and end in a completion time in
+ * milliseconds, -1 for a failed, or -2 for a timed-out download.
+ * Statistics are keyed by "source-filesize,date" (plus aggregates
+ * over all sources keyed by "all-filesize,date") and hold quartiles of
+ * successful download times and counts of timeouts, failures, and
+ * requests.</p>
+ */
+public class TorperfProcessor {
+  /**
+   * Import new Torperf results, rewrite the torperf-raw and torperf-stats
+   * files, log a summary, and update the database if requested.
+   *
+   * @param torperfDirectory directory to read Torperf result files from
+   * @param statsDirectory directory containing torperf-raw,
+   *     torperf-stats, and the torperf-history exclude file
+   * @param connectionURL JDBC URL of the database to update, or null to
+   *     skip the database step
+   * @throws IllegalArgumentException if torperfDirectory or
+   *     statsDirectory is null
+   */
+  public TorperfProcessor(File torperfDirectory, File statsDirectory,
+      String connectionURL) {
+
+    if (torperfDirectory == null || statsDirectory == null) {
+      throw new IllegalArgumentException();
+    }
+
+    Logger logger = Logger.getLogger(TorperfProcessor.class.getName());
+    File rawFile = new File(statsDirectory, "torperf-raw");
+    File statsFile = new File(statsDirectory, "torperf-stats");
+    SortedMap<String, String> rawObs = new TreeMap<String, String>();
+    SortedMap<String, String> stats = new TreeMap<String, String>();
+    int addedRawObs = 0;
+    SimpleDateFormat formatter =
+        new SimpleDateFormat("yyyy-MM-dd,HH:mm:ss");
+    formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
+    try {
+      if (rawFile.exists()) {
+        logger.fine("Reading file " + rawFile.getAbsolutePath() + "...");
+        BufferedReader br = new BufferedReader(new FileReader(rawFile));
+        try {
+          String line = br.readLine(); // ignore header
+          while ((line = br.readLine()) != null) {
+            /* Skip corrupt lines rather than stopping: anything not read
+             * into rawObs would be lost when torperf-raw is rewritten
+             * below. */
+            if (!isValidRawLine(line)) {
+              logger.warning("Corrupt line in " + rawFile.getAbsolutePath()
+                  + "!");
+              continue;
+            }
+            String key = line.substring(0, line.lastIndexOf(","));
+            rawObs.put(key, line);
+          }
+        } finally {
+          br.close();
+        }
+        logger.fine("Finished reading file " + rawFile.getAbsolutePath()
+            + ".");
+      }
+      if (statsFile.exists()) {
+        logger.fine("Reading file " + statsFile.getAbsolutePath()
+            + "...");
+        BufferedReader br = new BufferedReader(new FileReader(statsFile));
+        try {
+          String line = br.readLine(); // ignore header
+          while ((line = br.readLine()) != null) {
+            String[] parts = line.split(",");
+            if (parts.length < 2) {
+              logger.warning("Corrupt line in "
+                  + statsFile.getAbsolutePath() + "!");
+              continue;
+            }
+            stats.put(parts[0] + "," + parts[1], line);
+          }
+        } finally {
+          br.close();
+        }
+        logger.fine("Finished reading file " + statsFile.getAbsolutePath()
+            + ".");
+      }
+      if (torperfDirectory.exists()) {
+        logger.fine("Importing files in " + torperfDirectory + "/...");
+        DescriptorReader descriptorReader =
+            DescriptorSourceFactory.createDescriptorReader();
+        descriptorReader.addDirectory(torperfDirectory);
+        descriptorReader.setExcludeFiles(new File(statsDirectory,
+            "torperf-history"));
+        Iterator<DescriptorFile> descriptorFiles =
+            descriptorReader.readDescriptors();
+        while (descriptorFiles.hasNext()) {
+          DescriptorFile descriptorFile = descriptorFiles.next();
+          if (descriptorFile.getException() != null) {
+            logger.log(Level.FINE, "Error parsing file.",
+                descriptorFile.getException());
+            continue;
+          }
+          if (descriptorFile.getDescriptors() == null) {
+            continue;
+          }
+          for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+            if (!(descriptor instanceof TorperfResult)) {
+              continue;
+            }
+            TorperfResult result = (TorperfResult) descriptor;
+            long fileSize = result.getFileSize();
+            String sizeSuffix = fileSizeSuffix(fileSize);
+            if (sizeSuffix == null) {
+              logger.fine("Unexpected file size '" + fileSize
+                  + "'. Skipping.");
+              continue;
+            }
+            String source = result.getSource() + sizeSuffix;
+            String dateTime = formatter.format(result.getStartMillis());
+            long completeMillis = result.getDataCompleteMillis()
+                - result.getStartMillis();
+            String key = source + "," + dateTime;
+            String value = key;
+            if ((result.didTimeout() == null
+                && result.getDataCompleteMillis() < 1)
+                || (result.didTimeout() != null && result.didTimeout())) {
+              value += ",-2"; // -2 for timeout
+            } else if (result.getReadBytes() < fileSize) {
+              value += ",-1"; // -1 for failure
+            } else {
+              value += "," + completeMillis;
+            }
+            if (!rawObs.containsKey(key)) {
+              rawObs.put(key, value);
+              addedRawObs++;
+            }
+          }
+        }
+        logger.fine("Finished importing files in " + torperfDirectory
+            + "/.");
+      }
+      if (rawObs.size() > 0) {
+        logger.fine("Writing file " + rawFile.getAbsolutePath() + "...");
+        rawFile.getParentFile().mkdirs();
+        SortedMap<String, List<Long>> dlTimesAllSources =
+            new TreeMap<String, List<Long>>();
+        SortedMap<String, long[]> statusesAllSources =
+            new TreeMap<String, long[]>();
+        BufferedWriter bw = new BufferedWriter(new FileWriter(rawFile));
+        try {
+          bw.append("source,date,start,completemillis\n");
+          /* Raw observations are sorted by key, so all observations of
+           * one source and date are adjacent; aggregate each such run
+           * when the next run starts or the input ends. */
+          String tempSourceDate = null;
+          Iterator<Map.Entry<String, String>> it =
+              rawObs.entrySet().iterator();
+          List<Long> dlTimes = new ArrayList<Long>();
+          boolean haveWrittenFinalLine = false;
+          long failures = 0, timeouts = 0, requests = 0;
+          while (it.hasNext() || !haveWrittenFinalLine) {
+            Map.Entry<String, String> next = it.hasNext() ? it.next() : null;
+            String nextSourceDate = next == null ? null
+                : sourceDateOf(next.getValue());
+            if (tempSourceDate != null
+                && !tempSourceDate.equals(nextSourceDate)) {
+              /* Only report stats for runs with more than four
+               * successful downloads. */
+              if (dlTimes.size() > 4) {
+                stats.put(tempSourceDate, tempSourceDate + ","
+                    + formatStats(dlTimes, timeouts, failures, requests));
+                String allSourceDate = allSourcesKey(tempSourceDate);
+                List<Long> allDlTimes = dlTimesAllSources.get(allSourceDate);
+                if (allDlTimes == null) {
+                  allDlTimes = new ArrayList<Long>();
+                  dlTimesAllSources.put(allSourceDate, allDlTimes);
+                }
+                allDlTimes.addAll(dlTimes);
+                long[] status = statusesAllSources.get(allSourceDate);
+                if (status == null) {
+                  status = new long[3];
+                  statusesAllSources.put(allSourceDate, status);
+                }
+                status[0] += timeouts;
+                status[1] += failures;
+                status[2] += requests;
+              }
+              dlTimes = new ArrayList<Long>();
+              failures = timeouts = requests = 0;
+              if (next == null) {
+                haveWrittenFinalLine = true;
+              }
+            }
+            if (next != null) {
+              bw.append(next.getValue() + "\n");
+              String[] parts = next.getValue().split(",");
+              tempSourceDate = nextSourceDate;
+              long completeMillis = Long.parseLong(parts[3]);
+              if (completeMillis == -2L) {
+                timeouts++;
+              } else if (completeMillis == -1L) {
+                failures++;
+              } else {
+                dlTimes.add(completeMillis);
+              }
+              requests++;
+            }
+          }
+        } finally {
+          bw.close();
+        }
+        for (Map.Entry<String, List<Long>> e :
+            dlTimesAllSources.entrySet()) {
+          String allSourceDate = e.getKey();
+          long[] status = statusesAllSources.get(allSourceDate);
+          stats.put(allSourceDate, allSourceDate + ","
+              + formatStats(e.getValue(), status[0], status[1], status[2]));
+        }
+        logger.fine("Finished writing file " + rawFile.getAbsolutePath()
+            + ".");
+      }
+      if (stats.size() > 0) {
+        logger.fine("Writing file " + statsFile.getAbsolutePath()
+            + "...");
+        statsFile.getParentFile().mkdirs();
+        BufferedWriter bw = new BufferedWriter(new FileWriter(statsFile));
+        try {
+          bw.append("source,date,q1,md,q3,timeouts,failures,requests\n");
+          for (String s : stats.values()) {
+            bw.append(s + "\n");
+          }
+        } finally {
+          bw.close();
+        }
+        logger.fine("Finished writing file " + statsFile.getAbsolutePath()
+            + ".");
+      }
+    } catch (IOException e) {
+      logger.log(Level.WARNING, "Failed writing "
+          + rawFile.getAbsolutePath() + " or "
+          + statsFile.getAbsolutePath() + "!", e);
+    }
+
+    /* Log the number of new observations and the latest observation
+     * per source. */
+    StringBuilder dumpStats = new StringBuilder("Finished writing "
+        + "statistics on torperf results.\nAdded " + addedRawObs
+        + " new observations in this execution.\n"
+        + "Last known observations by source and file size are:");
+    String lastSource = null;
+    String lastLine = null;
+    for (String s : rawObs.keySet()) {
+      String[] parts = s.split(",");
+      if (lastSource == null) {
+        lastSource = parts[0];
+      } else if (!parts[0].equals(lastSource)) {
+        String lastKnownObservation = lastLine.split(",")[1] + " "
+            + lastLine.split(",")[2];
+        dumpStats.append("\n" + lastSource + " " + lastKnownObservation);
+        lastSource = parts[0];
+      }
+      lastLine = s;
+    }
+    if (lastSource != null) {
+      String lastKnownObservation = lastLine.split(",")[1] + " "
+          + lastLine.split(",")[2];
+      dumpStats.append("\n" + lastSource + " " + lastKnownObservation);
+    }
+    logger.info(dumpStats.toString());
+
+    /* Write results to database. */
+    if (connectionURL != null) {
+      Connection conn = null;
+      try {
+        Map<String, String> insertRows = new HashMap<String, String>();
+        insertRows.putAll(stats);
+        Set<String> updateRows = new HashSet<String>();
+        conn = DriverManager.getConnection(connectionURL);
+        conn.setAutoCommit(false);
+        Statement statement = conn.createStatement();
+        ResultSet rs = statement.executeQuery(
+            "SELECT date, source, q1, md, q3, timeouts, failures, "
+            + "requests FROM torperf_stats");
+        /* Rows already in the database are updated only if any value
+         * changed; all remaining stats lines are inserted. */
+        while (rs.next()) {
+          String date = rs.getDate(1).toString();
+          String source = rs.getString(2);
+          String insertRow = insertRows.remove(source + "," + date);
+          if (insertRow == null) {
+            continue;
+          }
+          String[] newStats = insertRow.split(",");
+          /* newStats[2..7] correspond to result columns 3..8. */
+          for (int i = 2; i < 8; i++) {
+            if (Long.parseLong(newStats[i]) != rs.getLong(i + 1)) {
+              updateRows.add(insertRow);
+              break;
+            }
+          }
+        }
+        rs.close();
+        statement.close();
+        PreparedStatement psU = conn.prepareStatement(
+            "UPDATE torperf_stats SET q1 = ?, md = ?, q3 = ?, "
+            + "timeouts = ?, failures = ?, requests = ? "
+            + "WHERE date = ? AND source = ?");
+        for (String row : updateRows) {
+          bindStatsRow(psU, row);
+          psU.executeUpdate();
+        }
+        psU.close();
+        PreparedStatement psI = conn.prepareStatement(
+            "INSERT INTO torperf_stats (q1, md, q3, timeouts, failures, "
+            + "requests, date, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
+        for (String row : insertRows.values()) {
+          bindStatsRow(psI, row);
+          psI.executeUpdate();
+        }
+        psI.close();
+        conn.commit();
+      } catch (SQLException e) {
+        logger.log(Level.WARNING, "Failed to add torperf stats to "
+            + "database.", e);
+      } finally {
+        if (conn != null) {
+          try {
+            conn.close();
+          } catch (SQLException e) {
+            logger.log(Level.WARNING, "Could not close database "
+                + "connection.", e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Return whether a torperf-raw line consists of exactly four
+   * comma-separated fields, the last of which is a number.
+   */
+  private static boolean isValidRawLine(String line) {
+    String[] parts = line.split(",");
+    if (parts.length != 4) {
+      return false;
+    }
+    try {
+      Long.parseLong(parts[3]);
+      return true;
+    } catch (NumberFormatException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Map a Torperf download size in bytes to the suffix that is appended
+   * to the source name, or return null if results of this size are not
+   * processed.
+   */
+  private static String fileSizeSuffix(long fileSize) {
+    if (fileSize == 51200L) {
+      return "-50kb";
+    } else if (fileSize == 1048576L) {
+      return "-1mb";
+    } else if (fileSize == 5242880L) {
+      return "-5mb";
+    }
+    return null;
+  }
+
+  /** Return the "source,date" prefix of a torperf-raw line. */
+  private static String sourceDateOf(String rawLine) {
+    String[] parts = rawLine.split(",");
+    return parts[0] + "," + parts[1];
+  }
+
+  /**
+   * Turn a "source-filesize,date" key into the corresponding
+   * "all-filesize,date" key.  The file-size suffix is taken to start at
+   * the last dash before the comma, so that source names containing
+   * dashes are handled, too.
+   */
+  private static String allSourcesKey(String sourceDate) {
+    int comma = sourceDate.indexOf(',');
+    return "all" + sourceDate.substring(sourceDate.lastIndexOf('-', comma));
+  }
+
+  /**
+   * Sort the given successful download times in place and format them,
+   * together with the given counts, as
+   * "q1,md,q3,timeouts,failures,requests".  Requires at least four
+   * download times; callers only pass lists with more than four.
+   */
+  private static String formatStats(List<Long> dlTimes, long timeouts,
+      long failures, long requests) {
+    Collections.sort(dlTimes);
+    long q1 = dlTimes.get(dlTimes.size() / 4 - 1);
+    long md = dlTimes.get(dlTimes.size() / 2 - 1);
+    long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1);
+    return q1 + "," + md + "," + q3 + "," + timeouts + "," + failures
+        + "," + requests;
+  }
+
+  /**
+   * Bind a torperf-stats line (source,date,q1,md,q3,timeouts,failures,
+   * requests) to a statement whose eight parameters are q1, md, q3,
+   * timeouts, failures, requests, date, and source, in this order.
+   */
+  private static void bindStatsRow(PreparedStatement ps, String row)
+      throws SQLException {
+    String[] newStats = row.split(",");
+    ps.clearParameters();
+    for (int i = 0; i < 6; i++) {
+      ps.setLong(i + 1, Long.parseLong(newStats[i + 2]));
+    }
+    ps.setDate(7, java.sql.Date.valueOf(newStats[1]));
+    ps.setString(8, newStats[0]);
+  }
+}
+
diff --git a/run-web.sh b/run-web.sh
new file mode 100755
index 0000000..5ee88d8
--- /dev/null
+++ b/run-web.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+# Run all numbered steps in shared/bin/ in order (glob expansion is sorted).
+for i in shared/bin/[0-9]*; do "./$i"; done
+
diff --git a/run.sh b/run.sh
deleted file mode 100755
index f8ac867..0000000
--- a/run.sh
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/bin/sh
-
-# TODO is there a better way to suppress Ant's output?
-ant | grep "\[java\]"
-
diff --git a/shared/bin/01-rsync-descriptors.sh b/shared/bin/01-rsync-descriptors.sh
new file mode 100755
index 0000000..649675f
--- /dev/null
+++ b/shared/bin/01-rsync-descriptors.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+# Mirror recent descriptors, except relay votes, into shared/in/.
+rsync -arz --delete --exclude 'relay-descriptors/votes' \
+  metrics.torproject.org::metrics-recent shared/in
+
diff --git a/shared/bin/10-legacy.sh b/shared/bin/10-legacy.sh
new file mode 100755
index 0000000..cd9131f
--- /dev/null
+++ b/shared/bin/10-legacy.sh
@@ -0,0 +1,13 @@
+#!/bin/sh
+# Run the legacy Java module, call refresh_all() in the tordir database,
+# and export the stats_* views as CSV files to modules/legacy/stats/.
+cd modules/legacy/ || exit 1
+ant | grep "\[java\]"
+psql -U metrics tordir -c 'SELECT * FROM refresh_all();'
+mkdir -p stats
+psql -c 'COPY (SELECT * FROM stats_servers) TO STDOUT WITH CSV HEADER;' tordir > stats/servers.csv
+psql -c 'COPY (SELECT * FROM stats_bandwidth) TO STDOUT WITH CSV HEADER;' tordir > stats/bandwidth.csv
+psql -c 'COPY (SELECT * FROM stats_torperf) TO STDOUT WITH CSV HEADER;' tordir > stats/torperf.csv
+psql -c 'COPY (SELECT * FROM stats_connbidirect) TO STDOUT WITH CSV HEADER;' tordir > stats/connbidirect.csv
+cd ../../
+
diff --git a/shared/bin/99-copy-stats-files.sh b/shared/bin/99-copy-stats-files.sh
new file mode 100755
index 0000000..6dce205
--- /dev/null
+++ b/shared/bin/99-copy-stats-files.sh
@@ -0,0 +1,4 @@
+#!/bin/sh
+mkdir -p shared/stats
+cp -a modules/legacy/stats/*.csv shared/stats/
+
diff --git a/src/org/torproject/ernie/cron/Configuration.java b/src/org/torproject/ernie/cron/Configuration.java
deleted file mode 100644
index 878e882..0000000
--- a/src/org/torproject/ernie/cron/Configuration.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * Initialize configuration with hard-coded defaults, overwrite with
- * configuration in config file, if exists, and answer Main.java about our
- * configuration.
- */
-public class Configuration {
- private boolean importDirectoryArchives = false;
- private String directoryArchivesDirectory = "in/relay-descriptors/";
- private boolean keepDirectoryArchiveImportHistory = false;
- private boolean importSanitizedBridges = false;
- private String sanitizedBridgesDirectory = "in/bridge-descriptors/";
- private boolean keepSanitizedBridgesImportHistory = false;
- private boolean writeRelayDescriptorDatabase = false;
- private String relayDescriptorDatabaseJdbc =
- "jdbc:postgresql://localhost/tordir?user=metrics&password=password";
- private boolean writeRelayDescriptorsRawFiles = false;
- private String relayDescriptorRawFilesDirectory = "pg-import/";
- private boolean writeBridgeStats = false;
- private boolean importWriteTorperfStats = false;
- private String torperfDirectory = "in/torperf/";
- private String exoneraTorDatabaseJdbc = "jdbc:postgresql:"
- + "//localhost/exonerator?user=metrics&password=password";
- private String exoneraTorImportDirectory = "exonerator-import/";
-
- public Configuration() {
-
- /* Initialize logger. */
- Logger logger = Logger.getLogger(Configuration.class.getName());
-
- /* Read config file, if present. */
- File configFile = new File("config");
- if (!configFile.exists()) {
- logger.warning("Could not find config file.");
- return;
- }
- String line = null;
- try {
- BufferedReader br = new BufferedReader(new FileReader(configFile));
- while ((line = br.readLine()) != null) {
- if (line.startsWith("#") || line.length() < 1) {
- continue;
- } else if (line.startsWith("ImportDirectoryArchives")) {
- this.importDirectoryArchives = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("DirectoryArchivesDirectory")) {
- this.directoryArchivesDirectory = line.split(" ")[1];
- } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) {
- this.keepDirectoryArchiveImportHistory = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("ImportSanitizedBridges")) {
- this.importSanitizedBridges = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("SanitizedBridgesDirectory")) {
- this.sanitizedBridgesDirectory = line.split(" ")[1];
- } else if (line.startsWith("KeepSanitizedBridgesImportHistory")) {
- this.keepSanitizedBridgesImportHistory = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("WriteRelayDescriptorDatabase")) {
- this.writeRelayDescriptorDatabase = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
- this.relayDescriptorDatabaseJdbc = line.split(" ")[1];
- } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) {
- this.writeRelayDescriptorsRawFiles = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) {
- this.relayDescriptorRawFilesDirectory = line.split(" ")[1];
- } else if (line.startsWith("WriteBridgeStats")) {
- this.writeBridgeStats = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("ImportWriteTorperfStats")) {
- this.importWriteTorperfStats = Integer.parseInt(
- line.split(" ")[1]) != 0;
- } else if (line.startsWith("TorperfDirectory")) {
- this.torperfDirectory = line.split(" ")[1];
- } else if (line.startsWith("ExoneraTorDatabaseJdbc")) {
- this.exoneraTorDatabaseJdbc = line.split(" ")[1];
- } else if (line.startsWith("ExoneraTorImportDirectory")) {
- this.exoneraTorImportDirectory = line.split(" ")[1];
- } else {
- logger.severe("Configuration file contains unrecognized "
- + "configuration key in line '" + line + "'! Exiting!");
- System.exit(1);
- }
- }
- br.close();
- } catch (ArrayIndexOutOfBoundsException e) {
- logger.severe("Configuration file contains configuration key "
- + "without value in line '" + line + "'. Exiting!");
- System.exit(1);
- } catch (MalformedURLException e) {
- logger.severe("Configuration file contains illegal URL or IP:port "
- + "pair in line '" + line + "'. Exiting!");
- System.exit(1);
- } catch (NumberFormatException e) {
- logger.severe("Configuration file contains illegal value in line '"
- + line + "' with legal values being 0 or 1. Exiting!");
- System.exit(1);
- } catch (IOException e) {
- logger.log(Level.SEVERE, "Unknown problem while reading config "
- + "file! Exiting!", e);
- System.exit(1);
- }
- }
- public boolean getImportDirectoryArchives() {
- return this.importDirectoryArchives;
- }
- public String getDirectoryArchivesDirectory() {
- return this.directoryArchivesDirectory;
- }
- public boolean getKeepDirectoryArchiveImportHistory() {
- return this.keepDirectoryArchiveImportHistory;
- }
- public boolean getWriteRelayDescriptorDatabase() {
- return this.writeRelayDescriptorDatabase;
- }
- public boolean getImportSanitizedBridges() {
- return this.importSanitizedBridges;
- }
- public String getSanitizedBridgesDirectory() {
- return this.sanitizedBridgesDirectory;
- }
- public boolean getKeepSanitizedBridgesImportHistory() {
- return this.keepSanitizedBridgesImportHistory;
- }
- public String getRelayDescriptorDatabaseJDBC() {
- return this.relayDescriptorDatabaseJdbc;
- }
- public boolean getWriteRelayDescriptorsRawFiles() {
- return this.writeRelayDescriptorsRawFiles;
- }
- public String getRelayDescriptorRawFilesDirectory() {
- return this.relayDescriptorRawFilesDirectory;
- }
- public boolean getWriteBridgeStats() {
- return this.writeBridgeStats;
- }
- public boolean getImportWriteTorperfStats() {
- return this.importWriteTorperfStats;
- }
- public String getTorperfDirectory() {
- return this.torperfDirectory;
- }
- public String getExoneraTorDatabaseJdbc() {
- return this.exoneraTorDatabaseJdbc;
- }
- public String getExoneraTorImportDirectory() {
- return this.exoneraTorImportDirectory;
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/LockFile.java b/src/org/torproject/ernie/cron/LockFile.java
deleted file mode 100644
index 4de8da0..0000000
--- a/src/org/torproject/ernie/cron/LockFile.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.logging.Logger;
-
-public class LockFile {
-
- private File lockFile;
- private Logger logger;
-
- public LockFile() {
- this.lockFile = new File("lock");
- this.logger = Logger.getLogger(LockFile.class.getName());
- }
-
- public boolean acquireLock() {
- this.logger.fine("Trying to acquire lock...");
- try {
- if (this.lockFile.exists()) {
- BufferedReader br = new BufferedReader(new FileReader("lock"));
- long runStarted = Long.parseLong(br.readLine());
- br.close();
- if (System.currentTimeMillis() - runStarted < 55L * 60L * 1000L) {
- return false;
- }
- }
- BufferedWriter bw = new BufferedWriter(new FileWriter("lock"));
- bw.append("" + System.currentTimeMillis() + "\n");
- bw.close();
- this.logger.fine("Acquired lock.");
- return true;
- } catch (IOException e) {
- this.logger.warning("Caught exception while trying to acquire "
- + "lock!");
- return false;
- }
- }
-
- public void releaseLock() {
- this.logger.fine("Releasing lock...");
- this.lockFile.delete();
- this.logger.fine("Released lock.");
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/LoggingConfiguration.java b/src/org/torproject/ernie/cron/LoggingConfiguration.java
deleted file mode 100644
index c261d95..0000000
--- a/src/org/torproject/ernie/cron/LoggingConfiguration.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.FileHandler;
-import java.util.logging.Formatter;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-import java.util.logging.Logger;
-
-/**
- * Initialize logging configuration.
- *
- * Log levels used by ERNIE:
- *
- * - SEVERE: An event made it impossible to continue program execution.
- * - WARNING: A potential problem occurred that requires the operator to
- * look after the otherwise unattended setup
- * - INFO: Messages on INFO level are meant to help the operator in making
- * sure that operation works as expected.
- * - FINE: Debug messages that are used to identify problems and which are
- * turned on by default.
- * - FINER: More detailed debug messages to investigate problems in more
- * detail. Not turned on by default. Increase log file limit when using
- * FINER.
- * - FINEST: Most detailed debug messages. Not used.
- */
-public class LoggingConfiguration {
-
- public LoggingConfiguration() {
-
- /* Remove default console handler. */
- for (Handler h : Logger.getLogger("").getHandlers()) {
- Logger.getLogger("").removeHandler(h);
- }
-
- /* Disable logging of internal Sun classes. */
- Logger.getLogger("sun").setLevel(Level.OFF);
-
- /* Set minimum log level we care about from INFO to FINER. */
- Logger.getLogger("").setLevel(Level.FINER);
-
- /* Create log handler that writes messages on WARNING or higher to the
- * console. */
- final SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- Formatter cf = new Formatter() {
- public String format(LogRecord record) {
- return dateTimeFormat.format(new Date(record.getMillis())) + " "
- + record.getMessage() + "\n";
- }
- };
- Handler ch = new ConsoleHandler();
- ch.setFormatter(cf);
- ch.setLevel(Level.WARNING);
- Logger.getLogger("").addHandler(ch);
-
- /* Initialize own logger for this class. */
- Logger logger = Logger.getLogger(
- LoggingConfiguration.class.getName());
-
- /* Create log handler that writes all messages on FINE or higher to a
- * local file. */
- Formatter ff = new Formatter() {
- public String format(LogRecord record) {
- return dateTimeFormat.format(new Date(record.getMillis())) + " "
- + record.getLevel() + " " + record.getSourceClassName() + " "
- + record.getSourceMethodName() + " " + record.getMessage()
- + (record.getThrown() != null ? " " + record.getThrown() : "")
- + "\n";
- }
- };
- try {
- FileHandler fh = new FileHandler("log", 5000000, 5, true);
- fh.setFormatter(ff);
- fh.setLevel(Level.FINE);
- Logger.getLogger("").addHandler(fh);
- } catch (SecurityException e) {
- logger.log(Level.WARNING, "No permission to create log file. "
- + "Logging to file is disabled.", e);
- } catch (IOException e) {
- logger.log(Level.WARNING, "Could not write to log file. Logging to "
- + "file is disabled.", e);
- }
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java
deleted file mode 100644
index 5d561a6..0000000
--- a/src/org/torproject/ernie/cron/Main.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.File;
-import java.util.logging.Logger;
-
-import org.torproject.ernie.cron.network.ConsensusStatsFileHandler;
-import org.torproject.ernie.cron.performance.PerformanceStatsImporter;
-import org.torproject.ernie.cron.performance.TorperfProcessor;
-
-/**
- * Coordinate downloading and parsing of descriptors and extraction of
- * statistically relevant data for later processing with R.
- */
-public class Main {
- public static void main(String[] args) {
-
- /* Initialize logging configuration. */
- new LoggingConfiguration();
-
- Logger logger = Logger.getLogger(Main.class.getName());
- logger.info("Starting ERNIE.");
-
- // Initialize configuration
- Configuration config = new Configuration();
-
- // Use lock file to avoid overlapping runs
- LockFile lf = new LockFile();
- if (!lf.acquireLock()) {
- logger.severe("Warning: ERNIE is already running or has not exited "
- + "cleanly! Exiting!");
- System.exit(1);
- }
-
- // Define stats directory for temporary files
- File statsDirectory = new File("stats");
-
- // Import relay descriptors
- if (config.getImportDirectoryArchives()) {
- RelayDescriptorDatabaseImporter rddi =
- config.getWriteRelayDescriptorDatabase() ||
- config.getWriteRelayDescriptorsRawFiles() ?
- new RelayDescriptorDatabaseImporter(
- config.getWriteRelayDescriptorDatabase() ?
- config.getRelayDescriptorDatabaseJDBC() : null,
- config.getWriteRelayDescriptorsRawFiles() ?
- config.getRelayDescriptorRawFilesDirectory() : null,
- new File(config.getDirectoryArchivesDirectory()),
- statsDirectory,
- config.getKeepDirectoryArchiveImportHistory()) : null;
- if (rddi != null) {
- rddi.importRelayDescriptors();
- }
- rddi.closeConnection();
-
- // Import conn-bi-direct statistics.
- PerformanceStatsImporter psi = new PerformanceStatsImporter(
- config.getWriteRelayDescriptorDatabase() ?
- config.getRelayDescriptorDatabaseJDBC() : null,
- config.getWriteRelayDescriptorsRawFiles() ?
- config.getRelayDescriptorRawFilesDirectory() : null,
- new File(config.getDirectoryArchivesDirectory()),
- statsDirectory,
- config.getKeepDirectoryArchiveImportHistory());
- psi.importRelayDescriptors();
- psi.closeConnection();
- }
-
- // Prepare consensus stats file handler (used for stats on running
- // bridges only)
- ConsensusStatsFileHandler csfh = config.getWriteBridgeStats() ?
- new ConsensusStatsFileHandler(
- config.getRelayDescriptorDatabaseJDBC(),
- new File(config.getSanitizedBridgesDirectory()),
- statsDirectory, config.getKeepSanitizedBridgesImportHistory()) :
- null;
-
- // Import sanitized bridges and write updated stats files to disk
- if (csfh != null) {
- if (config.getImportSanitizedBridges()) {
- csfh.importSanitizedBridges();
- }
- csfh.writeFiles();
- csfh = null;
- }
-
- // Import and process torperf stats
- if (config.getImportWriteTorperfStats()) {
- new TorperfProcessor(new File(config.getTorperfDirectory()),
- statsDirectory, config.getRelayDescriptorDatabaseJDBC());
- }
-
- // Remove lock file
- lf.releaseLock();
-
- logger.info("Terminating ERNIE.");
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
deleted file mode 100644
index a51092e..0000000
--- a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
+++ /dev/null
@@ -1,1077 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.sql.CallableStatement;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TimeZone;
-import java.util.TreeSet;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.postgresql.util.PGbytea;
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorFile;
-import org.torproject.descriptor.DescriptorReader;
-import org.torproject.descriptor.DescriptorSourceFactory;
-import org.torproject.descriptor.ExtraInfoDescriptor;
-import org.torproject.descriptor.NetworkStatusEntry;
-import org.torproject.descriptor.RelayNetworkStatusConsensus;
-import org.torproject.descriptor.ServerDescriptor;
-
-/**
- * Parse directory data.
- */
-
-/* TODO Split up this class and move its parts to cron.network,
- * cron.users, and status.relaysearch packages. Requires extensive
- * changes to the database schema though. */
-public final class RelayDescriptorDatabaseImporter {
-
- /**
- * How many records to commit with each database transaction.
- */
- private final long autoCommitCount = 500;
-
- /**
- * Keep track of the number of records committed before each transaction
- */
- private int rdsCount = 0;
- private int resCount = 0;
- private int rhsCount = 0;
- private int rrsCount = 0;
- private int rcsCount = 0;
- private int rvsCount = 0;
- private int rqsCount = 0;
-
- /**
- * Relay descriptor database connection.
- */
- private Connection conn;
-
- /**
- * Prepared statement to check whether any network status consensus
- * entries matching a given valid-after time have been imported into the
- * database before.
- */
- private PreparedStatement psSs;
-
- /**
- * Prepared statement to check whether a given network status consensus
- * entry has been imported into the database before.
- */
- private PreparedStatement psRs;
-
- /**
- * Prepared statement to check whether a given server descriptor has
- * been imported into the database before.
- */
- private PreparedStatement psDs;
-
- /**
- * Prepared statement to check whether a given network status consensus
- * has been imported into the database before.
- */
- private PreparedStatement psCs;
-
- /**
- * Prepared statement to check whether a given dirreq stats string has
- * been imported into the database before.
- */
- private PreparedStatement psQs;
-
- /**
- * Set of dates that have been inserted into the database for being
- * included in the next refresh run.
- */
- private Set<Long> scheduledUpdates;
-
- /**
- * Prepared statement to insert a date into the database that shall be
- * included in the next refresh run.
- */
- private PreparedStatement psU;
-
- /**
- * Prepared statement to insert a network status consensus entry into
- * the database.
- */
- private PreparedStatement psR;
-
- /**
- * Prepared statement to insert a server descriptor into the database.
- */
- private PreparedStatement psD;
-
- /**
- * Callable statement to insert the bandwidth history of an extra-info
- * descriptor into the database.
- */
- private CallableStatement csH;
-
- /**
- * Prepared statement to insert a network status consensus into the
- * database.
- */
- private PreparedStatement psC;
-
- /**
- * Prepared statement to insert a given dirreq stats string into the
- * database.
- */
- private PreparedStatement psQ;
-
- /**
- * Logger for this class.
- */
- private Logger logger;
-
- /**
- * Directory for writing raw import files.
- */
- private String rawFilesDirectory;
-
- /**
- * Raw import file containing status entries.
- */
- private BufferedWriter statusentryOut;
-
- /**
- * Raw import file containing server descriptors.
- */
- private BufferedWriter descriptorOut;
-
- /**
- * Raw import file containing bandwidth histories.
- */
- private BufferedWriter bwhistOut;
-
- /**
- * Raw import file containing consensuses.
- */
- private BufferedWriter consensusOut;
-
- /**
- * Raw import file containing dirreq stats.
- */
- private BufferedWriter dirReqOut;
-
- /**
- * Date format to parse timestamps.
- */
- private SimpleDateFormat dateTimeFormat;
-
- /**
- * The last valid-after time for which we checked whether they have been
- * any network status entries in the database.
- */
- private long lastCheckedStatusEntries;
-
- /**
- * Set of fingerprints that we imported for the valid-after time in
- * <code>lastCheckedStatusEntries</code>.
- */
- private Set<String> insertedStatusEntries;
-
- /**
- * Flag that tells us whether we need to check whether a network status
- * entry is already contained in the database or not.
- */
- private boolean separateStatusEntryCheckNecessary;
-
- private boolean importIntoDatabase;
- private boolean writeRawImportFiles;
-
- private File archivesDirectory;
- private File statsDirectory;
- private boolean keepImportHistory;
-
- /**
- * Initialize database importer by connecting to the database and
- * preparing statements.
- */
- public RelayDescriptorDatabaseImporter(String connectionURL,
- String rawFilesDirectory, File archivesDirectory,
- File statsDirectory, boolean keepImportHistory) {
-
- if (archivesDirectory == null ||
- statsDirectory == null) {
- throw new IllegalArgumentException();
- }
- this.archivesDirectory = archivesDirectory;
- this.statsDirectory = statsDirectory;
- this.keepImportHistory = keepImportHistory;
-
- /* Initialize logger. */
- this.logger = Logger.getLogger(
- RelayDescriptorDatabaseImporter.class.getName());
-
- if (connectionURL != null) {
- try {
- /* Connect to database. */
- this.conn = DriverManager.getConnection(connectionURL);
-
- /* Turn autocommit off */
- this.conn.setAutoCommit(false);
-
- /* Prepare statements. */
- this.psSs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM statusentry WHERE validafter = ?");
- this.psRs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM statusentry WHERE validafter = ? AND "
- + "fingerprint = ?");
- this.psDs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM descriptor WHERE descriptor = ?");
- this.psCs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM consensus WHERE validafter = ?");
- this.psQs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM dirreq_stats WHERE source = ? AND statsend = ?");
- this.psR = conn.prepareStatement("INSERT INTO statusentry "
- + "(validafter, nickname, fingerprint, descriptor, "
- + "published, address, orport, dirport, isauthority, "
- + "isbadexit, isbaddirectory, isexit, isfast, isguard, "
- + "ishsdir, isnamed, isstable, isrunning, isunnamed, "
- + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
- + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
- + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
- this.psD = conn.prepareStatement("INSERT INTO descriptor "
- + "(descriptor, nickname, address, orport, dirport, "
- + "fingerprint, bandwidthavg, bandwidthburst, "
- + "bandwidthobserved, platform, published, uptime, "
- + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
- + "?)");
- this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
- + "?)}");
- this.psC = conn.prepareStatement("INSERT INTO consensus "
- + "(validafter) VALUES (?)");
- this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats "
- + "(source, statsend, seconds, country, requests) VALUES "
- + "(?, ?, ?, ?, ?)");
- this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
- + "(date) VALUES (?)");
- this.scheduledUpdates = new HashSet<Long>();
- this.importIntoDatabase = true;
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not connect to database or "
- + "prepare statements.", e);
- }
-
- /* Initialize set of fingerprints to remember which status entries
- * we already imported. */
- this.insertedStatusEntries = new HashSet<String>();
- }
-
- /* Remember where we want to write raw import files. */
- if (rawFilesDirectory != null) {
- this.rawFilesDirectory = rawFilesDirectory;
- this.writeRawImportFiles = true;
- }
-
- /* Initialize date format, so that we can format timestamps. */
- this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
-
- private void addDateToScheduledUpdates(long timestamp)
- throws SQLException {
- if (!this.importIntoDatabase) {
- return;
- }
- long dateMillis = 0L;
- try {
- dateMillis = this.dateTimeFormat.parse(
- this.dateTimeFormat.format(timestamp).substring(0, 10)
- + " 00:00:00").getTime();
- } catch (ParseException e) {
- this.logger.log(Level.WARNING, "Internal parsing error.", e);
- return;
- }
- if (!this.scheduledUpdates.contains(dateMillis)) {
- this.psU.setDate(1, new java.sql.Date(dateMillis));
- this.psU.execute();
- this.scheduledUpdates.add(dateMillis);
- }
- }
-
- /**
- * Insert network status consensus entry into database.
- */
- public void addStatusEntry(long validAfter, String nickname,
- String fingerprint, String descriptor, long published,
- String address, long orPort, long dirPort,
- SortedSet<String> flags, String version, long bandwidth,
- String ports, byte[] rawDescriptor) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(validAfter);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- if (lastCheckedStatusEntries != validAfter) {
- this.psSs.setTimestamp(1, validAfterTimestamp, cal);
- ResultSet rs = psSs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- separateStatusEntryCheckNecessary = false;
- insertedStatusEntries.clear();
- } else {
- separateStatusEntryCheckNecessary = true;
- }
- rs.close();
- lastCheckedStatusEntries = validAfter;
- }
- boolean alreadyContained = false;
- if (separateStatusEntryCheckNecessary ||
- insertedStatusEntries.contains(fingerprint)) {
- this.psRs.setTimestamp(1, validAfterTimestamp, cal);
- this.psRs.setString(2, fingerprint);
- ResultSet rs = psRs.executeQuery();
- rs.next();
- if (rs.getInt(1) > 0) {
- alreadyContained = true;
- }
- rs.close();
- } else {
- insertedStatusEntries.add(fingerprint);
- }
- if (!alreadyContained) {
- this.psR.clearParameters();
- this.psR.setTimestamp(1, validAfterTimestamp, cal);
- this.psR.setString(2, nickname);
- this.psR.setString(3, fingerprint);
- this.psR.setString(4, descriptor);
- this.psR.setTimestamp(5, new Timestamp(published), cal);
- this.psR.setString(6, address);
- this.psR.setLong(7, orPort);
- this.psR.setLong(8, dirPort);
- this.psR.setBoolean(9, flags.contains("Authority"));
- this.psR.setBoolean(10, flags.contains("BadExit"));
- this.psR.setBoolean(11, flags.contains("BadDirectory"));
- this.psR.setBoolean(12, flags.contains("Exit"));
- this.psR.setBoolean(13, flags.contains("Fast"));
- this.psR.setBoolean(14, flags.contains("Guard"));
- this.psR.setBoolean(15, flags.contains("HSDir"));
- this.psR.setBoolean(16, flags.contains("Named"));
- this.psR.setBoolean(17, flags.contains("Stable"));
- this.psR.setBoolean(18, flags.contains("Running"));
- this.psR.setBoolean(19, flags.contains("Unnamed"));
- this.psR.setBoolean(20, flags.contains("Valid"));
- this.psR.setBoolean(21, flags.contains("V2Dir"));
- this.psR.setBoolean(22, flags.contains("V3Dir"));
- this.psR.setString(23, version);
- this.psR.setLong(24, bandwidth);
- this.psR.setString(25, ports);
- this.psR.setBytes(26, rawDescriptor);
- this.psR.executeUpdate();
- rrsCount++;
- if (rrsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add network status "
- + "consensus entry. We won't make any further SQL requests "
- + "in this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.statusentryOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.statusentryOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/statusentry.sql"));
- this.statusentryOut.write(" COPY statusentry (validafter, "
- + "nickname, fingerprint, descriptor, published, address, "
- + "orport, dirport, isauthority, isbadExit, "
- + "isbaddirectory, isexit, isfast, isguard, ishsdir, "
- + "isnamed, isstable, isrunning, isunnamed, isvalid, "
- + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) "
- + "FROM stdin;\n");
- }
- this.statusentryOut.write(
- this.dateTimeFormat.format(validAfter) + "\t" + nickname
- + "\t" + fingerprint.toLowerCase() + "\t"
- + descriptor.toLowerCase() + "\t"
- + this.dateTimeFormat.format(published) + "\t" + address
- + "\t" + orPort + "\t" + dirPort + "\t"
- + (flags.contains("Authority") ? "t" : "f") + "\t"
- + (flags.contains("BadExit") ? "t" : "f") + "\t"
- + (flags.contains("BadDirectory") ? "t" : "f") + "\t"
- + (flags.contains("Exit") ? "t" : "f") + "\t"
- + (flags.contains("Fast") ? "t" : "f") + "\t"
- + (flags.contains("Guard") ? "t" : "f") + "\t"
- + (flags.contains("HSDir") ? "t" : "f") + "\t"
- + (flags.contains("Named") ? "t" : "f") + "\t"
- + (flags.contains("Stable") ? "t" : "f") + "\t"
- + (flags.contains("Running") ? "t" : "f") + "\t"
- + (flags.contains("Unnamed") ? "t" : "f") + "\t"
- + (flags.contains("Valid") ? "t" : "f") + "\t"
- + (flags.contains("V2Dir") ? "t" : "f") + "\t"
- + (flags.contains("V3Dir") ? "t" : "f") + "\t"
- + (version != null ? version : "\\N") + "\t"
- + (bandwidth >= 0 ? bandwidth : "\\N") + "\t"
- + (ports != null ? ports : "\\N") + "\t");
- this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\", "\\\\\\\\") + "\n");
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus entry to raw database import file. We won't "
- + "make any further attempts to write raw import files in "
- + "this execution.", e);
- this.writeRawImportFiles = false;
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus entry to raw database import file. We won't "
- + "make any further attempts to write raw import files in "
- + "this execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Insert server descriptor into database.
- */
- public void addServerDescriptor(String descriptor, String nickname,
- String address, int orPort, int dirPort, String relayIdentifier,
- long bandwidthAvg, long bandwidthBurst, long bandwidthObserved,
- String platform, long published, long uptime,
- String extraInfoDigest) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(published);
- this.addDateToScheduledUpdates(
- published + 24L * 60L * 60L * 1000L);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- this.psDs.setString(1, descriptor);
- ResultSet rs = psDs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psD.clearParameters();
- this.psD.setString(1, descriptor);
- this.psD.setString(2, nickname);
- this.psD.setString(3, address);
- this.psD.setInt(4, orPort);
- this.psD.setInt(5, dirPort);
- this.psD.setString(6, relayIdentifier);
- this.psD.setLong(7, bandwidthAvg);
- this.psD.setLong(8, bandwidthBurst);
- this.psD.setLong(9, bandwidthObserved);
- /* Remove all non-ASCII characters from the platform string, or
- * we'll make Postgres unhappy. Sun's JDK and OpenJDK behave
- * differently when creating a new String with a given encoding.
- * That's what the regexp below is for. */
- this.psD.setString(10, new String(platform.getBytes(),
- "US-ASCII").replaceAll("[^\\p{ASCII}]",""));
- this.psD.setTimestamp(11, new Timestamp(published), cal);
- this.psD.setLong(12, uptime);
- this.psD.setString(13, extraInfoDigest);
- this.psD.executeUpdate();
- rdsCount++;
- if (rdsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (UnsupportedEncodingException e) {
- // US-ASCII is supported for sure
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add server "
- + "descriptor. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.descriptorOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.descriptorOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/descriptor.sql"));
- this.descriptorOut.write(" COPY descriptor (descriptor, "
- + "nickname, address, orport, dirport, fingerprint, "
- + "bandwidthavg, bandwidthburst, bandwidthobserved, "
- + "platform, published, uptime, extrainfo) FROM stdin;\n");
- }
- this.descriptorOut.write(descriptor.toLowerCase() + "\t"
- + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
- + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
- + bandwidthBurst + "\t" + bandwidthObserved + "\t"
- + (platform != null && platform.length() > 0
- ? new String(platform.getBytes(), "US-ASCII") : "\\N")
- + "\t" + this.dateTimeFormat.format(published) + "\t"
- + (uptime >= 0 ? uptime : "\\N") + "\t"
- + (extraInfoDigest != null ? extraInfoDigest : "\\N")
- + "\n");
- } catch (UnsupportedEncodingException e) {
- // US-ASCII is supported for sure
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write server "
- + "descriptor to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Insert extra-info descriptor into database.
- */
- public void addExtraInfoDescriptor(String extraInfoDigest,
- String nickname, String fingerprint, long published,
- List<String> bandwidthHistoryLines) {
- if (!bandwidthHistoryLines.isEmpty()) {
- this.addBandwidthHistory(fingerprint.toLowerCase(), published,
- bandwidthHistoryLines);
- }
- }
-
- private static class BigIntArray implements java.sql.Array {
-
- private final String stringValue;
-
- public BigIntArray(long[] array, int offset) {
- if (array == null) {
- this.stringValue = "[-1:-1]={0}";
- } else {
- StringBuilder sb = new StringBuilder("[" + offset + ":"
- + (offset + array.length - 1) + "]={");
- for (int i = 0; i < array.length; i++) {
- sb.append((i > 0 ? "," : "") + array[i]);
- }
- sb.append('}');
- this.stringValue = sb.toString();
- }
- }
-
- public String toString() {
- return stringValue;
- }
-
- public String getBaseTypeName() {
- return "int8";
- }
-
- /* The other methods are never called; no need to implement them. */
- public void free() {
- throw new UnsupportedOperationException();
- }
- public Object getArray() {
- throw new UnsupportedOperationException();
- }
- public Object getArray(long index, int count) {
- throw new UnsupportedOperationException();
- }
- public Object getArray(long index, int count,
- Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- public Object getArray(Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- public int getBaseType() {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet() {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet(long index, int count) {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet(long index, int count,
- Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- public ResultSet getResultSet(Map<String, Class<?>> map) {
- throw new UnsupportedOperationException();
- }
- }
-
- public void addBandwidthHistory(String fingerprint, long published,
- List<String> bandwidthHistoryStrings) {
-
- /* Split history lines by date and rewrite them so that the date
- * comes first. */
- SortedSet<String> historyLinesByDate = new TreeSet<String>();
- for (String bandwidthHistoryString : bandwidthHistoryStrings) {
- String[] parts = bandwidthHistoryString.split(" ");
- if (parts.length != 6) {
- this.logger.finer("Bandwidth history line does not have expected "
- + "number of elements. Ignoring this line.");
- continue;
- }
- long intervalLength = 0L;
- try {
- intervalLength = Long.parseLong(parts[3].substring(1));
- } catch (NumberFormatException e) {
- this.logger.fine("Bandwidth history line does not have valid "
- + "interval length '" + parts[3] + " " + parts[4] + "'. "
- + "Ignoring this line.");
- continue;
- }
- if (intervalLength != 900L) {
- this.logger.fine("Bandwidth history line does not consist of "
- + "15-minute intervals. Ignoring this line.");
- continue;
- }
- String type = parts[0];
- String intervalEndTime = parts[1] + " " + parts[2];
- long intervalEnd, dateStart;
- try {
- intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime();
- dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00").
- getTime();
- } catch (ParseException e) {
- this.logger.fine("Parse exception while parsing timestamp in "
- + "bandwidth history line. Ignoring this line.");
- continue;
- }
- if (Math.abs(published - intervalEnd) >
- 7L * 24L * 60L * 60L * 1000L) {
- this.logger.fine("Extra-info descriptor publication time "
- + dateTimeFormat.format(published) + " and last interval "
- + "time " + intervalEndTime + " in " + type + " line differ "
- + "by more than 7 days! Not adding this line!");
- continue;
- }
- long currentIntervalEnd = intervalEnd;
- StringBuilder sb = new StringBuilder();
- String[] values = parts[5].split(",");
- SortedSet<String> newHistoryLines = new TreeSet<String>();
- try {
- for (int i = values.length - 1; i >= -1; i--) {
- if (i == -1 || currentIntervalEnd < dateStart) {
- sb.insert(0, intervalEndTime + " " + type + " ("
- + intervalLength + " s) ");
- sb.setLength(sb.length() - 1);
- String historyLine = sb.toString();
- newHistoryLines.add(historyLine);
- sb = new StringBuilder();
- dateStart -= 24L * 60L * 60L * 1000L;
- intervalEndTime = dateTimeFormat.format(currentIntervalEnd);
- }
- if (i == -1) {
- break;
- }
- Long.parseLong(values[i]);
- sb.insert(0, values[i] + ",");
- currentIntervalEnd -= intervalLength * 1000L;
- }
- } catch (NumberFormatException e) {
- this.logger.fine("Number format exception while parsing "
- + "bandwidth history line. Ignoring this line.");
- continue;
- }
- historyLinesByDate.addAll(newHistoryLines);
- }
-
- /* Add split history lines to database. */
- String lastDate = null;
- historyLinesByDate.add("EOL");
- long[] readArray = null, writtenArray = null, dirreadArray = null,
- dirwrittenArray = null;
- int readOffset = 0, writtenOffset = 0, dirreadOffset = 0,
- dirwrittenOffset = 0;
- for (String historyLine : historyLinesByDate) {
- String[] parts = historyLine.split(" ");
- String currentDate = parts[0];
- if (lastDate != null && (historyLine.equals("EOL") ||
- !currentDate.equals(lastDate))) {
- BigIntArray readIntArray = new BigIntArray(readArray,
- readOffset);
- BigIntArray writtenIntArray = new BigIntArray(writtenArray,
- writtenOffset);
- BigIntArray dirreadIntArray = new BigIntArray(dirreadArray,
- dirreadOffset);
- BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray,
- dirwrittenOffset);
- if (this.importIntoDatabase) {
- try {
- long dateMillis = dateTimeFormat.parse(lastDate
- + " 00:00:00").getTime();
- this.addDateToScheduledUpdates(dateMillis);
- this.csH.setString(1, fingerprint);
- this.csH.setDate(2, new java.sql.Date(dateMillis));
- this.csH.setArray(3, readIntArray);
- this.csH.setArray(4, writtenIntArray);
- this.csH.setArray(5, dirreadIntArray);
- this.csH.setArray(6, dirwrittenIntArray);
- this.csH.addBatch();
- rhsCount++;
- if (rhsCount % autoCommitCount == 0) {
- this.csH.executeBatch();
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not insert bandwidth "
- + "history line into database. We won't make any "
- + "further SQL requests in this execution.", e);
- this.importIntoDatabase = false;
- } catch (ParseException e) {
- this.logger.log(Level.WARNING, "Could not insert bandwidth "
- + "history line into database. We won't make any "
- + "further SQL requests in this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.bwhistOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.bwhistOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/bwhist.sql"));
- }
- this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint
- + "','" + lastDate + "','" + readIntArray.toString()
- + "','" + writtenIntArray.toString() + "','"
- + dirreadIntArray.toString() + "','"
- + dirwrittenIntArray.toString() + "');\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write bandwidth "
- + "history to raw database import file. We won't make "
- + "any further attempts to write raw import files in "
- + "this execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- readArray = writtenArray = dirreadArray = dirwrittenArray = null;
- }
- if (historyLine.equals("EOL")) {
- break;
- }
- long lastIntervalTime;
- try {
- lastIntervalTime = dateTimeFormat.parse(parts[0] + " "
- + parts[1]).getTime() - dateTimeFormat.parse(parts[0]
- + " 00:00:00").getTime();
- } catch (ParseException e) {
- continue;
- }
- String[] stringValues = parts[5].split(",");
- long[] longValues = new long[stringValues.length];
- for (int i = 0; i < longValues.length; i++) {
- longValues[i] = Long.parseLong(stringValues[i]);
- }
-
- int offset = (int) (lastIntervalTime / (15L * 60L * 1000L))
- - longValues.length + 1;
- String type = parts[2];
- if (type.equals("read-history")) {
- readArray = longValues;
- readOffset = offset;
- } else if (type.equals("write-history")) {
- writtenArray = longValues;
- writtenOffset = offset;
- } else if (type.equals("dirreq-read-history")) {
- dirreadArray = longValues;
- dirreadOffset = offset;
- } else if (type.equals("dirreq-write-history")) {
- dirwrittenArray = longValues;
- dirwrittenOffset = offset;
- }
- lastDate = currentDate;
- }
- }
-
- /**
- * Insert network status consensus into database.
- */
- public void addConsensus(long validAfter) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(validAfter);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- this.psCs.setTimestamp(1, validAfterTimestamp, cal);
- ResultSet rs = psCs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psC.clearParameters();
- this.psC.setTimestamp(1, validAfterTimestamp, cal);
- this.psC.executeUpdate();
- rcsCount++;
- if (rcsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add network status "
- + "consensus. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.consensusOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.consensusOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/consensus.sql"));
- this.consensusOut.write(" COPY consensus (validafter) "
- + "FROM stdin;\n");
- }
- String validAfterString = this.dateTimeFormat.format(validAfter);
- this.consensusOut.write(validAfterString + "\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write network status "
- + "consensus to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
- * Adds observations on the number of directory requests by country as
- * seen on a directory at a given date to the database.
- */
- public void addDirReqStats(String source, long statsEndMillis,
- long seconds, Map<String, String> dirReqsPerCountry) {
- String statsEnd = this.dateTimeFormat.format(statsEndMillis);
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(statsEndMillis);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp statsEndTimestamp = new Timestamp(statsEndMillis);
- this.psQs.setString(1, source);
- this.psQs.setTimestamp(2, statsEndTimestamp, cal);
- ResultSet rs = psQs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- for (Map.Entry<String, String> e :
- dirReqsPerCountry.entrySet()) {
- this.psQ.clearParameters();
- this.psQ.setString(1, source);
- this.psQ.setTimestamp(2, statsEndTimestamp, cal);
- this.psQ.setLong(3, seconds);
- this.psQ.setString(4, e.getKey());
- this.psQ.setLong(5, Long.parseLong(e.getValue()));
- this.psQ.executeUpdate();
- rqsCount++;
- if (rqsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add dirreq stats. We "
- + "won't make any further SQL requests in this execution.",
- e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.dirReqOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.dirReqOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/dirreq_stats.sql"));
- this.dirReqOut.write(" COPY dirreq_stats (source, statsend, "
- + "seconds, country, requests) FROM stdin;\n");
- }
- for (Map.Entry<String, String> e :
- dirReqsPerCountry.entrySet()) {
- this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds
- + "\t" + e.getKey() + "\t" + e.getValue() + "\n");
- }
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write dirreq stats to "
- + "raw database import file. We won't make any further "
- + "attempts to write raw import files in this execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- public void importRelayDescriptors() {
- if (archivesDirectory.exists()) {
- logger.fine("Importing files in directory " + archivesDirectory
- + "/...");
- DescriptorReader reader =
- DescriptorSourceFactory.createDescriptorReader();
- reader.addDirectory(archivesDirectory);
- if (keepImportHistory) {
- reader.setExcludeFiles(new File(statsDirectory,
- "database-importer-relay-descriptor-history"));
- }
- Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
- while (descriptorFiles.hasNext()) {
- DescriptorFile descriptorFile = descriptorFiles.next();
- if (descriptorFile.getDescriptors() != null) {
- for (Descriptor descriptor : descriptorFile.getDescriptors()) {
- if (descriptor instanceof RelayNetworkStatusConsensus) {
- this.addRelayNetworkStatusConsensus(
- (RelayNetworkStatusConsensus) descriptor);
- } else if (descriptor instanceof ServerDescriptor) {
- this.addServerDescriptor((ServerDescriptor) descriptor);
- } else if (descriptor instanceof ExtraInfoDescriptor) {
- this.addExtraInfoDescriptor(
- (ExtraInfoDescriptor) descriptor);
- }
- }
- }
- }
- }
-
- logger.info("Finished importing relay descriptors.");
- }
-
- private void addRelayNetworkStatusConsensus(
- RelayNetworkStatusConsensus consensus) {
- for (NetworkStatusEntry statusEntry :
- consensus.getStatusEntries().values()) {
- this.addStatusEntry(consensus.getValidAfterMillis(),
- statusEntry.getNickname(),
- statusEntry.getFingerprint().toLowerCase(),
- statusEntry.getDescriptor().toLowerCase(),
- statusEntry.getPublishedMillis(), statusEntry.getAddress(),
- statusEntry.getOrPort(), statusEntry.getDirPort(),
- statusEntry.getFlags(), statusEntry.getVersion(),
- statusEntry.getBandwidth(), statusEntry.getPortList(),
- statusEntry.getStatusEntryBytes());
- }
- this.addConsensus(consensus.getValidAfterMillis());
- }
-
- private void addServerDescriptor(ServerDescriptor descriptor) {
- this.addServerDescriptor(descriptor.getServerDescriptorDigest(),
- descriptor.getNickname(), descriptor.getAddress(),
- descriptor.getOrPort(), descriptor.getDirPort(),
- descriptor.getFingerprint(), descriptor.getBandwidthRate(),
- descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(),
- descriptor.getPlatform(), descriptor.getPublishedMillis(),
- descriptor.getUptime(), descriptor.getExtraInfoDigest());
- }
-
- private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
- if (descriptor.getDirreqV3Reqs() != null) {
- int allUsers = 0;
- Map<String, String> obs = new HashMap<String, String>();
- for (Map.Entry<String, Integer> e :
- descriptor.getDirreqV3Reqs().entrySet()) {
- String country = e.getKey();
- int users = e.getValue() - 4;
- allUsers += users;
- obs.put(country, "" + users);
- }
- obs.put("zy", "" + allUsers);
- this.addDirReqStats(descriptor.getFingerprint(),
- descriptor.getDirreqStatsEndMillis(),
- descriptor.getDirreqStatsIntervalLength(), obs);
- }
- List<String> bandwidthHistoryLines = new ArrayList<String>();
- if (descriptor.getWriteHistory() != null) {
- bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
- }
- if (descriptor.getReadHistory() != null) {
- bandwidthHistoryLines.add(descriptor.getReadHistory().getLine());
- }
- if (descriptor.getDirreqWriteHistory() != null) {
- bandwidthHistoryLines.add(
- descriptor.getDirreqWriteHistory().getLine());
- }
- if (descriptor.getDirreqReadHistory() != null) {
- bandwidthHistoryLines.add(
- descriptor.getDirreqReadHistory().getLine());
- }
- this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(),
- descriptor.getNickname(),
- descriptor.getFingerprint().toLowerCase(),
- descriptor.getPublishedMillis(), bandwidthHistoryLines);
- }
-
- /**
- * Close the relay descriptor database connection.
- */
- public void closeConnection() {
-
- /* Log stats about imported descriptors. */
- this.logger.info(String.format("Finished importing relay "
- + "descriptors: %d consensuses, %d network status entries, %d "
- + "votes, %d server descriptors, %d extra-info descriptors, %d "
- + "bandwidth history elements, and %d dirreq stats elements",
- rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount,
- rqsCount));
-
- /* Insert scheduled updates a second time, just in case the refresh
- * run has started since inserting them the first time in which case
- * it will miss the data inserted afterwards. We cannot, however,
- * insert them only now, because if a Java execution fails at a random
- * point, we might have added data, but not the corresponding dates to
- * update statistics. */
- if (this.importIntoDatabase) {
- try {
- for (long dateMillis : this.scheduledUpdates) {
- this.psU.setDate(1, new java.sql.Date(dateMillis));
- this.psU.execute();
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add scheduled dates "
- + "for the next refresh run.", e);
- }
- }
-
- /* Commit any stragglers before closing. */
- if (this.conn != null) {
- try {
- this.csH.executeBatch();
-
- this.conn.commit();
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not commit final records to "
- + "database", e);
- }
- try {
- this.conn.close();
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not close database "
- + "connection.", e);
- }
- }
-
- /* Close raw import files. */
- try {
- if (this.statusentryOut != null) {
- this.statusentryOut.write("\\.\n");
- this.statusentryOut.close();
- }
- if (this.descriptorOut != null) {
- this.descriptorOut.write("\\.\n");
- this.descriptorOut.close();
- }
- if (this.bwhistOut != null) {
- this.bwhistOut.write("\\.\n");
- this.bwhistOut.close();
- }
- if (this.consensusOut != null) {
- this.consensusOut.write("\\.\n");
- this.consensusOut.close();
- }
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not close one or more raw "
- + "database import files.", e);
- }
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java b/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java
deleted file mode 100644
index d5cae37..0000000
--- a/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron.network;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.torproject.descriptor.BridgeNetworkStatus;
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorFile;
-import org.torproject.descriptor.DescriptorReader;
-import org.torproject.descriptor.DescriptorSourceFactory;
-import org.torproject.descriptor.NetworkStatusEntry;
-
-/**
- * Generates statistics on the average number of relays and bridges per
- * day. Accepts parse results from <code>RelayDescriptorParser</code> and
- * <code>BridgeDescriptorParser</code> and stores them in intermediate
- * result files <code>stats/consensus-stats-raw</code> and
- * <code>stats/bridge-consensus-stats-raw</code>. Writes final results to
- * <code>stats/consensus-stats</code> for all days for which at least half
- * of the expected consensuses or statuses are known.
- */
-public class ConsensusStatsFileHandler {
-
- /**
- * Intermediate results file holding the number of running bridges per
- * bridge status.
- */
- private File bridgeConsensusStatsRawFile;
-
- /**
- * Number of running bridges in a given bridge status. Map keys are
- * bridge status times formatted as "yyyy-MM-dd HH:mm:ss", map values
- * are lines as read from <code>stats/bridge-consensus-stats-raw</code>.
- */
- private SortedMap<String, String> bridgesRaw;
-
- /**
- * Average number of running bridges per day. Map keys are dates
- * formatted as "yyyy-MM-dd", map values are the last column as written
- * to <code>stats/consensus-stats</code>.
- */
- private SortedMap<String, String> bridgesPerDay;
-
- /**
- * Logger for this class.
- */
- private Logger logger;
-
- private int bridgeResultsAdded = 0;
-
- /* Database connection string. */
- private String connectionURL = null;
-
- private SimpleDateFormat dateTimeFormat;
-
- private File bridgesDir;
-
- private File statsDirectory;
-
- private boolean keepImportHistory;
-
- /**
- * Initializes this class, including reading in intermediate results
- * files <code>stats/consensus-stats-raw</code> and
- * <code>stats/bridge-consensus-stats-raw</code> and final results file
- * <code>stats/consensus-stats</code>.
- */
- public ConsensusStatsFileHandler(String connectionURL,
- File bridgesDir, File statsDirectory,
- boolean keepImportHistory) {
-
- if (bridgesDir == null || statsDirectory == null) {
- throw new IllegalArgumentException();
- }
- this.bridgesDir = bridgesDir;
- this.statsDirectory = statsDirectory;
- this.keepImportHistory = keepImportHistory;
-
- /* Initialize local data structures to hold intermediate and final
- * results. */
- this.bridgesPerDay = new TreeMap<String, String>();
- this.bridgesRaw = new TreeMap<String, String>();
-
- /* Initialize file names for intermediate and final results files. */
- this.bridgeConsensusStatsRawFile = new File(
- "stats/bridge-consensus-stats-raw");
-
- /* Initialize database connection string. */
- this.connectionURL = connectionURL;
-
- this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-
- /* Initialize logger. */
- this.logger = Logger.getLogger(
- ConsensusStatsFileHandler.class.getName());
-
- /* Read in number of running bridges per bridge status. */
- if (this.bridgeConsensusStatsRawFile.exists()) {
- try {
- this.logger.fine("Reading file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "...");
- BufferedReader br = new BufferedReader(new FileReader(
- this.bridgeConsensusStatsRawFile));
- String line = null;
- while ((line = br.readLine()) != null) {
- if (line.startsWith("date")) {
- /* Skip headers. */
- continue;
- }
- String[] parts = line.split(",");
- String dateTime = parts[0];
- if (parts.length == 2) {
- this.bridgesRaw.put(dateTime, line + ",0");
- } else if (parts.length == 3) {
- this.bridgesRaw.put(dateTime, line);
- } else {
- this.logger.warning("Corrupt line '" + line + "' in file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath()
- + "! Aborting to read this file!");
- break;
- }
- }
- br.close();
- this.logger.fine("Finished reading file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath() + ".");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Failed to read file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!",
- e);
- }
- }
- }
-
- /**
- * Adds the intermediate results of the number of running bridges in a
- * given bridge status to the existing observations.
- */
- public void addBridgeConsensusResults(long publishedMillis, int running,
- int runningEc2Bridges) {
- String published = dateTimeFormat.format(publishedMillis);
- String line = published + "," + running + "," + runningEc2Bridges;
- if (!this.bridgesRaw.containsKey(published)) {
- this.logger.finer("Adding new bridge numbers: " + line);
- this.bridgesRaw.put(published, line);
- this.bridgeResultsAdded++;
- } else if (!line.equals(this.bridgesRaw.get(published))) {
- this.logger.warning("The numbers of running bridges we were just "
- + "given (" + line + ") are different from what we learned "
- + "before (" + this.bridgesRaw.get(published) + ")! "
- + "Overwriting!");
- this.bridgesRaw.put(published, line);
- }
- }
-
- public void importSanitizedBridges() {
- if (bridgesDir.exists()) {
- logger.fine("Importing files in directory " + bridgesDir + "/...");
- DescriptorReader reader =
- DescriptorSourceFactory.createDescriptorReader();
- reader.addDirectory(bridgesDir);
- if (keepImportHistory) {
- reader.setExcludeFiles(new File(statsDirectory,
- "consensus-stats-bridge-descriptor-history"));
- }
- Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
- while (descriptorFiles.hasNext()) {
- DescriptorFile descriptorFile = descriptorFiles.next();
- if (descriptorFile.getDescriptors() != null) {
- for (Descriptor descriptor : descriptorFile.getDescriptors()) {
- if (descriptor instanceof BridgeNetworkStatus) {
- this.addBridgeNetworkStatus(
- (BridgeNetworkStatus) descriptor);
- }
- }
- }
- }
- logger.info("Finished importing bridge descriptors.");
- }
- }
-
- private void addBridgeNetworkStatus(BridgeNetworkStatus status) {
- int runningBridges = 0, runningEc2Bridges = 0;
- for (NetworkStatusEntry statusEntry :
- status.getStatusEntries().values()) {
- if (statusEntry.getFlags().contains("Running")) {
- runningBridges++;
- if (statusEntry.getNickname().startsWith("ec2bridge")) {
- runningEc2Bridges++;
- }
- }
- }
- this.addBridgeConsensusResults(status.getPublishedMillis(),
- runningBridges, runningEc2Bridges);
- }
-
- /**
- * Aggregates the raw observations on relay and bridge numbers and
- * writes both raw and aggregate observations to disk.
- */
- public void writeFiles() {
-
- /* Go through raw observations of numbers of running bridges in bridge
- * statuses, calculate averages per day, and add these averages to
- * final results. */
- if (!this.bridgesRaw.isEmpty()) {
- String tempDate = null;
- int brunning = 0, brunningEc2 = 0, statuses = 0;
- Iterator<String> it = this.bridgesRaw.values().iterator();
- boolean haveWrittenFinalLine = false;
- while (it.hasNext() || !haveWrittenFinalLine) {
- String next = it.hasNext() ? it.next() : null;
- /* Finished reading a day or even all lines? */
- if (tempDate != null && (next == null
- || !next.substring(0, 10).equals(tempDate))) {
- /* Only write results if we have seen at least half of all
- * statuses. */
- if (statuses >= 24) {
- String line = "," + (brunning / statuses) + ","
- + (brunningEc2 / statuses);
- /* Are our results new? */
- if (!this.bridgesPerDay.containsKey(tempDate)) {
- this.logger.finer("Adding new average bridge numbers: "
- + tempDate + line);
- this.bridgesPerDay.put(tempDate, line);
- } else if (!line.equals(this.bridgesPerDay.get(tempDate))) {
- this.logger.finer("Replacing existing average bridge "
- + "numbers (" + this.bridgesPerDay.get(tempDate)
- + " with new numbers: " + line);
- this.bridgesPerDay.put(tempDate, line);
- }
- }
- brunning = brunningEc2 = statuses = 0;
- haveWrittenFinalLine = (next == null);
- }
- /* Sum up number of running bridges. */
- if (next != null) {
- tempDate = next.substring(0, 10);
- statuses++;
- String[] parts = next.split(",");
- brunning += Integer.parseInt(parts[1]);
- brunningEc2 += Integer.parseInt(parts[2]);
- }
- }
- }
-
- /* Write raw numbers of running bridges to disk. */
- try {
- this.logger.fine("Writing file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "...");
- this.bridgeConsensusStatsRawFile.getParentFile().mkdirs();
- BufferedWriter bw = new BufferedWriter(
- new FileWriter(this.bridgeConsensusStatsRawFile));
- bw.append("datetime,brunning,brunningec2\n");
- for (String line : this.bridgesRaw.values()) {
- bw.append(line + "\n");
- }
- bw.close();
- this.logger.fine("Finished writing file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath() + ".");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Failed to write file "
- + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!",
- e);
- }
-
- /* Add average number of bridges per day to the database. */
- if (connectionURL != null) {
- try {
- Map<String, String> insertRows = new HashMap<String, String>(),
- updateRows = new HashMap<String, String>();
- insertRows.putAll(this.bridgesPerDay);
- Connection conn = DriverManager.getConnection(connectionURL);
- conn.setAutoCommit(false);
- Statement statement = conn.createStatement();
- ResultSet rs = statement.executeQuery(
- "SELECT date, avg_running, avg_running_ec2 "
- + "FROM bridge_network_size");
- while (rs.next()) {
- String date = rs.getDate(1).toString();
- if (insertRows.containsKey(date)) {
- String insertRow = insertRows.remove(date);
- String[] parts = insertRow.substring(1).split(",");
- long newAvgRunning = Long.parseLong(parts[0]);
- long newAvgRunningEc2 = Long.parseLong(parts[1]);
- long oldAvgRunning = rs.getLong(2);
- long oldAvgRunningEc2 = rs.getLong(3);
- if (newAvgRunning != oldAvgRunning ||
- newAvgRunningEc2 != oldAvgRunningEc2) {
- updateRows.put(date, insertRow);
- }
- }
- }
- rs.close();
- PreparedStatement psU = conn.prepareStatement(
- "UPDATE bridge_network_size SET avg_running = ?, "
- + "avg_running_ec2 = ? WHERE date = ?");
- for (Map.Entry<String, String> e : updateRows.entrySet()) {
- java.sql.Date date = java.sql.Date.valueOf(e.getKey());
- String[] parts = e.getValue().substring(1).split(",");
- long avgRunning = Long.parseLong(parts[0]);
- long avgRunningEc2 = Long.parseLong(parts[1]);
- psU.clearParameters();
- psU.setLong(1, avgRunning);
- psU.setLong(2, avgRunningEc2);
- psU.setDate(3, date);
- psU.executeUpdate();
- }
- PreparedStatement psI = conn.prepareStatement(
- "INSERT INTO bridge_network_size (avg_running, "
- + "avg_running_ec2, date) VALUES (?, ?, ?)");
- for (Map.Entry<String, String> e : insertRows.entrySet()) {
- java.sql.Date date = java.sql.Date.valueOf(e.getKey());
- String[] parts = e.getValue().substring(1).split(",");
- long avgRunning = Long.parseLong(parts[0]);
- long avgRunningEc2 = Long.parseLong(parts[1]);
- psI.clearParameters();
- psI.setLong(1, avgRunning);
- psI.setLong(2, avgRunningEc2);
- psI.setDate(3, date);
- psI.executeUpdate();
- }
- conn.commit();
- conn.close();
- } catch (SQLException e) {
- logger.log(Level.WARNING, "Failed to add average bridge numbers "
- + "to database.", e);
- }
- }
-
- /* Write stats. */
- StringBuilder dumpStats = new StringBuilder("Finished writing "
- + "statistics on bridge network statuses to disk.\nAdded "
- + this.bridgeResultsAdded + " bridge network status(es) in this "
- + "execution.");
- long now = System.currentTimeMillis();
- SimpleDateFormat dateTimeFormat =
- new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- if (this.bridgesRaw.isEmpty()) {
- dumpStats.append("\nNo bridge status known yet.");
- } else {
- dumpStats.append("\nLast known bridge status was published "
- + this.bridgesRaw.lastKey() + ".");
- try {
- if (now - 6L * 60L * 60L * 1000L > dateTimeFormat.parse(
- this.bridgesRaw.lastKey()).getTime()) {
- logger.warning("Last known bridge status is more than 6 hours "
- + "old: " + this.bridgesRaw.lastKey());
- }
- } catch (ParseException e) {
- /* Can't parse the timestamp? Whatever. */
- }
- }
- logger.info(dumpStats.toString());
- }
-}
-
diff --git a/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java b/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java
deleted file mode 100644
index 815b37f..0000000
--- a/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/* Copyright 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron.performance;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Iterator;
-import java.util.TimeZone;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorFile;
-import org.torproject.descriptor.DescriptorReader;
-import org.torproject.descriptor.DescriptorSourceFactory;
-import org.torproject.descriptor.ExtraInfoDescriptor;
-
-public class PerformanceStatsImporter {
-
- /**
- * How many records to commit with each database transaction.
- */
- private final long autoCommitCount = 500;
-
- /**
- * Keep track of the number of records committed before each transaction
- */
- private int rbsCount = 0;
-
- /**
- * Relay descriptor database connection.
- */
- private Connection conn;
-
- /**
- * Prepared statement to check whether a given conn-bi-direct stats
- * string has been imported into the database before.
- */
- private PreparedStatement psBs;
-
- /**
- * Prepared statement to insert a conn-bi-direct stats string into the
- * database.
- */
- private PreparedStatement psB;
-
- /**
- * Logger for this class.
- */
- private Logger logger;
-
- /**
- * Directory for writing raw import files.
- */
- private String rawFilesDirectory;
-
- /**
- * Raw import file containing conn-bi-direct stats strings.
- */
- private BufferedWriter connBiDirectOut;
-
- /**
- * Date format to parse timestamps.
- */
- private SimpleDateFormat dateTimeFormat;
-
- private boolean importIntoDatabase;
- private boolean writeRawImportFiles;
-
- private File archivesDirectory;
- private File statsDirectory;
- private boolean keepImportHistory;
-
- /**
- * Initialize database importer by connecting to the database and
- * preparing statements.
- */
- public PerformanceStatsImporter(String connectionURL,
- String rawFilesDirectory, File archivesDirectory,
- File statsDirectory, boolean keepImportHistory) {
-
- if (archivesDirectory == null ||
- statsDirectory == null) {
- throw new IllegalArgumentException();
- }
- this.archivesDirectory = archivesDirectory;
- this.statsDirectory = statsDirectory;
- this.keepImportHistory = keepImportHistory;
-
- /* Initialize logger. */
- this.logger = Logger.getLogger(
- PerformanceStatsImporter.class.getName());
-
- if (connectionURL != null) {
- try {
- /* Connect to database. */
- this.conn = DriverManager.getConnection(connectionURL);
-
- /* Turn autocommit off */
- this.conn.setAutoCommit(false);
-
- /* Prepare statements. */
- this.psBs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM connbidirect WHERE source = ? AND statsend = ?");
- this.psB = conn.prepareStatement("INSERT INTO connbidirect "
- + "(source, statsend, seconds, belownum, readnum, writenum, "
- + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)");
- this.importIntoDatabase = true;
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not connect to database or "
- + "prepare statements.", e);
- }
- }
-
- /* Remember where we want to write raw import files. */
- if (rawFilesDirectory != null) {
- this.rawFilesDirectory = rawFilesDirectory;
- this.writeRawImportFiles = true;
- }
-
- /* Initialize date format, so that we can format timestamps. */
- this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
-
- /**
- * Insert a conn-bi-direct stats string into the database.
- */
- private void addConnBiDirect(String source, long statsEndMillis,
- long seconds, long below, long read, long write, long both) {
- String statsEnd = this.dateTimeFormat.format(statsEndMillis);
- if (this.importIntoDatabase) {
- try {
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp statsEndTimestamp = new Timestamp(statsEndMillis);
- this.psBs.setString(1, source);
- this.psBs.setTimestamp(2, statsEndTimestamp, cal);
- ResultSet rs = psBs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psB.clearParameters();
- this.psB.setString(1, source);
- this.psB.setTimestamp(2, statsEndTimestamp, cal);
- this.psB.setLong(3, seconds);
- this.psB.setLong(4, below);
- this.psB.setLong(5, read);
- this.psB.setLong(6, write);
- this.psB.setLong(7, both);
- this.psB.executeUpdate();
- rbsCount++;
- if (rbsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
- + "stats string. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.connBiDirectOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.connBiDirectOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/connbidirect.sql"));
- this.connBiDirectOut.write(" COPY connbidirect (source, "
- + "statsend, seconds, belownum, readnum, writenum, "
- + "bothnum) FROM stdin;\n");
- }
- this.connBiDirectOut.write(source + "\t" + statsEnd + "\t"
- + seconds + "\t" + below + "\t" + read + "\t" + write + "\t"
- + both + "\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
- + "stats string to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- public void importRelayDescriptors() {
- if (archivesDirectory.exists()) {
- logger.fine("Importing files in directory " + archivesDirectory
- + "/...");
- DescriptorReader reader =
- DescriptorSourceFactory.createDescriptorReader();
- reader.addDirectory(archivesDirectory);
- if (keepImportHistory) {
- reader.setExcludeFiles(new File(statsDirectory,
- "performance-stats-relay-descriptor-history"));
- }
- Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
- while (descriptorFiles.hasNext()) {
- DescriptorFile descriptorFile = descriptorFiles.next();
- if (descriptorFile.getDescriptors() != null) {
- for (Descriptor descriptor : descriptorFile.getDescriptors()) {
- if (descriptor instanceof ExtraInfoDescriptor) {
- this.addExtraInfoDescriptor(
- (ExtraInfoDescriptor) descriptor);
- }
- }
- }
- }
- }
-
- logger.info("Finished importing relay descriptors.");
- }
-
- private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
- if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
- this.addConnBiDirect(descriptor.getFingerprint(),
- descriptor.getConnBiDirectStatsEndMillis(),
- descriptor.getConnBiDirectStatsIntervalLength(),
- descriptor.getConnBiDirectBelow(),
- descriptor.getConnBiDirectRead(),
- descriptor.getConnBiDirectWrite(),
- descriptor.getConnBiDirectBoth());
- }
- }
-
- /**
- * Close the relay descriptor database connection.
- */
- public void closeConnection() {
-
- /* Log stats about imported descriptors. */
- this.logger.info(String.format("Finished importing relay "
- + "descriptors: %d conn-bi-direct stats lines", rbsCount));
-
- /* Commit any stragglers before closing. */
- if (this.conn != null) {
- try {
- this.conn.commit();
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not commit final records "
- + "to database", e);
- }
- try {
- this.conn.close();
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not close database "
- + "connection.", e);
- }
- }
-
- /* Close raw import files. */
- try {
- if (this.connBiDirectOut != null) {
- this.connBiDirectOut.write("\\.\n");
- this.connBiDirectOut.close();
- }
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not close one or more raw "
- + "database import files.", e);
- }
- }
-}
diff --git a/src/org/torproject/ernie/cron/performance/TorperfProcessor.java b/src/org/torproject/ernie/cron/performance/TorperfProcessor.java
deleted file mode 100644
index d7322db..0000000
--- a/src/org/torproject/ernie/cron/performance/TorperfProcessor.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/* Copyright 2011, 2012 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.ernie.cron.performance;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorFile;
-import org.torproject.descriptor.DescriptorReader;
-import org.torproject.descriptor.DescriptorSourceFactory;
-import org.torproject.descriptor.TorperfResult;
-
-public class TorperfProcessor {
- public TorperfProcessor(File torperfDirectory, File statsDirectory,
- String connectionURL) {
-
- if (torperfDirectory == null || statsDirectory == null) {
- throw new IllegalArgumentException();
- }
-
- Logger logger = Logger.getLogger(TorperfProcessor.class.getName());
- File rawFile = new File(statsDirectory, "torperf-raw");
- File statsFile = new File(statsDirectory, "torperf-stats");
- SortedMap<String, String> rawObs = new TreeMap<String, String>();
- SortedMap<String, String> stats = new TreeMap<String, String>();
- int addedRawObs = 0;
- SimpleDateFormat formatter =
- new SimpleDateFormat("yyyy-MM-dd,HH:mm:ss");
- formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
- try {
- if (rawFile.exists()) {
- logger.fine("Reading file " + rawFile.getAbsolutePath() + "...");
- BufferedReader br = new BufferedReader(new FileReader(rawFile));
- String line = br.readLine(); // ignore header
- while ((line = br.readLine()) != null) {
- if (line.split(",").length != 4) {
- logger.warning("Corrupt line in " + rawFile.getAbsolutePath()
- + "!");
- break;
- }
- String key = line.substring(0, line.lastIndexOf(","));
- rawObs.put(key, line);
- }
- br.close();
- logger.fine("Finished reading file " + rawFile.getAbsolutePath()
- + ".");
- }
- if (statsFile.exists()) {
- logger.fine("Reading file " + statsFile.getAbsolutePath()
- + "...");
- BufferedReader br = new BufferedReader(new FileReader(statsFile));
- String line = br.readLine(); // ignore header
- while ((line = br.readLine()) != null) {
- String key = line.split(",")[0] + "," + line.split(",")[1];
- stats.put(key, line);
- }
- br.close();
- logger.fine("Finished reading file " + statsFile.getAbsolutePath()
- + ".");
- }
- if (torperfDirectory.exists()) {
- logger.fine("Importing files in " + torperfDirectory + "/...");
- DescriptorReader descriptorReader =
- DescriptorSourceFactory.createDescriptorReader();
- descriptorReader.addDirectory(torperfDirectory);
- descriptorReader.setExcludeFiles(new File(statsDirectory,
- "torperf-history"));
- Iterator<DescriptorFile> descriptorFiles =
- descriptorReader.readDescriptors();
- while (descriptorFiles.hasNext()) {
- DescriptorFile descriptorFile = descriptorFiles.next();
- if (descriptorFile.getException() != null) {
- logger.log(Level.FINE, "Error parsing file.",
- descriptorFile.getException());
- continue;
- }
- for (Descriptor descriptor : descriptorFile.getDescriptors()) {
- if (!(descriptor instanceof TorperfResult)) {
- continue;
- }
- TorperfResult result = (TorperfResult) descriptor;
- String source = result.getSource();
- long fileSize = result.getFileSize();
- if (fileSize == 51200) {
- source += "-50kb";
- } else if (fileSize == 1048576) {
- source += "-1mb";
- } else if (fileSize == 5242880) {
- source += "-5mb";
- } else {
- logger.fine("Unexpected file size '" + fileSize
- + "'. Skipping.");
- continue;
- }
- String dateTime = formatter.format(result.getStartMillis());
- long completeMillis = result.getDataCompleteMillis()
- - result.getStartMillis();
- String key = source + "," + dateTime;
- String value = key;
- if ((result.didTimeout() == null &&
- result.getDataCompleteMillis() < 1) ||
- (result.didTimeout() != null && result.didTimeout())) {
- value += ",-2"; // -2 for timeout
- } else if (result.getReadBytes() < fileSize) {
- value += ",-1"; // -1 for failure
- } else {
- value += "," + completeMillis;
- }
- if (!rawObs.containsKey(key)) {
- rawObs.put(key, value);
- addedRawObs++;
- }
- }
- }
- logger.fine("Finished importing files in " + torperfDirectory
- + "/.");
- }
- if (rawObs.size() > 0) {
- logger.fine("Writing file " + rawFile.getAbsolutePath() + "...");
- rawFile.getParentFile().mkdirs();
- BufferedWriter bw = new BufferedWriter(new FileWriter(rawFile));
- bw.append("source,date,start,completemillis\n");
- String tempSourceDate = null;
- Iterator<Map.Entry<String, String>> it =
- rawObs.entrySet().iterator();
- List<Long> dlTimes = new ArrayList<Long>();
- boolean haveWrittenFinalLine = false;
- SortedMap<String, List<Long>> dlTimesAllSources =
- new TreeMap<String, List<Long>>();
- SortedMap<String, long[]> statusesAllSources =
- new TreeMap<String, long[]>();
- long failures = 0, timeouts = 0, requests = 0;
- while (it.hasNext() || !haveWrittenFinalLine) {
- Map.Entry<String, String> next = it.hasNext() ? it.next() : null;
- if (tempSourceDate != null
- && (next == null || !(next.getValue().split(",")[0] + ","
- + next.getValue().split(",")[1]).equals(tempSourceDate))) {
- if (dlTimes.size() > 4) {
- Collections.sort(dlTimes);
- long q1 = dlTimes.get(dlTimes.size() / 4 - 1);
- long md = dlTimes.get(dlTimes.size() / 2 - 1);
- long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1);
- stats.put(tempSourceDate, tempSourceDate + "," + q1 + ","
- + md + "," + q3 + "," + timeouts + "," + failures + ","
- + requests);
- String allSourceDate = "all" + tempSourceDate.substring(
- tempSourceDate.indexOf("-"));
- if (dlTimesAllSources.containsKey(allSourceDate)) {
- dlTimesAllSources.get(allSourceDate).addAll(dlTimes);
- } else {
- dlTimesAllSources.put(allSourceDate, dlTimes);
- }
- if (statusesAllSources.containsKey(allSourceDate)) {
- long[] status = statusesAllSources.get(allSourceDate);
- status[0] += timeouts;
- status[1] += failures;
- status[2] += requests;
- } else {
- long[] status = new long[3];
- status[0] = timeouts;
- status[1] = failures;
- status[2] = requests;
- statusesAllSources.put(allSourceDate, status);
- }
- }
- dlTimes = new ArrayList<Long>();
- failures = timeouts = requests = 0;
- if (next == null) {
- haveWrittenFinalLine = true;
- }
- }
- if (next != null) {
- bw.append(next.getValue() + "\n");
- String[] parts = next.getValue().split(",");
- tempSourceDate = parts[0] + "," + parts[1];
- long completeMillis = Long.parseLong(parts[3]);
- if (completeMillis == -2L) {
- timeouts++;
- } else if (completeMillis == -1L) {
- failures++;
- } else {
- dlTimes.add(Long.parseLong(parts[3]));
- }
- requests++;
- }
- }
- bw.close();
- for (Map.Entry<String, List<Long>> e :
- dlTimesAllSources.entrySet()) {
- String allSourceDate = e.getKey();
- dlTimes = e.getValue();
- Collections.sort(dlTimes);
- long q1 = dlTimes.get(dlTimes.size() / 4 - 1);
- long md = dlTimes.get(dlTimes.size() / 2 - 1);
- long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1);
- long[] status = statusesAllSources.get(allSourceDate);
- timeouts = status[0];
- failures = status[1];
- requests = status[2];
- stats.put(allSourceDate, allSourceDate + "," + q1 + "," + md
- + "," + q3 + "," + timeouts + "," + failures + ","
- + requests);
- }
- logger.fine("Finished writing file " + rawFile.getAbsolutePath()
- + ".");
- }
- if (stats.size() > 0) {
- logger.fine("Writing file " + statsFile.getAbsolutePath()
- + "...");
- statsFile.getParentFile().mkdirs();
- BufferedWriter bw = new BufferedWriter(new FileWriter(statsFile));
- bw.append("source,date,q1,md,q3,timeouts,failures,requests\n");
- for (String s : stats.values()) {
- bw.append(s + "\n");
- }
- bw.close();
- logger.fine("Finished writing file " + statsFile.getAbsolutePath()
- + ".");
- }
- } catch (IOException e) {
- logger.log(Level.WARNING, "Failed writing "
- + rawFile.getAbsolutePath() + " or "
- + statsFile.getAbsolutePath() + "!", e);
- }
-
- /* Write stats. */
- StringBuilder dumpStats = new StringBuilder("Finished writing "
- + "statistics on torperf results.\nAdded " + addedRawObs
- + " new observations in this execution.\n"
- + "Last known obserations by source and file size are:");
- String lastSource = null;
- String lastLine = null;
- for (String s : rawObs.keySet()) {
- String[] parts = s.split(",");
- if (lastSource == null) {
- lastSource = parts[0];
- } else if (!parts[0].equals(lastSource)) {
- String lastKnownObservation = lastLine.split(",")[1] + " "
- + lastLine.split(",")[2];
- dumpStats.append("\n" + lastSource + " " + lastKnownObservation);
- lastSource = parts[0];
- }
- lastLine = s;
- }
- if (lastSource != null) {
- String lastKnownObservation = lastLine.split(",")[1] + " "
- + lastLine.split(",")[2];
- dumpStats.append("\n" + lastSource + " " + lastKnownObservation);
- }
- logger.info(dumpStats.toString());
-
- /* Write results to database. */
- if (connectionURL != null) {
- try {
- Map<String, String> insertRows = new HashMap<String, String>();
- insertRows.putAll(stats);
- Set<String> updateRows = new HashSet<String>();
- Connection conn = DriverManager.getConnection(connectionURL);
- conn.setAutoCommit(false);
- Statement statement = conn.createStatement();
- ResultSet rs = statement.executeQuery(
- "SELECT date, source, q1, md, q3, timeouts, failures, "
- + "requests FROM torperf_stats");
- while (rs.next()) {
- String date = rs.getDate(1).toString();
- String source = rs.getString(2);
- String key = source + "," + date;
- if (insertRows.containsKey(key)) {
- String insertRow = insertRows.remove(key);
- String[] newStats = insertRow.split(",");
- long newQ1 = Long.parseLong(newStats[2]);
- long newMd = Long.parseLong(newStats[3]);
- long newQ3 = Long.parseLong(newStats[4]);
- long newTimeouts = Long.parseLong(newStats[5]);
- long newFailures = Long.parseLong(newStats[6]);
- long newRequests = Long.parseLong(newStats[7]);
- long oldQ1 = rs.getLong(3);
- long oldMd = rs.getLong(4);
- long oldQ3 = rs.getLong(5);
- long oldTimeouts = rs.getLong(6);
- long oldFailures = rs.getLong(7);
- long oldRequests = rs.getLong(8);
- if (newQ1 != oldQ1 || newMd != oldMd || newQ3 != oldQ3 ||
- newTimeouts != oldTimeouts ||
- newFailures != oldFailures ||
- newRequests != oldRequests) {
- updateRows.add(insertRow);
- }
- }
- }
- PreparedStatement psU = conn.prepareStatement(
- "UPDATE torperf_stats SET q1 = ?, md = ?, q3 = ?, "
- + "timeouts = ?, failures = ?, requests = ? "
- + "WHERE date = ? AND source = ?");
- for (String row : updateRows) {
- String[] newStats = row.split(",");
- String source = newStats[0];
- java.sql.Date date = java.sql.Date.valueOf(newStats[1]);
- long q1 = Long.parseLong(newStats[2]);
- long md = Long.parseLong(newStats[3]);
- long q3 = Long.parseLong(newStats[4]);
- long timeouts = Long.parseLong(newStats[5]);
- long failures = Long.parseLong(newStats[6]);
- long requests = Long.parseLong(newStats[7]);
- psU.clearParameters();
- psU.setLong(1, q1);
- psU.setLong(2, md);
- psU.setLong(3, q3);
- psU.setLong(4, timeouts);
- psU.setLong(5, failures);
- psU.setLong(6, requests);
- psU.setDate(7, date);
- psU.setString(8, source);
- psU.executeUpdate();
- }
- PreparedStatement psI = conn.prepareStatement(
- "INSERT INTO torperf_stats (q1, md, q3, timeouts, failures, "
- + "requests, date, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
- for (String row : insertRows.values()) {
- String[] newStats = row.split(",");
- String source = newStats[0];
- java.sql.Date date = java.sql.Date.valueOf(newStats[1]);
- long q1 = Long.parseLong(newStats[2]);
- long md = Long.parseLong(newStats[3]);
- long q3 = Long.parseLong(newStats[4]);
- long timeouts = Long.parseLong(newStats[5]);
- long failures = Long.parseLong(newStats[6]);
- long requests = Long.parseLong(newStats[7]);
- psI.clearParameters();
- psI.setLong(1, q1);
- psI.setLong(2, md);
- psI.setLong(3, q3);
- psI.setLong(4, timeouts);
- psI.setLong(5, failures);
- psI.setLong(6, requests);
- psI.setDate(7, date);
- psI.setString(8, source);
- psI.executeUpdate();
- }
- conn.commit();
- conn.close();
- } catch (SQLException e) {
- logger.log(Level.WARNING, "Failed to add torperf stats to "
- + "database.", e);
- }
- }
- }
-}
-