commit 58cf97dd1bddc1caa00731867ea3300b3a08d141
Author: Karsten Loesing <karsten.loesing@gmx.net>
Date:   Tue Jan 21 18:27:39 2014 +0100
    Move plain Java classes to modules/legacy/ subdirectory.
---
 build.xml                                           |   71 --
 config.template                                     |   47 -
 db/tordir.sql                                       | 1005 ------
 modules/legacy/build.xml                            |   55 +
 modules/legacy/config.template                      |   47 +
 modules/legacy/db/tordir.sql                        | 1005 ++++++
 .../org/torproject/ernie/cron/Configuration.java    |  163 +++
 .../src/org/torproject/ernie/cron/LockFile.java     |   52 +
 .../ernie/cron/LoggingConfiguration.java            |   94 ++
 .../legacy/src/org/torproject/ernie/cron/Main.java  |  100 ++
 .../cron/RelayDescriptorDatabaseImporter.java       | 1077 ++++++++++++++++++++
 .../cron/network/ConsensusStatsFileHandler.java     |  380 +++
 .../cron/performance/PerformanceStatsImporter.java  |  271 +++
 .../ernie/cron/performance/TorperfProcessor.java    |  374 +++
 run-web.sh                                          |    3 +
 run.sh                                              |    5 -
 shared/bin/01-rsync-descriptors.sh                  |    3 +
 shared/bin/10-legacy.sh                             |   10 +
 shared/bin/99-copy-stats-files.sh                   |    4 +
 src/org/torproject/ernie/cron/Configuration.java    |  163 ---
 src/org/torproject/ernie/cron/LockFile.java         |   52 -
 .../ernie/cron/LoggingConfiguration.java            |   94 --
 src/org/torproject/ernie/cron/Main.java             |  100 --
 .../cron/RelayDescriptorDatabaseImporter.java       | 1077 --------------------
 .../cron/network/ConsensusStatsFileHandler.java     |  380 -------
 .../cron/performance/PerformanceStatsImporter.java  |  271 -----
 .../ernie/cron/performance/TorperfProcessor.java    |  374 -------
 27 files changed, 3638 insertions(+), 3639 deletions(-)
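
The diffstat shows the legacy importer moving from the repository root into modules/legacy/, with a new shared/bin/10-legacy.sh wrapper added alongside it. As a minimal usage sketch (not part of this commit): only the relocated build.xml below confirms the default Ant target "run" and the "init" step that copies config.template to config; the working directory and the exact invocation are assumptions based on the new layout.

    # Hypothetical invocation of the relocated legacy module (assumption, not shown in the diff).
    cd modules/legacy
    cp config.template config   # same copy the "init" target performs
    ant                         # default target "run": compile, then launch org.torproject.ernie.cron.Main
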
diff --git a/build.xml b/build.xml deleted file mode 100644 index 393e2d8..0000000 --- a/build.xml +++ /dev/null @@ -1,71 +0,0 @@ -<project default="run" name="metrics-web" basedir="."> - - <!-- Define build paths. --> - <property name="sources" value="src"/> - <property name="classes" value="classes"/> - <property name="config" value="etc"/> - <property name="warfile" value="ernie.war"/> - <path id="classpath"> - <pathelement path="${classes}"/> - <fileset dir="/usr/share/java"> - <include name="commons-codec.jar"/> - <include name="commons-compress.jar"/> - <include name="postgresql-jdbc3.jar"/> - <include name="junit4.jar"/> - <include name="servlet-api-3.0.jar"/> - <include name="commons-lang.jar"/> - </fileset> - <fileset dir="deps/metrics-lib"> - <include name="descriptor.jar"/> - </fileset> - <fileset dir="lib"> - <include name="REngine.jar"/> - <include name="RserveEngine.jar"/> - </fileset> - </path> - - <target name="init"> - <copy file="config.template" tofile="config"/> - <mkdir dir="${classes}"/> - </target> - <target name="metrics-lib"> - <ant dir="deps/metrics-lib"/> - </target> - - <!-- Compile all plain Java classes. --> - <target name="compile" depends="metrics-lib,init"> - <javac destdir="${classes}" - srcdir="${sources}" - source="1.5" - target="1.5" - debug="true" - deprecation="true" - optimize="false" - failonerror="true" - includeantruntime="false"> - <classpath refid="classpath"/> - </javac> - </target> - - <!-- Prepare data for being displayed on the website. --> - <target name="run" depends="compile"> - <java fork="true" - maxmemory="1024m" - classname="org.torproject.ernie.cron.Main"> - <classpath refid="classpath"/> - </java> - </target> - - <!-- Run unit tests. --> - <target name="test" depends="compile"> - <junit haltonfailure="true" printsummary="off"> - <classpath refid="classpath"/> - <formatter type="plain" usefile="false"/> - <batchtest> - <fileset dir="${classes}" - includes="**/*Test.class"/> - </batchtest> - </junit> - </target> -</project> - diff --git a/config.template b/config.template deleted file mode 100644 index 8f0789b..0000000 --- a/config.template +++ /dev/null @@ -1,47 +0,0 @@ -## Import directory archives from disk, if available -#ImportDirectoryArchives 0 -# -## Relative path to directory to import directory archives from -#DirectoryArchivesDirectory in/relay-descriptors/ -# -## Keep a history of imported directory archive files to know which files -## have been imported before. This history can be useful when importing -## from a changing source to avoid importing descriptors over and over -## again, but it can be confusing to users who don't know about it. -#KeepDirectoryArchiveImportHistory 0 -# -## Import sanitized bridges from disk, if available -#ImportSanitizedBridges 0 -# -## Relative path to directory to import sanitized bridges from -#SanitizedBridgesDirectory in/bridge-descriptors/ -# -## Keep a history of imported sanitized bridge descriptors. This history -## can be useful when importing from a changing data source to avoid -## importing descriptors more than once, but it can be confusing to users -## who don't know about it. 
-#KeepSanitizedBridgesImportHistory 0 -# -## Write relay descriptors to the database -#WriteRelayDescriptorDatabase 0 -# -## JDBC string for relay descriptor database -#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password -# -## Write relay descriptors to raw text files for importing them into the -## database using PostgreSQL's \copy command -#WriteRelayDescriptorsRawFiles 0 -# -## Relative path to directory to write raw text files; note that existing -## files will be overwritten! -#RelayDescriptorRawFilesDirectory pg-import/ -# -## Write bridge stats to disk -#WriteBridgeStats 0 -# -## Import torperf data, if available, and write stats to disk -#ImportWriteTorperfStats 0 -# -## Relative path to directory to import torperf results from -#TorperfDirectory in/torperf/ -# diff --git a/db/tordir.sql b/db/tordir.sql deleted file mode 100644 index cd2ed6a..0000000 --- a/db/tordir.sql +++ /dev/null @@ -1,1005 +0,0 @@ --- Copyright 2010 The Tor Project --- See LICENSE for licensing information - -CREATE LANGUAGE plpgsql; - --- TABLE descriptor --- Contains all of the descriptors published by routers. -CREATE TABLE descriptor ( - descriptor CHARACTER(40) NOT NULL, - nickname CHARACTER VARYING(19) NOT NULL, - address CHARACTER VARYING(15) NOT NULL, - orport INTEGER NOT NULL, - dirport INTEGER NOT NULL, - fingerprint CHARACTER(40) NOT NULL, - bandwidthavg BIGINT NOT NULL, - bandwidthburst BIGINT NOT NULL, - bandwidthobserved BIGINT NOT NULL, - platform CHARACTER VARYING(256), - published TIMESTAMP WITHOUT TIME ZONE NOT NULL, - uptime BIGINT, - extrainfo CHARACTER(40), - CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor) -); - --- Contains bandwidth histories reported by relays in extra-info --- descriptors. Each row contains the reported bandwidth in 15-minute --- intervals for each relay and date. -CREATE TABLE bwhist ( - fingerprint CHARACTER(40) NOT NULL, - date DATE NOT NULL, - read BIGINT[], - read_sum BIGINT, - written BIGINT[], - written_sum BIGINT, - dirread BIGINT[], - dirread_sum BIGINT, - dirwritten BIGINT[], - dirwritten_sum BIGINT, - CONSTRAINT bwhist_pkey PRIMARY KEY (fingerprint, date) -); - -CREATE INDEX bwhist_date ON bwhist (date); - --- TABLE statusentry --- Contains all of the consensus entries published by the directories. --- Each statusentry references a valid descriptor. 
-CREATE TABLE statusentry ( - validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL, - nickname CHARACTER VARYING(19) NOT NULL, - fingerprint CHARACTER(40) NOT NULL, - descriptor CHARACTER(40) NOT NULL, - published TIMESTAMP WITHOUT TIME ZONE NOT NULL, - address CHARACTER VARYING(15) NOT NULL, - orport INTEGER NOT NULL, - dirport INTEGER NOT NULL, - isauthority BOOLEAN DEFAULT FALSE NOT NULL, - isbadexit BOOLEAN DEFAULT FALSE NOT NULL, - isbaddirectory BOOLEAN DEFAULT FALSE NOT NULL, - isexit BOOLEAN DEFAULT FALSE NOT NULL, - isfast BOOLEAN DEFAULT FALSE NOT NULL, - isguard BOOLEAN DEFAULT FALSE NOT NULL, - ishsdir BOOLEAN DEFAULT FALSE NOT NULL, - isnamed BOOLEAN DEFAULT FALSE NOT NULL, - isstable BOOLEAN DEFAULT FALSE NOT NULL, - isrunning BOOLEAN DEFAULT FALSE NOT NULL, - isunnamed BOOLEAN DEFAULT FALSE NOT NULL, - isvalid BOOLEAN DEFAULT FALSE NOT NULL, - isv2dir BOOLEAN DEFAULT FALSE NOT NULL, - isv3dir BOOLEAN DEFAULT FALSE NOT NULL, - version CHARACTER VARYING(50), - bandwidth BIGINT, - ports TEXT, - rawdesc BYTEA NOT NULL -); - -CREATE OR REPLACE FUNCTION statusentry_insert_trigger() -RETURNS TRIGGER AS $$ - -DECLARE - tablename TEXT; - selectresult TEXT; - nextmonth TIMESTAMP WITHOUT TIME ZONE; - v_year INTEGER; - v_month INTEGER; - n_year INTEGER; - n_month INTEGER; - -BEGIN - v_year := extract(YEAR FROM NEW.validafter); - v_month := extract(MONTH FROM NEW.validafter); - tablename := 'statusentry_y' || v_year || 'm' || - TO_CHAR(NEW.validafter, 'mm'); - EXECUTE 'SELECT relname FROM pg_class WHERE relname = '''|| tablename || - '''' INTO selectresult; - IF selectresult IS NULL THEN - nextmonth := new.validafter + interval '1 month'; - n_year := extract(YEAR FROM nextmonth); - n_month := extract(MONTH FROM nextmonth); - EXECUTE 'CREATE TABLE ' || tablename || - ' ( CHECK ( validafter >= ''' || v_year || '-' || - TO_CHAR(NEW.validafter, 'mm') || '-01 00:00:00'' ' || - 'AND validafter < ''' || n_year || '-' || - TO_CHAR(nextmonth, 'mm') || - '-01 00:00:00'') ) INHERITS (statusentry)'; - EXECUTE 'ALTER TABLE ' || tablename || ' ADD CONSTRAINT ' || - tablename || '_pkey PRIMARY KEY (validafter, fingerprint)'; - EXECUTE 'CREATE INDEX ' || tablename || '_address ON ' || - tablename || ' (address)'; - EXECUTE 'CREATE INDEX ' || tablename || '_fingerprint ON ' || - tablename || ' (fingerprint)'; - EXECUTE 'CREATE INDEX ' || tablename || '_nickname ON ' || - tablename || ' (LOWER(nickname))'; - EXECUTE 'CREATE INDEX ' || tablename || '_validafter ON ' || - tablename || ' (validafter)'; - EXECUTE 'CREATE INDEX ' || tablename || '_descriptor ON ' || - tablename || ' (descriptor)'; - EXECUTE 'CREATE INDEX ' || tablename || '_validafter_date ON ' || - tablename || ' (DATE(validafter))'; - END IF; - EXECUTE 'INSERT INTO ' || tablename || ' SELECT ($1).*' USING NEW; - RETURN NULL; -END; -$$ LANGUAGE plpgsql; - -CREATE TRIGGER insert_statusentry_trigger - BEFORE INSERT ON statusentry - FOR EACH ROW EXECUTE PROCEDURE statusentry_insert_trigger(); - --- TABLE consensus --- Contains all of the consensuses published by the directories. 
-CREATE TABLE consensus ( - validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL, - CONSTRAINT consensus_pkey PRIMARY KEY (validafter) -); - --- TABLE connbidirect --- Contain conn-bi-direct stats strings -CREATE TABLE connbidirect ( - source CHARACTER(40) NOT NULL, - statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL, - seconds INTEGER NOT NULL, - belownum BIGINT NOT NULL, - readnum BIGINT NOT NULL, - writenum BIGINT NOT NULL, - bothnum BIGINT NOT NULL, - CONSTRAINT connbidirect_pkey PRIMARY KEY (source, statsend) -); - --- TABLE network_size -CREATE TABLE network_size ( - date DATE NOT NULL, - avg_running INTEGER NOT NULL, - avg_exit INTEGER NOT NULL, - avg_guard INTEGER NOT NULL, - avg_fast INTEGER NOT NULL, - avg_stable INTEGER NOT NULL, - avg_authority INTEGER NOT NULL, - avg_badexit INTEGER NOT NULL, - avg_baddirectory INTEGER NOT NULL, - avg_hsdir INTEGER NOT NULL, - avg_named INTEGER NOT NULL, - avg_unnamed INTEGER NOT NULL, - avg_valid INTEGER NOT NULL, - avg_v2dir INTEGER NOT NULL, - avg_v3dir INTEGER NOT NULL, - CONSTRAINT network_size_pkey PRIMARY KEY(date) -); - --- TABLE relay_countries -CREATE TABLE relay_countries ( - date DATE NOT NULL, - country CHARACTER(2) NOT NULL, - relays INTEGER NOT NULL, - CONSTRAINT relay_countries_pkey PRIMARY KEY(date, country) -); - --- TABLE relay_platforms -CREATE TABLE relay_platforms ( - date DATE NOT NULL, - avg_linux INTEGER NOT NULL, - avg_darwin INTEGER NOT NULL, - avg_bsd INTEGER NOT NULL, - avg_windows INTEGER NOT NULL, - avg_other INTEGER NOT NULL, - CONSTRAINT relay_platforms_pkey PRIMARY KEY(date) -); - --- TABLE relay_versions -CREATE TABLE relay_versions ( - date DATE NOT NULL, - version CHARACTER(5) NOT NULL, - relays INTEGER NOT NULL, - CONSTRAINT relay_versions_pkey PRIMARY KEY(date, version) -); - --- TABLE total_bandwidth --- Contains information for the whole network's total bandwidth which is --- used in the bandwidth graphs. -CREATE TABLE total_bandwidth ( - date DATE NOT NULL, - bwavg BIGINT NOT NULL, - bwburst BIGINT NOT NULL, - bwobserved BIGINT NOT NULL, - bwadvertised BIGINT NOT NULL, - CONSTRAINT total_bandwidth_pkey PRIMARY KEY(date) -); - --- TABLE total_bwhist --- Contains the total number of read/written and the number of dir bytes --- read/written by all relays in the network on a given day. The dir bytes --- are an estimate based on the subset of relays that count dir bytes. -CREATE TABLE total_bwhist ( - date DATE NOT NULL, - read BIGINT, - written BIGINT, - CONSTRAINT total_bwhist_pkey PRIMARY KEY(date) -); - --- TABLE bandwidth_flags -CREATE TABLE bandwidth_flags ( - date DATE NOT NULL, - isexit BOOLEAN NOT NULL, - isguard BOOLEAN NOT NULL, - bwadvertised BIGINT NOT NULL, - CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard) -); - --- TABLE bwhist_flags -CREATE TABLE bwhist_flags ( - date DATE NOT NULL, - isexit BOOLEAN NOT NULL, - isguard BOOLEAN NOT NULL, - read BIGINT, - written BIGINT, - CONSTRAINT bwhist_flags_pkey PRIMARY KEY(date, isexit, isguard) -); - --- TABLE user_stats --- Aggregate statistics on directory requests and byte histories that we --- use to estimate user numbers. 
-CREATE TABLE user_stats ( - date DATE NOT NULL, - country CHARACTER(2) NOT NULL, - r BIGINT, - dw BIGINT, - dr BIGINT, - drw BIGINT, - drr BIGINT, - bw BIGINT, - br BIGINT, - bwd BIGINT, - brd BIGINT, - bwr BIGINT, - brr BIGINT, - bwdr BIGINT, - brdr BIGINT, - bwp BIGINT, - brp BIGINT, - bwn BIGINT, - brn BIGINT, - CONSTRAINT user_stats_pkey PRIMARY KEY(date, country) -); - --- TABLE relay_statuses_per_day --- A helper table which is commonly used to update the tables above in the --- refresh_* functions. -CREATE TABLE relay_statuses_per_day ( - date DATE NOT NULL, - count INTEGER NOT NULL, - CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date) -); - --- Dates to be included in the next refresh run. -CREATE TABLE scheduled_updates ( - id SERIAL, - date DATE NOT NULL -); - --- Dates in the current refresh run. When starting a refresh run, we copy --- the rows from scheduled_updates here in order to delete just those --- lines after the refresh run. Otherwise we might forget scheduled dates --- that have been added during a refresh run. If this happens we're going --- to update these dates in the next refresh run. -CREATE TABLE updates ( - id INTEGER, - date DATE -); - --- FUNCTION refresh_relay_statuses_per_day() --- Updates helper table which is used to refresh the aggregate tables. -CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day() -RETURNS INTEGER AS $$ - BEGIN - DELETE FROM relay_statuses_per_day - WHERE date IN (SELECT date FROM updates); - INSERT INTO relay_statuses_per_day (date, count) - SELECT DATE(validafter) AS date, COUNT(*) AS count - FROM consensus - WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates) - AND DATE(validafter) <= (SELECT MAX(date) FROM updates) - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY DATE(validafter); - RETURN 1; - END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$ - SELECT SUM($1[i])::bigint - FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i); -$$ LANGUAGE SQL; - -CREATE OR REPLACE FUNCTION insert_bwhist( - insert_fingerprint CHARACTER(40), insert_date DATE, - insert_read BIGINT[], insert_written BIGINT[], - insert_dirread BIGINT[], insert_dirwritten BIGINT[]) - RETURNS INTEGER AS $$ - BEGIN - IF (SELECT COUNT(*) FROM bwhist - WHERE fingerprint = insert_fingerprint AND date = insert_date) = 0 - THEN - INSERT INTO bwhist (fingerprint, date, read, written, dirread, - dirwritten) - VALUES (insert_fingerprint, insert_date, insert_read, insert_written, - insert_dirread, insert_dirwritten); - ELSE - BEGIN - UPDATE bwhist - SET read[array_lower(insert_read, 1): - array_upper(insert_read, 1)] = insert_read, - written[array_lower(insert_written, 1): - array_upper(insert_written, 1)] = insert_written, - dirread[array_lower(insert_dirread, 1): - array_upper(insert_dirread, 1)] = insert_dirread, - dirwritten[array_lower(insert_dirwritten, 1): - array_upper(insert_dirwritten, 1)] = insert_dirwritten - WHERE fingerprint = insert_fingerprint AND date = insert_date; - -- Updating twice is an ugly workaround for PostgreSQL bug 5840 - UPDATE bwhist - SET read[array_lower(insert_read, 1): - array_upper(insert_read, 1)] = insert_read, - written[array_lower(insert_written, 1): - array_upper(insert_written, 1)] = insert_written, - dirread[array_lower(insert_dirread, 1): - array_upper(insert_dirread, 1)] = insert_dirread, - dirwritten[array_lower(insert_dirwritten, 1): - array_upper(insert_dirwritten, 1)] = insert_dirwritten - WHERE fingerprint = insert_fingerprint AND date 
= insert_date; - END; - END IF; - UPDATE bwhist - SET read_sum = array_sum(read), - written_sum = array_sum(written), - dirread_sum = array_sum(dirread), - dirwritten_sum = array_sum(dirwritten) - WHERE fingerprint = insert_fingerprint AND date = insert_date; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- refresh_* functions --- The following functions keep their corresponding aggregate tables --- up-to-date. They should be called every time ERNIE is run, or when new --- data is finished being added to the descriptor or statusentry tables. --- They find what new data has been entered or updated based on the --- updates table. - --- FUNCTION refresh_network_size() -CREATE OR REPLACE FUNCTION refresh_network_size() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM network_size - WHERE date IN (SELECT date FROM updates); - - EXECUTE ' - INSERT INTO network_size - (date, avg_running, avg_exit, avg_guard, avg_fast, avg_stable, - avg_authority, avg_badexit, avg_baddirectory, avg_hsdir, - avg_named, avg_unnamed, avg_valid, avg_v2dir, avg_v3dir) - SELECT date, - isrunning / count AS avg_running, - isexit / count AS avg_exit, - isguard / count AS avg_guard, - isfast / count AS avg_fast, - isstable / count AS avg_stable, - isauthority / count as avg_authority, - isbadexit / count as avg_badexit, - isbaddirectory / count as avg_baddirectory, - ishsdir / count as avg_hsdir, - isnamed / count as avg_named, - isunnamed / count as avg_unnamed, - isvalid / count as avg_valid, - isv2dir / count as avg_v2dir, - isv3dir / count as avg_v3dir - FROM ( - SELECT DATE(validafter) AS date, - COUNT(*) AS isrunning, - COUNT(NULLIF(isexit, FALSE)) AS isexit, - COUNT(NULLIF(isguard, FALSE)) AS isguard, - COUNT(NULLIF(isfast, FALSE)) AS isfast, - COUNT(NULLIF(isstable, FALSE)) AS isstable, - COUNT(NULLIF(isauthority, FALSE)) AS isauthority, - COUNT(NULLIF(isbadexit, FALSE)) AS isbadexit, - COUNT(NULLIF(isbaddirectory, FALSE)) AS isbaddirectory, - COUNT(NULLIF(ishsdir, FALSE)) AS ishsdir, - COUNT(NULLIF(isnamed, FALSE)) AS isnamed, - COUNT(NULLIF(isunnamed, FALSE)) AS isunnamed, - COUNT(NULLIF(isvalid, FALSE)) AS isvalid, - COUNT(NULLIF(isv2dir, FALSE)) AS isv2dir, - COUNT(NULLIF(isv3dir, FALSE)) AS isv3dir - FROM statusentry - WHERE isrunning = TRUE - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY DATE(validafter) - ) b - NATURAL JOIN relay_statuses_per_day'; - - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- FUNCTION refresh_relay_platforms() -CREATE OR REPLACE FUNCTION refresh_relay_platforms() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM relay_platforms - WHERE date IN (SELECT date FROM updates); - - EXECUTE ' - INSERT INTO relay_platforms - (date, avg_linux, avg_darwin, avg_bsd, avg_windows, avg_other) - SELECT date, - linux / count AS avg_linux, - darwin / count AS avg_darwin, - bsd / count AS avg_bsd, - windows / count AS avg_windows, - other / count AS avg_other - FROM ( - SELECT DATE(validafter) AS date, - SUM(CASE WHEN platform LIKE ''%Linux%'' THEN 1 ELSE 0 END) - AS linux, - SUM(CASE WHEN platform LIKE ''%Darwin%'' THEN 1 ELSE 0 END) - AS 
darwin, - SUM(CASE WHEN platform LIKE ''%BSD%'' THEN 1 ELSE 0 END) - AS bsd, - SUM(CASE WHEN platform LIKE ''%Windows%'' THEN 1 ELSE 0 END) - AS windows, - SUM(CASE WHEN platform NOT LIKE ''%Windows%'' - AND platform NOT LIKE ''%Darwin%'' - AND platform NOT LIKE ''%BSD%'' - AND platform NOT LIKE ''%Linux%'' THEN 1 ELSE 0 END) - AS other - FROM descriptor - RIGHT JOIN statusentry - ON statusentry.descriptor = descriptor.descriptor - WHERE isrunning = TRUE - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY DATE(validafter) - ) b - NATURAL JOIN relay_statuses_per_day'; - - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- FUNCTION refresh_relay_versions() -CREATE OR REPLACE FUNCTION refresh_relay_versions() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM relay_versions - WHERE date IN (SELECT date FROM updates); - - EXECUTE ' - INSERT INTO relay_versions - (date, version, relays) - SELECT date, version, relays / count AS relays - FROM ( - SELECT DATE(validafter), SUBSTRING(platform, 5, 5) AS version, - COUNT(*) AS relays - FROM descriptor RIGHT JOIN statusentry - ON descriptor.descriptor = statusentry.descriptor - WHERE isrunning = TRUE - AND platform IS NOT NULL - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY 1, 2 - ) b - NATURAL JOIN relay_statuses_per_day'; - - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- FUNCTION refresh_total_bandwidth() --- This keeps the table total_bandwidth up-to-date when necessary. 
-CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM total_bandwidth - WHERE date IN (SELECT date FROM updates); - - EXECUTE ' - INSERT INTO total_bandwidth - (bwavg, bwburst, bwobserved, bwadvertised, date) - SELECT (SUM(bandwidthavg) - / relay_statuses_per_day.count)::BIGINT AS bwavg, - (SUM(bandwidthburst) - / relay_statuses_per_day.count)::BIGINT AS bwburst, - (SUM(bandwidthobserved) - / relay_statuses_per_day.count)::BIGINT AS bwobserved, - (SUM(LEAST(bandwidthavg, bandwidthobserved)) - / relay_statuses_per_day.count)::BIGINT AS bwadvertised, - DATE(validafter) - FROM descriptor RIGHT JOIN statusentry - ON descriptor.descriptor = statusentry.descriptor - JOIN relay_statuses_per_day - ON DATE(validafter) = relay_statuses_per_day.date - WHERE isrunning = TRUE - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - AND relay_statuses_per_day.date >= ''' || min_date || ''' - AND relay_statuses_per_day.date < ''' || max_date || ''' - AND DATE(relay_statuses_per_day.date) IN - (SELECT date FROM updates) - GROUP BY DATE(validafter), relay_statuses_per_day.count'; - - RETURN 1; - END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$ - BEGIN - DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates); - INSERT INTO total_bwhist (date, read, written) - SELECT date, SUM(read_sum) AS read, SUM(written_sum) AS written - FROM bwhist - WHERE date >= (SELECT MIN(date) FROM updates) - AND date <= (SELECT MAX(date) FROM updates) - AND date IN (SELECT date FROM updates) - GROUP BY date; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates); - EXECUTE ' - INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised) - SELECT DATE(validafter) AS date, - BOOL_OR(isexit) AS isexit, - BOOL_OR(isguard) AS isguard, - (SUM(LEAST(bandwidthavg, bandwidthobserved)) - / relay_statuses_per_day.count)::BIGINT AS bwadvertised - FROM descriptor RIGHT JOIN statusentry - ON descriptor.descriptor = statusentry.descriptor - JOIN relay_statuses_per_day - ON DATE(validafter) = relay_statuses_per_day.date - WHERE isrunning = TRUE - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - AND relay_statuses_per_day.date >= ''' || min_date || ''' - AND relay_statuses_per_day.date < ''' || max_date || ''' - AND DATE(relay_statuses_per_day.date) IN - (SELECT date FROM updates) - GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count'; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - -CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM bwhist_flags WHERE date IN (SELECT date FROM updates); - EXECUTE ' - 
INSERT INTO bwhist_flags (date, isexit, isguard, read, written) - SELECT a.date, isexit, isguard, SUM(read_sum) as read, - SUM(written_sum) AS written - FROM - (SELECT DATE(validafter) AS date, - fingerprint, - BOOL_OR(isexit) AS isexit, - BOOL_OR(isguard) AS isguard - FROM statusentry - WHERE isrunning = TRUE - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY 1, 2) a - JOIN bwhist - ON a.date = bwhist.date - AND a.fingerprint = bwhist.fingerprint - GROUP BY 1, 2, 3'; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- FUNCTION refresh_user_stats() --- This function refreshes our user statistics by weighting reported --- directory request statistics of directory mirrors with bandwidth --- histories. -CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - -- Start by deleting user statistics of the dates we're about to - -- regenerate. - DELETE FROM user_stats WHERE date IN (SELECT date FROM updates); - -- Now insert new user statistics. - EXECUTE ' - INSERT INTO user_stats (date, country, r, dw, dr, drw, drr, bw, br, bwd, - brd, bwr, brr, bwdr, brdr, bwp, brp, bwn, brn) - SELECT - -- We want to learn about total requests by date and country. - dirreq_stats_by_country.date AS date, - dirreq_stats_by_country.country AS country, - dirreq_stats_by_country.r AS r, - -- In order to weight the reported directory requests, we are - -- counting bytes of relays (except directory authorities) - -- matching certain criteria: whether or not they are reporting - -- directory requests, whether or not they are reporting - -- directory bytes, and whether their directory port is open or - -- closed. - SUM(CASE WHEN authority IS NOT NULL - THEN NULL ELSE dirwritten END) AS dw, - SUM(CASE WHEN authority IS NOT NULL - THEN NULL ELSE dirread END) AS dr, - SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL - THEN NULL ELSE dirwritten END) AS dwr, - SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL - THEN NULL ELSE dirread END) AS drr, - SUM(CASE WHEN authority IS NOT NULL - THEN NULL ELSE written END) AS bw, - SUM(CASE WHEN authority IS NOT NULL - THEN NULL ELSE read END) AS br, - SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL - THEN NULL ELSE written END) AS bwd, - SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL - THEN NULL ELSE read END) AS brd, - SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL - THEN NULL ELSE written END) AS bwr, - SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL - THEN NULL ELSE read END) AS brr, - SUM(CASE WHEN dirwritten = 0 OR requests IS NULL - OR authority IS NOT NULL THEN NULL ELSE written END) AS bwdr, - SUM(CASE WHEN dirwritten = 0 OR requests IS NULL - OR authority IS NOT NULL THEN NULL ELSE read END) AS brdr, - SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL - THEN NULL ELSE written END) AS bwp, - SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL - THEN NULL ELSE read END) AS brp, - SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL - THEN NULL ELSE written END) AS bwn, - SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL - THEN NULL ELSE read END) AS brn - FROM ( - -- The first sub-select tells us the total number of directory - -- requests per country reported by all directory mirrors. 
- SELECT dirreq_stats_by_date.date AS date, country, SUM(requests) AS r - FROM ( - SELECT fingerprint, date, country, SUM(requests) AS requests - FROM ( - -- There are two selects here, because in most cases the directory - -- request statistics cover two calendar dates. - SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date, - country, FLOOR(requests * (CASE - WHEN EXTRACT(EPOCH FROM DATE(statsend)) > - EXTRACT(EPOCH FROM statsend) - seconds - THEN EXTRACT(EPOCH FROM statsend) - - EXTRACT(EPOCH FROM DATE(statsend)) - ELSE seconds END) / seconds) AS requests - FROM dirreq_stats - UNION - SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date, - country, FLOOR(requests * - (EXTRACT(EPOCH FROM DATE(statsend)) - - EXTRACT(EPOCH FROM statsend) + seconds) - / seconds) AS requests - FROM dirreq_stats - WHERE EXTRACT(EPOCH FROM DATE(statsend)) - - EXTRACT(EPOCH FROM statsend) + seconds > 0 - ) dirreq_stats_split - GROUP BY 1, 2, 3 - ) dirreq_stats_by_date - -- We are only interested in requests by directory mirrors, not - -- directory authorities, so we exclude all relays with the Authority - -- flag. - RIGHT JOIN ( - SELECT fingerprint, DATE(validafter) AS date - FROM statusentry - WHERE validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - AND isauthority IS FALSE - GROUP BY 1, 2 - ) statusentry_dirmirrors - ON dirreq_stats_by_date.fingerprint = - statusentry_dirmirrors.fingerprint - AND dirreq_stats_by_date.date = statusentry_dirmirrors.date - GROUP BY 1, 2 - ) dirreq_stats_by_country - LEFT JOIN ( - -- In the next step, we expand the result by bandwidth histories of - -- all relays. - SELECT fingerprint, date, read_sum AS read, written_sum AS written, - dirread_sum AS dirread, dirwritten_sum AS dirwritten - FROM bwhist - WHERE date >= ''' || min_date || ''' - AND date < ''' || max_date || ''' - AND date IN (SELECT date FROM updates) - ) bwhist_by_relay - ON dirreq_stats_by_country.date = bwhist_by_relay.date - LEFT JOIN ( - -- For each relay, tell how often it had an open directory port and - -- how often it had the Authority flag assigned on a given date. - SELECT fingerprint, DATE(validafter) AS date, - SUM(CASE WHEN dirport > 0 THEN 1 ELSE NULL END) AS opendirport, - SUM(CASE WHEN isauthority IS TRUE THEN 1 ELSE NULL END) AS authority - FROM statusentry - WHERE validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY 1, 2 - ) statusentry_by_relay - ON bwhist_by_relay.fingerprint = statusentry_by_relay.fingerprint - AND bwhist_by_relay.date = statusentry_by_relay.date - LEFT JOIN ( - -- For each relay, tell if it has reported directory request - -- statistics on a given date. Again, we have to take into account - -- that statistics intervals cover more than one calendar date in most - -- cases. The exact number of requests is not relevant here, but only - -- whether the relay reported directory requests or not. 
- SELECT fingerprint, date, 1 AS requests - FROM ( - SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date - FROM dirreq_stats - WHERE DATE(statsend) >= ''' || min_date || ''' - AND DATE(statsend) < ''' || max_date || ''' - AND DATE(statsend) IN (SELECT date FROM updates) - AND country = ''zy'' - UNION - SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date - FROM dirreq_stats - WHERE DATE(statsend) - 1 >= ''' || min_date || ''' - AND DATE(statsend) - 1 < ''' || max_date || ''' - AND DATE(statsend) IN (SELECT date FROM updates) - AND country = ''zy'' - AND EXTRACT(EPOCH FROM DATE(statsend)) - - EXTRACT(EPOCH FROM statsend) + seconds > 0 - ) dirreq_stats_split - GROUP BY 1, 2 - ) dirreq_stats_by_relay - ON bwhist_by_relay.fingerprint = dirreq_stats_by_relay.fingerprint - AND bwhist_by_relay.date = dirreq_stats_by_relay.date - WHERE dirreq_stats_by_country.country IS NOT NULL - -- Group by date, country, and total reported directory requests, - -- summing up the bandwidth histories. - GROUP BY 1, 2, 3'; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- non-relay statistics --- The following tables contain pre-aggregated statistics that are not --- based on relay descriptors or that are not yet derived from the relay --- descriptors in the database. - --- TABLE bridge_network_size --- Contains average number of running bridges. -CREATE TABLE bridge_network_size ( - "date" DATE NOT NULL, - avg_running INTEGER NOT NULL, - avg_running_ec2 INTEGER NOT NULL, - CONSTRAINT bridge_network_size_pkey PRIMARY KEY(date) -); - --- TABLE dirreq_stats --- Contains daily users by country. -CREATE TABLE dirreq_stats ( - source CHARACTER(40) NOT NULL, - statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL, - seconds INTEGER NOT NULL, - country CHARACTER(2) NOT NULL, - requests INTEGER NOT NULL, - CONSTRAINT dirreq_stats_pkey - PRIMARY KEY (source, statsend, seconds, country) -); - --- TABLE torperf_stats --- Quantiles and medians of daily torperf results. -CREATE TABLE torperf_stats ( - "date" DATE NOT NULL, - source CHARACTER VARYING(32) NOT NULL, - q1 INTEGER NOT NULL, - md INTEGER NOT NULL, - q3 INTEGER NOT NULL, - timeouts INTEGER NOT NULL, - failures INTEGER NOT NULL, - requests INTEGER NOT NULL, - CONSTRAINT torperf_stats_pkey PRIMARY KEY("date", source) -); - --- Refresh all statistics in the database. 
-CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$ - BEGIN - RAISE NOTICE '% Starting refresh run.', timeofday(); - RAISE NOTICE '% Deleting old dates from updates.', timeofday(); - DELETE FROM updates; - RAISE NOTICE '% Copying scheduled dates.', timeofday(); - INSERT INTO updates SELECT * FROM scheduled_updates; - RAISE NOTICE '% Refreshing relay statuses per day.', timeofday(); - PERFORM refresh_relay_statuses_per_day(); - RAISE NOTICE '% Refreshing network size.', timeofday(); - PERFORM refresh_network_size(); - RAISE NOTICE '% Refreshing relay platforms.', timeofday(); - PERFORM refresh_relay_platforms(); - RAISE NOTICE '% Refreshing relay versions.', timeofday(); - PERFORM refresh_relay_versions(); - RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday(); - PERFORM refresh_total_bandwidth(); - RAISE NOTICE '% Refreshing relay bandwidth history.', timeofday(); - PERFORM refresh_total_bwhist(); - RAISE NOTICE '% Refreshing total relay bandwidth by flags.', timeofday(); - PERFORM refresh_bandwidth_flags(); - RAISE NOTICE '% Refreshing bandwidth history by flags.', timeofday(); - PERFORM refresh_bwhist_flags(); - RAISE NOTICE '% Refreshing user statistics.', timeofday(); - PERFORM refresh_user_stats(); - RAISE NOTICE '% Deleting processed dates.', timeofday(); - DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates); - RAISE NOTICE '% Terminating refresh run.', timeofday(); - RETURN 1; - END; -$$ LANGUAGE plpgsql; - --- View for exporting server statistics. -CREATE VIEW stats_servers AS - (SELECT date, NULL AS flag, NULL AS country, NULL AS version, - NULL AS platform, TRUE AS ec2bridge, NULL AS relays, - avg_running_ec2 AS bridges FROM bridge_network_size - WHERE date < current_date - 1) -UNION ALL - (SELECT COALESCE(network_size.date, bridge_network_size.date) AS date, - NULL AS flag, NULL AS country, NULL AS version, NULL AS platform, - NULL AS ec2bridge, network_size.avg_running AS relays, - bridge_network_size.avg_running AS bridges FROM network_size - FULL OUTER JOIN bridge_network_size - ON network_size.date = bridge_network_size.date - WHERE COALESCE(network_size.date, bridge_network_size.date) < - current_date - 1) -UNION ALL - (SELECT date, 'Exit' AS flag, NULL AS country, NULL AS version, - NULL AS platform, NULL AS ec2bridge, avg_exit AS relays, - NULL AS bridges FROM network_size WHERE date < current_date - 1) -UNION ALL - (SELECT date, 'Guard' AS flag, NULL AS country, NULL AS version, - NULL AS platform, NULL AS ec2bridge, avg_guard AS relays, - NULL AS bridges FROM network_size WHERE date < current_date - 1) -UNION ALL - (SELECT date, 'Fast' AS flag, NULL AS country, NULL AS version, - NULL AS platform, NULL AS ec2bridge, avg_fast AS relays, - NULL AS bridges FROM network_size WHERE date < current_date - 1) -UNION ALL - (SELECT date, 'Stable' AS flag, NULL AS country, NULL AS version, - NULL AS platform, NULL AS ec2bridge, avg_stable AS relays, - NULL AS bridges FROM network_size WHERE date < current_date - 1) -UNION ALL - (SELECT date, 'HSDir' AS flag, NULL AS country, NULL AS version, - NULL AS platform, NULL AS ec2bridge, avg_hsdir AS relays, - NULL AS bridges FROM network_size WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, CASE WHEN country != 'zz' THEN country - ELSE '??' 
END AS country, NULL AS version, NULL AS platform, - NULL AS ec2bridge, relays, NULL AS bridges FROM relay_countries - WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, NULL AS country, version, NULL AS platform, - NULL AS ec2bridge, relays, NULL AS bridges FROM relay_versions - WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, NULL AS country, NULL AS version, - 'Linux' AS platform, NULL AS ec2bridge, avg_linux AS relays, - NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, NULL AS country, NULL AS version, - 'Darwin' AS platform, NULL AS ec2bridge, avg_darwin AS relays, - NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, NULL AS country, NULL AS version, - 'FreeBSD' AS platform, NULL AS ec2bridge, avg_bsd AS relays, - NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, NULL AS country, NULL AS version, - 'Windows' AS platform, NULL AS ec2bridge, avg_windows AS relays, - NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) -UNION ALL - (SELECT date, NULL AS flag, NULL AS country, NULL AS version, - 'Other' AS platform, NULL AS ec2bridge, avg_other AS relays, - NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) -ORDER BY 1, 2, 3, 4, 5, 6; - --- View for exporting bandwidth statistics. -CREATE VIEW stats_bandwidth AS - (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date, - COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit, - COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard, - bandwidth_flags.bwadvertised AS advbw, - CASE WHEN bwhist_flags.read IS NOT NULL - THEN bwhist_flags.read / 86400 END AS bwread, - CASE WHEN bwhist_flags.written IS NOT NULL - THEN bwhist_flags.written / 86400 END AS bwwrite, - NULL AS dirread, NULL AS dirwrite - FROM bandwidth_flags FULL OUTER JOIN bwhist_flags - ON bandwidth_flags.date = bwhist_flags.date - AND bandwidth_flags.isexit = bwhist_flags.isexit - AND bandwidth_flags.isguard = bwhist_flags.isguard - WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) < - current_date - 3) -UNION ALL - (SELECT COALESCE(total_bandwidth.date, total_bwhist.date, u.date) - AS date, NULL AS isexit, NULL AS isguard, - total_bandwidth.bwadvertised AS advbw, - CASE WHEN total_bwhist.read IS NOT NULL - THEN total_bwhist.read / 86400 END AS bwread, - CASE WHEN total_bwhist.written IS NOT NULL - THEN total_bwhist.written / 86400 END AS bwwrite, - CASE WHEN u.date IS NOT NULL - THEN FLOOR(CAST(u.dr AS NUMERIC) * CAST(u.brp AS NUMERIC) / - CAST(u.brd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirread, - CASE WHEN u.date IS NOT NULL - THEN FLOOR(CAST(u.dw AS NUMERIC) * CAST(u.bwp AS NUMERIC) / - CAST(u.bwd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirwrite - FROM total_bandwidth FULL OUTER JOIN total_bwhist - ON total_bandwidth.date = total_bwhist.date - FULL OUTER JOIN (SELECT * FROM user_stats WHERE country = 'zy' - AND bwp / bwd <= 3) u - ON COALESCE(total_bandwidth.date, total_bwhist.date) = u.date - WHERE COALESCE(total_bandwidth.date, total_bwhist.date, u.date) < - current_date - 3) -ORDER BY 1, 2, 3; - --- View for exporting torperf statistics. 
-CREATE VIEW stats_torperf AS -SELECT date, CASE WHEN source LIKE '%-50kb' THEN 50 * 1024 - WHEN source LIKE '%-1mb' THEN 1024 * 1024 - WHEN source LIKE '%-5mb' THEN 5 * 1024 * 1024 END AS size, - CASE WHEN source NOT LIKE 'all-%' - THEN split_part(source, '-', 1) END AS source, q1, md, q3, timeouts, - failures, requests FROM torperf_stats WHERE date < current_date - 1 - ORDER BY 1, 2, 3; - --- View for exporting connbidirect statistics. -CREATE VIEW stats_connbidirect AS -SELECT DATE(statsend) AS date, source, belownum AS below, readnum AS read, - writenum AS write, bothnum AS "both" FROM connbidirect - WHERE DATE(statsend) < current_date - 1 ORDER BY 1, 2; - diff --git a/modules/legacy/build.xml b/modules/legacy/build.xml new file mode 100644 index 0000000..bf0b65d --- /dev/null +++ b/modules/legacy/build.xml @@ -0,0 +1,55 @@ +<project default="run" name="metrics-web" basedir="."> + + <!-- Define build paths. --> + <property name="sources" value="src"/> + <property name="classes" value="classes"/> + <property name="config" value="etc"/> + <property name="warfile" value="ernie.war"/> + <path id="classpath"> + <pathelement path="${classes}"/> + <fileset dir="/usr/share/java"> + <include name="commons-codec.jar"/> + <include name="commons-compress.jar"/> + <include name="postgresql-jdbc3.jar"/> + <include name="junit4.jar"/> + <include name="servlet-api-3.0.jar"/> + <include name="commons-lang.jar"/> + </fileset> + <fileset dir="../../deps/metrics-lib"> + <include name="descriptor.jar"/> + </fileset> + </path> + + <target name="init"> + <copy file="config.template" tofile="config"/> + <mkdir dir="${classes}"/> + </target> + <target name="metrics-lib"> + <ant dir="../../deps/metrics-lib"/> + </target> + + <!-- Compile all plain Java classes. --> + <target name="compile" depends="metrics-lib,init"> + <javac destdir="${classes}" + srcdir="${sources}" + source="1.5" + target="1.5" + debug="true" + deprecation="true" + optimize="false" + failonerror="true" + includeantruntime="false"> + <classpath refid="classpath"/> + </javac> + </target> + + <!-- Prepare data for being displayed on the website. --> + <target name="run" depends="compile"> + <java fork="true" + maxmemory="1024m" + classname="org.torproject.ernie.cron.Main"> + <classpath refid="classpath"/> + </java> + </target> +</project> + diff --git a/modules/legacy/config.template b/modules/legacy/config.template new file mode 100644 index 0000000..8f0789b --- /dev/null +++ b/modules/legacy/config.template @@ -0,0 +1,47 @@ +## Import directory archives from disk, if available +#ImportDirectoryArchives 0 +# +## Relative path to directory to import directory archives from +#DirectoryArchivesDirectory in/relay-descriptors/ +# +## Keep a history of imported directory archive files to know which files +## have been imported before. This history can be useful when importing +## from a changing source to avoid importing descriptors over and over +## again, but it can be confusing to users who don't know about it. +#KeepDirectoryArchiveImportHistory 0 +# +## Import sanitized bridges from disk, if available +#ImportSanitizedBridges 0 +# +## Relative path to directory to import sanitized bridges from +#SanitizedBridgesDirectory in/bridge-descriptors/ +# +## Keep a history of imported sanitized bridge descriptors. This history +## can be useful when importing from a changing data source to avoid +## importing descriptors more than once, but it can be confusing to users +## who don't know about it. 
+#KeepSanitizedBridgesImportHistory 0 +# +## Write relay descriptors to the database +#WriteRelayDescriptorDatabase 0 +# +## JDBC string for relay descriptor database +#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password +# +## Write relay descriptors to raw text files for importing them into the +## database using PostgreSQL's \copy command +#WriteRelayDescriptorsRawFiles 0 +# +## Relative path to directory to write raw text files; note that existing +## files will be overwritten! +#RelayDescriptorRawFilesDirectory pg-import/ +# +## Write bridge stats to disk +#WriteBridgeStats 0 +# +## Import torperf data, if available, and write stats to disk +#ImportWriteTorperfStats 0 +# +## Relative path to directory to import torperf results from +#TorperfDirectory in/torperf/ +# diff --git a/modules/legacy/db/tordir.sql b/modules/legacy/db/tordir.sql new file mode 100644 index 0000000..cd2ed6a --- /dev/null +++ b/modules/legacy/db/tordir.sql @@ -0,0 +1,1005 @@ +-- Copyright 2010 The Tor Project +-- See LICENSE for licensing information + +CREATE LANGUAGE plpgsql; + +-- TABLE descriptor +-- Contains all of the descriptors published by routers. +CREATE TABLE descriptor ( + descriptor CHARACTER(40) NOT NULL, + nickname CHARACTER VARYING(19) NOT NULL, + address CHARACTER VARYING(15) NOT NULL, + orport INTEGER NOT NULL, + dirport INTEGER NOT NULL, + fingerprint CHARACTER(40) NOT NULL, + bandwidthavg BIGINT NOT NULL, + bandwidthburst BIGINT NOT NULL, + bandwidthobserved BIGINT NOT NULL, + platform CHARACTER VARYING(256), + published TIMESTAMP WITHOUT TIME ZONE NOT NULL, + uptime BIGINT, + extrainfo CHARACTER(40), + CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor) +); + +-- Contains bandwidth histories reported by relays in extra-info +-- descriptors. Each row contains the reported bandwidth in 15-minute +-- intervals for each relay and date. +CREATE TABLE bwhist ( + fingerprint CHARACTER(40) NOT NULL, + date DATE NOT NULL, + read BIGINT[], + read_sum BIGINT, + written BIGINT[], + written_sum BIGINT, + dirread BIGINT[], + dirread_sum BIGINT, + dirwritten BIGINT[], + dirwritten_sum BIGINT, + CONSTRAINT bwhist_pkey PRIMARY KEY (fingerprint, date) +); + +CREATE INDEX bwhist_date ON bwhist (date); + +-- TABLE statusentry +-- Contains all of the consensus entries published by the directories. +-- Each statusentry references a valid descriptor. 
+CREATE TABLE statusentry ( + validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL, + nickname CHARACTER VARYING(19) NOT NULL, + fingerprint CHARACTER(40) NOT NULL, + descriptor CHARACTER(40) NOT NULL, + published TIMESTAMP WITHOUT TIME ZONE NOT NULL, + address CHARACTER VARYING(15) NOT NULL, + orport INTEGER NOT NULL, + dirport INTEGER NOT NULL, + isauthority BOOLEAN DEFAULT FALSE NOT NULL, + isbadexit BOOLEAN DEFAULT FALSE NOT NULL, + isbaddirectory BOOLEAN DEFAULT FALSE NOT NULL, + isexit BOOLEAN DEFAULT FALSE NOT NULL, + isfast BOOLEAN DEFAULT FALSE NOT NULL, + isguard BOOLEAN DEFAULT FALSE NOT NULL, + ishsdir BOOLEAN DEFAULT FALSE NOT NULL, + isnamed BOOLEAN DEFAULT FALSE NOT NULL, + isstable BOOLEAN DEFAULT FALSE NOT NULL, + isrunning BOOLEAN DEFAULT FALSE NOT NULL, + isunnamed BOOLEAN DEFAULT FALSE NOT NULL, + isvalid BOOLEAN DEFAULT FALSE NOT NULL, + isv2dir BOOLEAN DEFAULT FALSE NOT NULL, + isv3dir BOOLEAN DEFAULT FALSE NOT NULL, + version CHARACTER VARYING(50), + bandwidth BIGINT, + ports TEXT, + rawdesc BYTEA NOT NULL +); + +CREATE OR REPLACE FUNCTION statusentry_insert_trigger() +RETURNS TRIGGER AS $$ + +DECLARE + tablename TEXT; + selectresult TEXT; + nextmonth TIMESTAMP WITHOUT TIME ZONE; + v_year INTEGER; + v_month INTEGER; + n_year INTEGER; + n_month INTEGER; + +BEGIN + v_year := extract(YEAR FROM NEW.validafter); + v_month := extract(MONTH FROM NEW.validafter); + tablename := 'statusentry_y' || v_year || 'm' || + TO_CHAR(NEW.validafter, 'mm'); + EXECUTE 'SELECT relname FROM pg_class WHERE relname = '''|| tablename || + '''' INTO selectresult; + IF selectresult IS NULL THEN + nextmonth := new.validafter + interval '1 month'; + n_year := extract(YEAR FROM nextmonth); + n_month := extract(MONTH FROM nextmonth); + EXECUTE 'CREATE TABLE ' || tablename || + ' ( CHECK ( validafter >= ''' || v_year || '-' || + TO_CHAR(NEW.validafter, 'mm') || '-01 00:00:00'' ' || + 'AND validafter < ''' || n_year || '-' || + TO_CHAR(nextmonth, 'mm') || + '-01 00:00:00'') ) INHERITS (statusentry)'; + EXECUTE 'ALTER TABLE ' || tablename || ' ADD CONSTRAINT ' || + tablename || '_pkey PRIMARY KEY (validafter, fingerprint)'; + EXECUTE 'CREATE INDEX ' || tablename || '_address ON ' || + tablename || ' (address)'; + EXECUTE 'CREATE INDEX ' || tablename || '_fingerprint ON ' || + tablename || ' (fingerprint)'; + EXECUTE 'CREATE INDEX ' || tablename || '_nickname ON ' || + tablename || ' (LOWER(nickname))'; + EXECUTE 'CREATE INDEX ' || tablename || '_validafter ON ' || + tablename || ' (validafter)'; + EXECUTE 'CREATE INDEX ' || tablename || '_descriptor ON ' || + tablename || ' (descriptor)'; + EXECUTE 'CREATE INDEX ' || tablename || '_validafter_date ON ' || + tablename || ' (DATE(validafter))'; + END IF; + EXECUTE 'INSERT INTO ' || tablename || ' SELECT ($1).*' USING NEW; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER insert_statusentry_trigger + BEFORE INSERT ON statusentry + FOR EACH ROW EXECUTE PROCEDURE statusentry_insert_trigger(); + +-- TABLE consensus +-- Contains all of the consensuses published by the directories. 
+CREATE TABLE consensus ( + validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL, + CONSTRAINT consensus_pkey PRIMARY KEY (validafter) +); + +-- TABLE connbidirect +-- Contain conn-bi-direct stats strings +CREATE TABLE connbidirect ( + source CHARACTER(40) NOT NULL, + statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL, + seconds INTEGER NOT NULL, + belownum BIGINT NOT NULL, + readnum BIGINT NOT NULL, + writenum BIGINT NOT NULL, + bothnum BIGINT NOT NULL, + CONSTRAINT connbidirect_pkey PRIMARY KEY (source, statsend) +); + +-- TABLE network_size +CREATE TABLE network_size ( + date DATE NOT NULL, + avg_running INTEGER NOT NULL, + avg_exit INTEGER NOT NULL, + avg_guard INTEGER NOT NULL, + avg_fast INTEGER NOT NULL, + avg_stable INTEGER NOT NULL, + avg_authority INTEGER NOT NULL, + avg_badexit INTEGER NOT NULL, + avg_baddirectory INTEGER NOT NULL, + avg_hsdir INTEGER NOT NULL, + avg_named INTEGER NOT NULL, + avg_unnamed INTEGER NOT NULL, + avg_valid INTEGER NOT NULL, + avg_v2dir INTEGER NOT NULL, + avg_v3dir INTEGER NOT NULL, + CONSTRAINT network_size_pkey PRIMARY KEY(date) +); + +-- TABLE relay_countries +CREATE TABLE relay_countries ( + date DATE NOT NULL, + country CHARACTER(2) NOT NULL, + relays INTEGER NOT NULL, + CONSTRAINT relay_countries_pkey PRIMARY KEY(date, country) +); + +-- TABLE relay_platforms +CREATE TABLE relay_platforms ( + date DATE NOT NULL, + avg_linux INTEGER NOT NULL, + avg_darwin INTEGER NOT NULL, + avg_bsd INTEGER NOT NULL, + avg_windows INTEGER NOT NULL, + avg_other INTEGER NOT NULL, + CONSTRAINT relay_platforms_pkey PRIMARY KEY(date) +); + +-- TABLE relay_versions +CREATE TABLE relay_versions ( + date DATE NOT NULL, + version CHARACTER(5) NOT NULL, + relays INTEGER NOT NULL, + CONSTRAINT relay_versions_pkey PRIMARY KEY(date, version) +); + +-- TABLE total_bandwidth +-- Contains information for the whole network's total bandwidth which is +-- used in the bandwidth graphs. +CREATE TABLE total_bandwidth ( + date DATE NOT NULL, + bwavg BIGINT NOT NULL, + bwburst BIGINT NOT NULL, + bwobserved BIGINT NOT NULL, + bwadvertised BIGINT NOT NULL, + CONSTRAINT total_bandwidth_pkey PRIMARY KEY(date) +); + +-- TABLE total_bwhist +-- Contains the total number of read/written and the number of dir bytes +-- read/written by all relays in the network on a given day. The dir bytes +-- are an estimate based on the subset of relays that count dir bytes. +CREATE TABLE total_bwhist ( + date DATE NOT NULL, + read BIGINT, + written BIGINT, + CONSTRAINT total_bwhist_pkey PRIMARY KEY(date) +); + +-- TABLE bandwidth_flags +CREATE TABLE bandwidth_flags ( + date DATE NOT NULL, + isexit BOOLEAN NOT NULL, + isguard BOOLEAN NOT NULL, + bwadvertised BIGINT NOT NULL, + CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard) +); + +-- TABLE bwhist_flags +CREATE TABLE bwhist_flags ( + date DATE NOT NULL, + isexit BOOLEAN NOT NULL, + isguard BOOLEAN NOT NULL, + read BIGINT, + written BIGINT, + CONSTRAINT bwhist_flags_pkey PRIMARY KEY(date, isexit, isguard) +); + +-- TABLE user_stats +-- Aggregate statistics on directory requests and byte histories that we +-- use to estimate user numbers. 
+CREATE TABLE user_stats ( + date DATE NOT NULL, + country CHARACTER(2) NOT NULL, + r BIGINT, + dw BIGINT, + dr BIGINT, + drw BIGINT, + drr BIGINT, + bw BIGINT, + br BIGINT, + bwd BIGINT, + brd BIGINT, + bwr BIGINT, + brr BIGINT, + bwdr BIGINT, + brdr BIGINT, + bwp BIGINT, + brp BIGINT, + bwn BIGINT, + brn BIGINT, + CONSTRAINT user_stats_pkey PRIMARY KEY(date, country) +); + +-- TABLE relay_statuses_per_day +-- A helper table which is commonly used to update the tables above in the +-- refresh_* functions. +CREATE TABLE relay_statuses_per_day ( + date DATE NOT NULL, + count INTEGER NOT NULL, + CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date) +); + +-- Dates to be included in the next refresh run. +CREATE TABLE scheduled_updates ( + id SERIAL, + date DATE NOT NULL +); + +-- Dates in the current refresh run. When starting a refresh run, we copy +-- the rows from scheduled_updates here in order to delete just those +-- lines after the refresh run. Otherwise we might forget scheduled dates +-- that have been added during a refresh run. If this happens we're going +-- to update these dates in the next refresh run. +CREATE TABLE updates ( + id INTEGER, + date DATE +); + +-- FUNCTION refresh_relay_statuses_per_day() +-- Updates helper table which is used to refresh the aggregate tables. +CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day() +RETURNS INTEGER AS $$ + BEGIN + DELETE FROM relay_statuses_per_day + WHERE date IN (SELECT date FROM updates); + INSERT INTO relay_statuses_per_day (date, count) + SELECT DATE(validafter) AS date, COUNT(*) AS count + FROM consensus + WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates) + AND DATE(validafter) <= (SELECT MAX(date) FROM updates) + AND DATE(validafter) IN (SELECT date FROM updates) + GROUP BY DATE(validafter); + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$ + SELECT SUM($1[i])::bigint + FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i); +$$ LANGUAGE SQL; + +CREATE OR REPLACE FUNCTION insert_bwhist( + insert_fingerprint CHARACTER(40), insert_date DATE, + insert_read BIGINT[], insert_written BIGINT[], + insert_dirread BIGINT[], insert_dirwritten BIGINT[]) + RETURNS INTEGER AS $$ + BEGIN + IF (SELECT COUNT(*) FROM bwhist + WHERE fingerprint = insert_fingerprint AND date = insert_date) = 0 + THEN + INSERT INTO bwhist (fingerprint, date, read, written, dirread, + dirwritten) + VALUES (insert_fingerprint, insert_date, insert_read, insert_written, + insert_dirread, insert_dirwritten); + ELSE + BEGIN + UPDATE bwhist + SET read[array_lower(insert_read, 1): + array_upper(insert_read, 1)] = insert_read, + written[array_lower(insert_written, 1): + array_upper(insert_written, 1)] = insert_written, + dirread[array_lower(insert_dirread, 1): + array_upper(insert_dirread, 1)] = insert_dirread, + dirwritten[array_lower(insert_dirwritten, 1): + array_upper(insert_dirwritten, 1)] = insert_dirwritten + WHERE fingerprint = insert_fingerprint AND date = insert_date; + -- Updating twice is an ugly workaround for PostgreSQL bug 5840 + UPDATE bwhist + SET read[array_lower(insert_read, 1): + array_upper(insert_read, 1)] = insert_read, + written[array_lower(insert_written, 1): + array_upper(insert_written, 1)] = insert_written, + dirread[array_lower(insert_dirread, 1): + array_upper(insert_dirread, 1)] = insert_dirread, + dirwritten[array_lower(insert_dirwritten, 1): + array_upper(insert_dirwritten, 1)] = insert_dirwritten + WHERE fingerprint = insert_fingerprint AND date 
= insert_date; + END; + END IF; + UPDATE bwhist + SET read_sum = array_sum(read), + written_sum = array_sum(written), + dirread_sum = array_sum(dirread), + dirwritten_sum = array_sum(dirwritten) + WHERE fingerprint = insert_fingerprint AND date = insert_date; + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- refresh_* functions +-- The following functions keep their corresponding aggregate tables +-- up-to-date. They should be called every time ERNIE is run, or when new +-- data is finished being added to the descriptor or statusentry tables. +-- They find what new data has been entered or updated based on the +-- updates table. + +-- FUNCTION refresh_network_size() +CREATE OR REPLACE FUNCTION refresh_network_size() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + DELETE FROM network_size + WHERE date IN (SELECT date FROM updates); + + EXECUTE ' + INSERT INTO network_size + (date, avg_running, avg_exit, avg_guard, avg_fast, avg_stable, + avg_authority, avg_badexit, avg_baddirectory, avg_hsdir, + avg_named, avg_unnamed, avg_valid, avg_v2dir, avg_v3dir) + SELECT date, + isrunning / count AS avg_running, + isexit / count AS avg_exit, + isguard / count AS avg_guard, + isfast / count AS avg_fast, + isstable / count AS avg_stable, + isauthority / count as avg_authority, + isbadexit / count as avg_badexit, + isbaddirectory / count as avg_baddirectory, + ishsdir / count as avg_hsdir, + isnamed / count as avg_named, + isunnamed / count as avg_unnamed, + isvalid / count as avg_valid, + isv2dir / count as avg_v2dir, + isv3dir / count as avg_v3dir + FROM ( + SELECT DATE(validafter) AS date, + COUNT(*) AS isrunning, + COUNT(NULLIF(isexit, FALSE)) AS isexit, + COUNT(NULLIF(isguard, FALSE)) AS isguard, + COUNT(NULLIF(isfast, FALSE)) AS isfast, + COUNT(NULLIF(isstable, FALSE)) AS isstable, + COUNT(NULLIF(isauthority, FALSE)) AS isauthority, + COUNT(NULLIF(isbadexit, FALSE)) AS isbadexit, + COUNT(NULLIF(isbaddirectory, FALSE)) AS isbaddirectory, + COUNT(NULLIF(ishsdir, FALSE)) AS ishsdir, + COUNT(NULLIF(isnamed, FALSE)) AS isnamed, + COUNT(NULLIF(isunnamed, FALSE)) AS isunnamed, + COUNT(NULLIF(isvalid, FALSE)) AS isvalid, + COUNT(NULLIF(isv2dir, FALSE)) AS isv2dir, + COUNT(NULLIF(isv3dir, FALSE)) AS isv3dir + FROM statusentry + WHERE isrunning = TRUE + AND validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + GROUP BY DATE(validafter) + ) b + NATURAL JOIN relay_statuses_per_day'; + + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- FUNCTION refresh_relay_platforms() +CREATE OR REPLACE FUNCTION refresh_relay_platforms() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + DELETE FROM relay_platforms + WHERE date IN (SELECT date FROM updates); + + EXECUTE ' + INSERT INTO relay_platforms + (date, avg_linux, avg_darwin, avg_bsd, avg_windows, avg_other) + SELECT date, + linux / count AS avg_linux, + darwin / count AS avg_darwin, + bsd / count AS avg_bsd, + windows / count AS avg_windows, + other / count AS avg_other + FROM ( + SELECT DATE(validafter) AS date, + SUM(CASE WHEN platform LIKE ''%Linux%'' THEN 1 ELSE 0 END) + AS linux, + SUM(CASE WHEN platform LIKE ''%Darwin%'' THEN 1 ELSE 0 END) + AS 
darwin, + SUM(CASE WHEN platform LIKE ''%BSD%'' THEN 1 ELSE 0 END) + AS bsd, + SUM(CASE WHEN platform LIKE ''%Windows%'' THEN 1 ELSE 0 END) + AS windows, + SUM(CASE WHEN platform NOT LIKE ''%Windows%'' + AND platform NOT LIKE ''%Darwin%'' + AND platform NOT LIKE ''%BSD%'' + AND platform NOT LIKE ''%Linux%'' THEN 1 ELSE 0 END) + AS other + FROM descriptor + RIGHT JOIN statusentry + ON statusentry.descriptor = descriptor.descriptor + WHERE isrunning = TRUE + AND validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + GROUP BY DATE(validafter) + ) b + NATURAL JOIN relay_statuses_per_day'; + + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- FUNCTION refresh_relay_versions() +CREATE OR REPLACE FUNCTION refresh_relay_versions() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + DELETE FROM relay_versions + WHERE date IN (SELECT date FROM updates); + + EXECUTE ' + INSERT INTO relay_versions + (date, version, relays) + SELECT date, version, relays / count AS relays + FROM ( + SELECT DATE(validafter), SUBSTRING(platform, 5, 5) AS version, + COUNT(*) AS relays + FROM descriptor RIGHT JOIN statusentry + ON descriptor.descriptor = statusentry.descriptor + WHERE isrunning = TRUE + AND platform IS NOT NULL + AND validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + GROUP BY 1, 2 + ) b + NATURAL JOIN relay_statuses_per_day'; + + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- FUNCTION refresh_total_bandwidth() +-- This keeps the table total_bandwidth up-to-date when necessary. 
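+-- Advertised bandwidth per relay is taken as LEAST(bandwidthavg, +-- bandwidthobserved) and averaged over the number of consensuses per +-- day, as in the other refresh_* functions. As an illustrative check +-- (not part of the schema), the resulting daily totals could be +-- inspected with a query such as +-- SELECT date, bwadvertised FROM total_bandwidth ORDER BY date DESC LIMIT 7;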
+CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + DELETE FROM total_bandwidth + WHERE date IN (SELECT date FROM updates); + + EXECUTE ' + INSERT INTO total_bandwidth + (bwavg, bwburst, bwobserved, bwadvertised, date) + SELECT (SUM(bandwidthavg) + / relay_statuses_per_day.count)::BIGINT AS bwavg, + (SUM(bandwidthburst) + / relay_statuses_per_day.count)::BIGINT AS bwburst, + (SUM(bandwidthobserved) + / relay_statuses_per_day.count)::BIGINT AS bwobserved, + (SUM(LEAST(bandwidthavg, bandwidthobserved)) + / relay_statuses_per_day.count)::BIGINT AS bwadvertised, + DATE(validafter) + FROM descriptor RIGHT JOIN statusentry + ON descriptor.descriptor = statusentry.descriptor + JOIN relay_statuses_per_day + ON DATE(validafter) = relay_statuses_per_day.date + WHERE isrunning = TRUE + AND validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + AND relay_statuses_per_day.date >= ''' || min_date || ''' + AND relay_statuses_per_day.date < ''' || max_date || ''' + AND DATE(relay_statuses_per_day.date) IN + (SELECT date FROM updates) + GROUP BY DATE(validafter), relay_statuses_per_day.count'; + + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$ + BEGIN + DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates); + INSERT INTO total_bwhist (date, read, written) + SELECT date, SUM(read_sum) AS read, SUM(written_sum) AS written + FROM bwhist + WHERE date >= (SELECT MIN(date) FROM updates) + AND date <= (SELECT MAX(date) FROM updates) + AND date IN (SELECT date FROM updates) + GROUP BY date; + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates); + EXECUTE ' + INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised) + SELECT DATE(validafter) AS date, + BOOL_OR(isexit) AS isexit, + BOOL_OR(isguard) AS isguard, + (SUM(LEAST(bandwidthavg, bandwidthobserved)) + / relay_statuses_per_day.count)::BIGINT AS bwadvertised + FROM descriptor RIGHT JOIN statusentry + ON descriptor.descriptor = statusentry.descriptor + JOIN relay_statuses_per_day + ON DATE(validafter) = relay_statuses_per_day.date + WHERE isrunning = TRUE + AND validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + AND relay_statuses_per_day.date >= ''' || min_date || ''' + AND relay_statuses_per_day.date < ''' || max_date || ''' + AND DATE(relay_statuses_per_day.date) IN + (SELECT date FROM updates) + GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count'; + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + DELETE FROM bwhist_flags WHERE date IN (SELECT date FROM updates); + EXECUTE ' + 
INSERT INTO bwhist_flags (date, isexit, isguard, read, written) + SELECT a.date, isexit, isguard, SUM(read_sum) as read, + SUM(written_sum) AS written + FROM + (SELECT DATE(validafter) AS date, + fingerprint, + BOOL_OR(isexit) AS isexit, + BOOL_OR(isguard) AS isguard + FROM statusentry + WHERE isrunning = TRUE + AND validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + GROUP BY 1, 2) a + JOIN bwhist + ON a.date = bwhist.date + AND a.fingerprint = bwhist.fingerprint + GROUP BY 1, 2, 3'; + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- FUNCTION refresh_user_stats() +-- This function refreshes our user statistics by weighting reported +-- directory request statistics of directory mirrors with bandwidth +-- histories. +CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$ + DECLARE + min_date TIMESTAMP WITHOUT TIME ZONE; + max_date TIMESTAMP WITHOUT TIME ZONE; + BEGIN + + min_date := (SELECT MIN(date) FROM updates); + max_date := (SELECT MAX(date) + 1 FROM updates); + + -- Start by deleting user statistics of the dates we're about to + -- regenerate. + DELETE FROM user_stats WHERE date IN (SELECT date FROM updates); + -- Now insert new user statistics. + EXECUTE ' + INSERT INTO user_stats (date, country, r, dw, dr, drw, drr, bw, br, bwd, + brd, bwr, brr, bwdr, brdr, bwp, brp, bwn, brn) + SELECT + -- We want to learn about total requests by date and country. + dirreq_stats_by_country.date AS date, + dirreq_stats_by_country.country AS country, + dirreq_stats_by_country.r AS r, + -- In order to weight the reported directory requests, we are + -- counting bytes of relays (except directory authorities) + -- matching certain criteria: whether or not they are reporting + -- directory requests, whether or not they are reporting + -- directory bytes, and whether their directory port is open or + -- closed. + SUM(CASE WHEN authority IS NOT NULL + THEN NULL ELSE dirwritten END) AS dw, + SUM(CASE WHEN authority IS NOT NULL + THEN NULL ELSE dirread END) AS dr, + SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL + THEN NULL ELSE dirwritten END) AS dwr, + SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL + THEN NULL ELSE dirread END) AS drr, + SUM(CASE WHEN authority IS NOT NULL + THEN NULL ELSE written END) AS bw, + SUM(CASE WHEN authority IS NOT NULL + THEN NULL ELSE read END) AS br, + SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL + THEN NULL ELSE written END) AS bwd, + SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL + THEN NULL ELSE read END) AS brd, + SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL + THEN NULL ELSE written END) AS bwr, + SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL + THEN NULL ELSE read END) AS brr, + SUM(CASE WHEN dirwritten = 0 OR requests IS NULL + OR authority IS NOT NULL THEN NULL ELSE written END) AS bwdr, + SUM(CASE WHEN dirwritten = 0 OR requests IS NULL + OR authority IS NOT NULL THEN NULL ELSE read END) AS brdr, + SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL + THEN NULL ELSE written END) AS bwp, + SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL + THEN NULL ELSE read END) AS brp, + SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL + THEN NULL ELSE written END) AS bwn, + SUM(CASE WHEN opendirport IS NOT NULL OR authority IS NOT NULL + THEN NULL ELSE read END) AS brn + FROM ( + -- The first sub-select tells us the total number of directory + -- requests per country reported by all directory mirrors. 
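+      -- (The dirreq_stats_split query below allocates the requests of a +      -- reported interval proportionally to how much of that interval +      -- falls on each calendar date; e.g., an interval of 86400 seconds +      -- ending at 01:00 contributes 1/24 of its requests to the date it +      -- ends on and 23/24 to the previous date.)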
+ SELECT dirreq_stats_by_date.date AS date, country, SUM(requests) AS r + FROM ( + SELECT fingerprint, date, country, SUM(requests) AS requests + FROM ( + -- There are two selects here, because in most cases the directory + -- request statistics cover two calendar dates. + SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date, + country, FLOOR(requests * (CASE + WHEN EXTRACT(EPOCH FROM DATE(statsend)) > + EXTRACT(EPOCH FROM statsend) - seconds + THEN EXTRACT(EPOCH FROM statsend) - + EXTRACT(EPOCH FROM DATE(statsend)) + ELSE seconds END) / seconds) AS requests + FROM dirreq_stats + UNION + SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date, + country, FLOOR(requests * + (EXTRACT(EPOCH FROM DATE(statsend)) - + EXTRACT(EPOCH FROM statsend) + seconds) + / seconds) AS requests + FROM dirreq_stats + WHERE EXTRACT(EPOCH FROM DATE(statsend)) - + EXTRACT(EPOCH FROM statsend) + seconds > 0 + ) dirreq_stats_split + GROUP BY 1, 2, 3 + ) dirreq_stats_by_date + -- We are only interested in requests by directory mirrors, not + -- directory authorities, so we exclude all relays with the Authority + -- flag. + RIGHT JOIN ( + SELECT fingerprint, DATE(validafter) AS date + FROM statusentry + WHERE validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + AND isauthority IS FALSE + GROUP BY 1, 2 + ) statusentry_dirmirrors + ON dirreq_stats_by_date.fingerprint = + statusentry_dirmirrors.fingerprint + AND dirreq_stats_by_date.date = statusentry_dirmirrors.date + GROUP BY 1, 2 + ) dirreq_stats_by_country + LEFT JOIN ( + -- In the next step, we expand the result by bandwidth histories of + -- all relays. + SELECT fingerprint, date, read_sum AS read, written_sum AS written, + dirread_sum AS dirread, dirwritten_sum AS dirwritten + FROM bwhist + WHERE date >= ''' || min_date || ''' + AND date < ''' || max_date || ''' + AND date IN (SELECT date FROM updates) + ) bwhist_by_relay + ON dirreq_stats_by_country.date = bwhist_by_relay.date + LEFT JOIN ( + -- For each relay, tell how often it had an open directory port and + -- how often it had the Authority flag assigned on a given date. + SELECT fingerprint, DATE(validafter) AS date, + SUM(CASE WHEN dirport > 0 THEN 1 ELSE NULL END) AS opendirport, + SUM(CASE WHEN isauthority IS TRUE THEN 1 ELSE NULL END) AS authority + FROM statusentry + WHERE validafter >= ''' || min_date || ''' + AND validafter < ''' || max_date || ''' + AND DATE(validafter) IN (SELECT date FROM updates) + GROUP BY 1, 2 + ) statusentry_by_relay + ON bwhist_by_relay.fingerprint = statusentry_by_relay.fingerprint + AND bwhist_by_relay.date = statusentry_by_relay.date + LEFT JOIN ( + -- For each relay, tell if it has reported directory request + -- statistics on a given date. Again, we have to take into account + -- that statistics intervals cover more than one calendar date in most + -- cases. The exact number of requests is not relevant here, but only + -- whether the relay reported directory requests or not. 
+ SELECT fingerprint, date, 1 AS requests + FROM ( + SELECT LOWER(source) AS fingerprint, DATE(statsend) AS date + FROM dirreq_stats + WHERE DATE(statsend) >= ''' || min_date || ''' + AND DATE(statsend) < ''' || max_date || ''' + AND DATE(statsend) IN (SELECT date FROM updates) + AND country = ''zy'' + UNION + SELECT LOWER(source) AS fingerprint, DATE(statsend) - 1 AS date + FROM dirreq_stats + WHERE DATE(statsend) - 1 >= ''' || min_date || ''' + AND DATE(statsend) - 1 < ''' || max_date || ''' + AND DATE(statsend) IN (SELECT date FROM updates) + AND country = ''zy'' + AND EXTRACT(EPOCH FROM DATE(statsend)) - + EXTRACT(EPOCH FROM statsend) + seconds > 0 + ) dirreq_stats_split + GROUP BY 1, 2 + ) dirreq_stats_by_relay + ON bwhist_by_relay.fingerprint = dirreq_stats_by_relay.fingerprint + AND bwhist_by_relay.date = dirreq_stats_by_relay.date + WHERE dirreq_stats_by_country.country IS NOT NULL + -- Group by date, country, and total reported directory requests, + -- summing up the bandwidth histories. + GROUP BY 1, 2, 3'; + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- non-relay statistics +-- The following tables contain pre-aggregated statistics that are not +-- based on relay descriptors or that are not yet derived from the relay +-- descriptors in the database. + +-- TABLE bridge_network_size +-- Contains average number of running bridges. +CREATE TABLE bridge_network_size ( + "date" DATE NOT NULL, + avg_running INTEGER NOT NULL, + avg_running_ec2 INTEGER NOT NULL, + CONSTRAINT bridge_network_size_pkey PRIMARY KEY(date) +); + +-- TABLE dirreq_stats +-- Contains daily users by country. +CREATE TABLE dirreq_stats ( + source CHARACTER(40) NOT NULL, + statsend TIMESTAMP WITHOUT TIME ZONE NOT NULL, + seconds INTEGER NOT NULL, + country CHARACTER(2) NOT NULL, + requests INTEGER NOT NULL, + CONSTRAINT dirreq_stats_pkey + PRIMARY KEY (source, statsend, seconds, country) +); + +-- TABLE torperf_stats +-- Quantiles and medians of daily torperf results. +CREATE TABLE torperf_stats ( + "date" DATE NOT NULL, + source CHARACTER VARYING(32) NOT NULL, + q1 INTEGER NOT NULL, + md INTEGER NOT NULL, + q3 INTEGER NOT NULL, + timeouts INTEGER NOT NULL, + failures INTEGER NOT NULL, + requests INTEGER NOT NULL, + CONSTRAINT torperf_stats_pkey PRIMARY KEY("date", source) +); + +-- Refresh all statistics in the database. 
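+-- As a usage sketch (illustrative only): an operator could schedule a +-- single date and trigger a full refresh from psql with +-- INSERT INTO scheduled_updates (date) VALUES ('2014-01-20'); +-- SELECT refresh_all(); +-- where '2014-01-20' stands for any date whose statistics should be +-- recomputed.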
+CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$ + BEGIN + RAISE NOTICE '% Starting refresh run.', timeofday(); + RAISE NOTICE '% Deleting old dates from updates.', timeofday(); + DELETE FROM updates; + RAISE NOTICE '% Copying scheduled dates.', timeofday(); + INSERT INTO updates SELECT * FROM scheduled_updates; + RAISE NOTICE '% Refreshing relay statuses per day.', timeofday(); + PERFORM refresh_relay_statuses_per_day(); + RAISE NOTICE '% Refreshing network size.', timeofday(); + PERFORM refresh_network_size(); + RAISE NOTICE '% Refreshing relay platforms.', timeofday(); + PERFORM refresh_relay_platforms(); + RAISE NOTICE '% Refreshing relay versions.', timeofday(); + PERFORM refresh_relay_versions(); + RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday(); + PERFORM refresh_total_bandwidth(); + RAISE NOTICE '% Refreshing relay bandwidth history.', timeofday(); + PERFORM refresh_total_bwhist(); + RAISE NOTICE '% Refreshing total relay bandwidth by flags.', timeofday(); + PERFORM refresh_bandwidth_flags(); + RAISE NOTICE '% Refreshing bandwidth history by flags.', timeofday(); + PERFORM refresh_bwhist_flags(); + RAISE NOTICE '% Refreshing user statistics.', timeofday(); + PERFORM refresh_user_stats(); + RAISE NOTICE '% Deleting processed dates.', timeofday(); + DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates); + RAISE NOTICE '% Terminating refresh run.', timeofday(); + RETURN 1; + END; +$$ LANGUAGE plpgsql; + +-- View for exporting server statistics. +CREATE VIEW stats_servers AS + (SELECT date, NULL AS flag, NULL AS country, NULL AS version, + NULL AS platform, TRUE AS ec2bridge, NULL AS relays, + avg_running_ec2 AS bridges FROM bridge_network_size + WHERE date < current_date - 1) +UNION ALL + (SELECT COALESCE(network_size.date, bridge_network_size.date) AS date, + NULL AS flag, NULL AS country, NULL AS version, NULL AS platform, + NULL AS ec2bridge, network_size.avg_running AS relays, + bridge_network_size.avg_running AS bridges FROM network_size + FULL OUTER JOIN bridge_network_size + ON network_size.date = bridge_network_size.date + WHERE COALESCE(network_size.date, bridge_network_size.date) < + current_date - 1) +UNION ALL + (SELECT date, 'Exit' AS flag, NULL AS country, NULL AS version, + NULL AS platform, NULL AS ec2bridge, avg_exit AS relays, + NULL AS bridges FROM network_size WHERE date < current_date - 1) +UNION ALL + (SELECT date, 'Guard' AS flag, NULL AS country, NULL AS version, + NULL AS platform, NULL AS ec2bridge, avg_guard AS relays, + NULL AS bridges FROM network_size WHERE date < current_date - 1) +UNION ALL + (SELECT date, 'Fast' AS flag, NULL AS country, NULL AS version, + NULL AS platform, NULL AS ec2bridge, avg_fast AS relays, + NULL AS bridges FROM network_size WHERE date < current_date - 1) +UNION ALL + (SELECT date, 'Stable' AS flag, NULL AS country, NULL AS version, + NULL AS platform, NULL AS ec2bridge, avg_stable AS relays, + NULL AS bridges FROM network_size WHERE date < current_date - 1) +UNION ALL + (SELECT date, 'HSDir' AS flag, NULL AS country, NULL AS version, + NULL AS platform, NULL AS ec2bridge, avg_hsdir AS relays, + NULL AS bridges FROM network_size WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, CASE WHEN country != 'zz' THEN country + ELSE '??' 
END AS country, NULL AS version, NULL AS platform, + NULL AS ec2bridge, relays, NULL AS bridges FROM relay_countries + WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, NULL AS country, version, NULL AS platform, + NULL AS ec2bridge, relays, NULL AS bridges FROM relay_versions + WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, NULL AS country, NULL AS version, + 'Linux' AS platform, NULL AS ec2bridge, avg_linux AS relays, + NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, NULL AS country, NULL AS version, + 'Darwin' AS platform, NULL AS ec2bridge, avg_darwin AS relays, + NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, NULL AS country, NULL AS version, + 'FreeBSD' AS platform, NULL AS ec2bridge, avg_bsd AS relays, + NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, NULL AS country, NULL AS version, + 'Windows' AS platform, NULL AS ec2bridge, avg_windows AS relays, + NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) +UNION ALL + (SELECT date, NULL AS flag, NULL AS country, NULL AS version, + 'Other' AS platform, NULL AS ec2bridge, avg_other AS relays, + NULL AS bridges FROM relay_platforms WHERE date < current_date - 1) +ORDER BY 1, 2, 3, 4, 5, 6; + +-- View for exporting bandwidth statistics. +CREATE VIEW stats_bandwidth AS + (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date, + COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit, + COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard, + bandwidth_flags.bwadvertised AS advbw, + CASE WHEN bwhist_flags.read IS NOT NULL + THEN bwhist_flags.read / 86400 END AS bwread, + CASE WHEN bwhist_flags.written IS NOT NULL + THEN bwhist_flags.written / 86400 END AS bwwrite, + NULL AS dirread, NULL AS dirwrite + FROM bandwidth_flags FULL OUTER JOIN bwhist_flags + ON bandwidth_flags.date = bwhist_flags.date + AND bandwidth_flags.isexit = bwhist_flags.isexit + AND bandwidth_flags.isguard = bwhist_flags.isguard + WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) < + current_date - 3) +UNION ALL + (SELECT COALESCE(total_bandwidth.date, total_bwhist.date, u.date) + AS date, NULL AS isexit, NULL AS isguard, + total_bandwidth.bwadvertised AS advbw, + CASE WHEN total_bwhist.read IS NOT NULL + THEN total_bwhist.read / 86400 END AS bwread, + CASE WHEN total_bwhist.written IS NOT NULL + THEN total_bwhist.written / 86400 END AS bwwrite, + CASE WHEN u.date IS NOT NULL + THEN FLOOR(CAST(u.dr AS NUMERIC) * CAST(u.brp AS NUMERIC) / + CAST(u.brd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirread, + CASE WHEN u.date IS NOT NULL + THEN FLOOR(CAST(u.dw AS NUMERIC) * CAST(u.bwp AS NUMERIC) / + CAST(u.bwd AS NUMERIC) / CAST(86400 AS NUMERIC)) END AS dirwrite + FROM total_bandwidth FULL OUTER JOIN total_bwhist + ON total_bandwidth.date = total_bwhist.date + FULL OUTER JOIN (SELECT * FROM user_stats WHERE country = 'zy' + AND bwp / bwd <= 3) u + ON COALESCE(total_bandwidth.date, total_bwhist.date) = u.date + WHERE COALESCE(total_bandwidth.date, total_bwhist.date, u.date) < + current_date - 3) +ORDER BY 1, 2, 3; + +-- View for exporting torperf statistics. 
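+-- Illustrative example (not part of the schema): the median download +-- time of the 50 KiB file over all torperf sources combined could be +-- exported with +-- SELECT date, md FROM stats_torperf WHERE size = 50 * 1024 AND source IS NULL ORDER BY date;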
+CREATE VIEW stats_torperf AS +SELECT date, CASE WHEN source LIKE '%-50kb' THEN 50 * 1024 + WHEN source LIKE '%-1mb' THEN 1024 * 1024 + WHEN source LIKE '%-5mb' THEN 5 * 1024 * 1024 END AS size, + CASE WHEN source NOT LIKE 'all-%' + THEN split_part(source, '-', 1) END AS source, q1, md, q3, timeouts, + failures, requests FROM torperf_stats WHERE date < current_date - 1 + ORDER BY 1, 2, 3; + +-- View for exporting connbidirect statistics. +CREATE VIEW stats_connbidirect AS +SELECT DATE(statsend) AS date, source, belownum AS below, readnum AS read, + writenum AS write, bothnum AS "both" FROM connbidirect + WHERE DATE(statsend) < current_date - 1 ORDER BY 1, 2; + diff --git a/modules/legacy/src/org/torproject/ernie/cron/Configuration.java b/modules/legacy/src/org/torproject/ernie/cron/Configuration.java new file mode 100644 index 0000000..878e882 --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/Configuration.java @@ -0,0 +1,163 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Initialize configuration with hard-coded defaults, overwrite with + * configuration in config file, if exists, and answer Main.java about our + * configuration. + */ +public class Configuration { + private boolean importDirectoryArchives = false; + private String directoryArchivesDirectory = "in/relay-descriptors/"; + private boolean keepDirectoryArchiveImportHistory = false; + private boolean importSanitizedBridges = false; + private String sanitizedBridgesDirectory = "in/bridge-descriptors/"; + private boolean keepSanitizedBridgesImportHistory = false; + private boolean writeRelayDescriptorDatabase = false; + private String relayDescriptorDatabaseJdbc = + "jdbc:postgresql://localhost/tordir?user=metrics&password=password"; + private boolean writeRelayDescriptorsRawFiles = false; + private String relayDescriptorRawFilesDirectory = "pg-import/"; + private boolean writeBridgeStats = false; + private boolean importWriteTorperfStats = false; + private String torperfDirectory = "in/torperf/"; + private String exoneraTorDatabaseJdbc = "jdbc:postgresql:" + + "//localhost/exonerator?user=metrics&password=password"; + private String exoneraTorImportDirectory = "exonerator-import/"; + + public Configuration() { + + /* Initialize logger. */ + Logger logger = Logger.getLogger(Configuration.class.getName()); + + /* Read config file, if present. 
*/ + File configFile = new File("config"); + if (!configFile.exists()) { + logger.warning("Could not find config file."); + return; + } + String line = null; + try { + BufferedReader br = new BufferedReader(new FileReader(configFile)); + while ((line = br.readLine()) != null) { + if (line.startsWith("#") || line.length() < 1) { + continue; + } else if (line.startsWith("ImportDirectoryArchives")) { + this.importDirectoryArchives = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("DirectoryArchivesDirectory")) { + this.directoryArchivesDirectory = line.split(" ")[1]; + } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) { + this.keepDirectoryArchiveImportHistory = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("ImportSanitizedBridges")) { + this.importSanitizedBridges = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("SanitizedBridgesDirectory")) { + this.sanitizedBridgesDirectory = line.split(" ")[1]; + } else if (line.startsWith("KeepSanitizedBridgesImportHistory")) { + this.keepSanitizedBridgesImportHistory = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("WriteRelayDescriptorDatabase")) { + this.writeRelayDescriptorDatabase = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) { + this.relayDescriptorDatabaseJdbc = line.split(" ")[1]; + } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) { + this.writeRelayDescriptorsRawFiles = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) { + this.relayDescriptorRawFilesDirectory = line.split(" ")[1]; + } else if (line.startsWith("WriteBridgeStats")) { + this.writeBridgeStats = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("ImportWriteTorperfStats")) { + this.importWriteTorperfStats = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("TorperfDirectory")) { + this.torperfDirectory = line.split(" ")[1]; + } else if (line.startsWith("ExoneraTorDatabaseJdbc")) { + this.exoneraTorDatabaseJdbc = line.split(" ")[1]; + } else if (line.startsWith("ExoneraTorImportDirectory")) { + this.exoneraTorImportDirectory = line.split(" ")[1]; + } else { + logger.severe("Configuration file contains unrecognized " + + "configuration key in line '" + line + "'! Exiting!"); + System.exit(1); + } + } + br.close(); + } catch (ArrayIndexOutOfBoundsException e) { + logger.severe("Configuration file contains configuration key " + + "without value in line '" + line + "'. Exiting!"); + System.exit(1); + } catch (MalformedURLException e) { + logger.severe("Configuration file contains illegal URL or IP:port " + + "pair in line '" + line + "'. Exiting!"); + System.exit(1); + } catch (NumberFormatException e) { + logger.severe("Configuration file contains illegal value in line '" + + line + "' with legal values being 0 or 1. Exiting!"); + System.exit(1); + } catch (IOException e) { + logger.log(Level.SEVERE, "Unknown problem while reading config " + + "file! 
Exiting!", e); + System.exit(1); + } + } + public boolean getImportDirectoryArchives() { + return this.importDirectoryArchives; + } + public String getDirectoryArchivesDirectory() { + return this.directoryArchivesDirectory; + } + public boolean getKeepDirectoryArchiveImportHistory() { + return this.keepDirectoryArchiveImportHistory; + } + public boolean getWriteRelayDescriptorDatabase() { + return this.writeRelayDescriptorDatabase; + } + public boolean getImportSanitizedBridges() { + return this.importSanitizedBridges; + } + public String getSanitizedBridgesDirectory() { + return this.sanitizedBridgesDirectory; + } + public boolean getKeepSanitizedBridgesImportHistory() { + return this.keepSanitizedBridgesImportHistory; + } + public String getRelayDescriptorDatabaseJDBC() { + return this.relayDescriptorDatabaseJdbc; + } + public boolean getWriteRelayDescriptorsRawFiles() { + return this.writeRelayDescriptorsRawFiles; + } + public String getRelayDescriptorRawFilesDirectory() { + return this.relayDescriptorRawFilesDirectory; + } + public boolean getWriteBridgeStats() { + return this.writeBridgeStats; + } + public boolean getImportWriteTorperfStats() { + return this.importWriteTorperfStats; + } + public String getTorperfDirectory() { + return this.torperfDirectory; + } + public String getExoneraTorDatabaseJdbc() { + return this.exoneraTorDatabaseJdbc; + } + public String getExoneraTorImportDirectory() { + return this.exoneraTorImportDirectory; + } +} + diff --git a/modules/legacy/src/org/torproject/ernie/cron/LockFile.java b/modules/legacy/src/org/torproject/ernie/cron/LockFile.java new file mode 100644 index 0000000..4de8da0 --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/LockFile.java @@ -0,0 +1,52 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.logging.Logger; + +public class LockFile { + + private File lockFile; + private Logger logger; + + public LockFile() { + this.lockFile = new File("lock"); + this.logger = Logger.getLogger(LockFile.class.getName()); + } + + public boolean acquireLock() { + this.logger.fine("Trying to acquire lock..."); + try { + if (this.lockFile.exists()) { + BufferedReader br = new BufferedReader(new FileReader("lock")); + long runStarted = Long.parseLong(br.readLine()); + br.close(); + if (System.currentTimeMillis() - runStarted < 55L * 60L * 1000L) { + return false; + } + } + BufferedWriter bw = new BufferedWriter(new FileWriter("lock")); + bw.append("" + System.currentTimeMillis() + "\n"); + bw.close(); + this.logger.fine("Acquired lock."); + return true; + } catch (IOException e) { + this.logger.warning("Caught exception while trying to acquire " + + "lock!"); + return false; + } + } + + public void releaseLock() { + this.logger.fine("Releasing lock..."); + this.lockFile.delete(); + this.logger.fine("Released lock."); + } +} + diff --git a/modules/legacy/src/org/torproject/ernie/cron/LoggingConfiguration.java b/modules/legacy/src/org/torproject/ernie/cron/LoggingConfiguration.java new file mode 100644 index 0000000..c261d95 --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/LoggingConfiguration.java @@ -0,0 +1,94 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.IOException; 
+import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.TimeZone; +import java.util.logging.ConsoleHandler; +import java.util.logging.FileHandler; +import java.util.logging.Formatter; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +/** + * Initialize logging configuration. + * + * Log levels used by ERNIE: + * + * - SEVERE: An event made it impossible to continue program execution. + * - WARNING: A potential problem occurred that requires the operator to + * look after the otherwise unattended setup + * - INFO: Messages on INFO level are meant to help the operator in making + * sure that operation works as expected. + * - FINE: Debug messages that are used to identify problems and which are + * turned on by default. + * - FINER: More detailed debug messages to investigate problems in more + * detail. Not turned on by default. Increase log file limit when using + * FINER. + * - FINEST: Most detailed debug messages. Not used. + */ +public class LoggingConfiguration { + + public LoggingConfiguration() { + + /* Remove default console handler. */ + for (Handler h : Logger.getLogger("").getHandlers()) { + Logger.getLogger("").removeHandler(h); + } + + /* Disable logging of internal Sun classes. */ + Logger.getLogger("sun").setLevel(Level.OFF); + + /* Set minimum log level we care about from INFO to FINER. */ + Logger.getLogger("").setLevel(Level.FINER); + + /* Create log handler that writes messages on WARNING or higher to the + * console. */ + final SimpleDateFormat dateTimeFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + Formatter cf = new Formatter() { + public String format(LogRecord record) { + return dateTimeFormat.format(new Date(record.getMillis())) + " " + + record.getMessage() + "\n"; + } + }; + Handler ch = new ConsoleHandler(); + ch.setFormatter(cf); + ch.setLevel(Level.WARNING); + Logger.getLogger("").addHandler(ch); + + /* Initialize own logger for this class. */ + Logger logger = Logger.getLogger( + LoggingConfiguration.class.getName()); + + /* Create log handler that writes all messages on FINE or higher to a + * local file. */ + Formatter ff = new Formatter() { + public String format(LogRecord record) { + return dateTimeFormat.format(new Date(record.getMillis())) + " " + + record.getLevel() + " " + record.getSourceClassName() + " " + + record.getSourceMethodName() + " " + record.getMessage() + + (record.getThrown() != null ? " " + record.getThrown() : "") + + "\n"; + } + }; + try { + FileHandler fh = new FileHandler("log", 5000000, 5, true); + fh.setFormatter(ff); + fh.setLevel(Level.FINE); + Logger.getLogger("").addHandler(fh); + } catch (SecurityException e) { + logger.log(Level.WARNING, "No permission to create log file. " + + "Logging to file is disabled.", e); + } catch (IOException e) { + logger.log(Level.WARNING, "Could not write to log file. 
Logging to " + + "file is disabled.", e); + } + } +} + diff --git a/modules/legacy/src/org/torproject/ernie/cron/Main.java b/modules/legacy/src/org/torproject/ernie/cron/Main.java new file mode 100644 index 0000000..5d561a6 --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/Main.java @@ -0,0 +1,100 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.File; +import java.util.logging.Logger; + +import org.torproject.ernie.cron.network.ConsensusStatsFileHandler; +import org.torproject.ernie.cron.performance.PerformanceStatsImporter; +import org.torproject.ernie.cron.performance.TorperfProcessor; + +/** + * Coordinate downloading and parsing of descriptors and extraction of + * statistically relevant data for later processing with R. + */ +public class Main { + public static void main(String[] args) { + + /* Initialize logging configuration. */ + new LoggingConfiguration(); + + Logger logger = Logger.getLogger(Main.class.getName()); + logger.info("Starting ERNIE."); + + // Initialize configuration + Configuration config = new Configuration(); + + // Use lock file to avoid overlapping runs + LockFile lf = new LockFile(); + if (!lf.acquireLock()) { + logger.severe("Warning: ERNIE is already running or has not exited " + + "cleanly! Exiting!"); + System.exit(1); + } + + // Define stats directory for temporary files + File statsDirectory = new File("stats"); + + // Import relay descriptors + if (config.getImportDirectoryArchives()) { + RelayDescriptorDatabaseImporter rddi = + config.getWriteRelayDescriptorDatabase() || + config.getWriteRelayDescriptorsRawFiles() ? + new RelayDescriptorDatabaseImporter( + config.getWriteRelayDescriptorDatabase() ? + config.getRelayDescriptorDatabaseJDBC() : null, + config.getWriteRelayDescriptorsRawFiles() ? + config.getRelayDescriptorRawFilesDirectory() : null, + new File(config.getDirectoryArchivesDirectory()), + statsDirectory, + config.getKeepDirectoryArchiveImportHistory()) : null; + if (rddi != null) { + rddi.importRelayDescriptors(); + } + rddi.closeConnection(); + + // Import conn-bi-direct statistics. + PerformanceStatsImporter psi = new PerformanceStatsImporter( + config.getWriteRelayDescriptorDatabase() ? + config.getRelayDescriptorDatabaseJDBC() : null, + config.getWriteRelayDescriptorsRawFiles() ? + config.getRelayDescriptorRawFilesDirectory() : null, + new File(config.getDirectoryArchivesDirectory()), + statsDirectory, + config.getKeepDirectoryArchiveImportHistory()); + psi.importRelayDescriptors(); + psi.closeConnection(); + } + + // Prepare consensus stats file handler (used for stats on running + // bridges only) + ConsensusStatsFileHandler csfh = config.getWriteBridgeStats() ? 
+ new ConsensusStatsFileHandler( + config.getRelayDescriptorDatabaseJDBC(), + new File(config.getSanitizedBridgesDirectory()), + statsDirectory, config.getKeepSanitizedBridgesImportHistory()) : + null; + + // Import sanitized bridges and write updated stats files to disk + if (csfh != null) { + if (config.getImportSanitizedBridges()) { + csfh.importSanitizedBridges(); + } + csfh.writeFiles(); + csfh = null; + } + + // Import and process torperf stats + if (config.getImportWriteTorperfStats()) { + new TorperfProcessor(new File(config.getTorperfDirectory()), + statsDirectory, config.getRelayDescriptorDatabaseJDBC()); + } + + // Remove lock file + lf.releaseLock(); + + logger.info("Terminating ERNIE."); + } +} + diff --git a/modules/legacy/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/modules/legacy/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java new file mode 100644 index 0000000..a51092e --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java @@ -0,0 +1,1077 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TimeZone; +import java.util.TreeSet; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.postgresql.util.PGbytea; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.ExtraInfoDescriptor; +import org.torproject.descriptor.NetworkStatusEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; +import org.torproject.descriptor.ServerDescriptor; + +/** + * Parse directory data. + */ + +/* TODO Split up this class and move its parts to cron.network, + * cron.users, and status.relaysearch packages. Requires extensive + * changes to the database schema though. */ +public final class RelayDescriptorDatabaseImporter { + + /** + * How many records to commit with each database transaction. + */ + private final long autoCommitCount = 500; + + /** + * Keep track of the number of records committed before each transaction + */ + private int rdsCount = 0; + private int resCount = 0; + private int rhsCount = 0; + private int rrsCount = 0; + private int rcsCount = 0; + private int rvsCount = 0; + private int rqsCount = 0; + + /** + * Relay descriptor database connection. + */ + private Connection conn; + + /** + * Prepared statement to check whether any network status consensus + * entries matching a given valid-after time have been imported into the + * database before. 
+ */ + private PreparedStatement psSs; + + /** + * Prepared statement to check whether a given network status consensus + * entry has been imported into the database before. + */ + private PreparedStatement psRs; + + /** + * Prepared statement to check whether a given server descriptor has + * been imported into the database before. + */ + private PreparedStatement psDs; + + /** + * Prepared statement to check whether a given network status consensus + * has been imported into the database before. + */ + private PreparedStatement psCs; + + /** + * Prepared statement to check whether a given dirreq stats string has + * been imported into the database before. + */ + private PreparedStatement psQs; + + /** + * Set of dates that have been inserted into the database for being + * included in the next refresh run. + */ + private Set<Long> scheduledUpdates; + + /** + * Prepared statement to insert a date into the database that shall be + * included in the next refresh run. + */ + private PreparedStatement psU; + + /** + * Prepared statement to insert a network status consensus entry into + * the database. + */ + private PreparedStatement psR; + + /** + * Prepared statement to insert a server descriptor into the database. + */ + private PreparedStatement psD; + + /** + * Callable statement to insert the bandwidth history of an extra-info + * descriptor into the database. + */ + private CallableStatement csH; + + /** + * Prepared statement to insert a network status consensus into the + * database. + */ + private PreparedStatement psC; + + /** + * Prepared statement to insert a given dirreq stats string into the + * database. + */ + private PreparedStatement psQ; + + /** + * Logger for this class. + */ + private Logger logger; + + /** + * Directory for writing raw import files. + */ + private String rawFilesDirectory; + + /** + * Raw import file containing status entries. + */ + private BufferedWriter statusentryOut; + + /** + * Raw import file containing server descriptors. + */ + private BufferedWriter descriptorOut; + + /** + * Raw import file containing bandwidth histories. + */ + private BufferedWriter bwhistOut; + + /** + * Raw import file containing consensuses. + */ + private BufferedWriter consensusOut; + + /** + * Raw import file containing dirreq stats. + */ + private BufferedWriter dirReqOut; + + /** + * Date format to parse timestamps. + */ + private SimpleDateFormat dateTimeFormat; + + /** + * The last valid-after time for which we checked whether they have been + * any network status entries in the database. + */ + private long lastCheckedStatusEntries; + + /** + * Set of fingerprints that we imported for the valid-after time in + * <code>lastCheckedStatusEntries</code>. + */ + private Set<String> insertedStatusEntries; + + /** + * Flag that tells us whether we need to check whether a network status + * entry is already contained in the database or not. + */ + private boolean separateStatusEntryCheckNecessary; + + private boolean importIntoDatabase; + private boolean writeRawImportFiles; + + private File archivesDirectory; + private File statsDirectory; + private boolean keepImportHistory; + + /** + * Initialize database importer by connecting to the database and + * preparing statements. 
+ */ + public RelayDescriptorDatabaseImporter(String connectionURL, + String rawFilesDirectory, File archivesDirectory, + File statsDirectory, boolean keepImportHistory) { + + if (archivesDirectory == null || + statsDirectory == null) { + throw new IllegalArgumentException(); + } + this.archivesDirectory = archivesDirectory; + this.statsDirectory = statsDirectory; + this.keepImportHistory = keepImportHistory; + + /* Initialize logger. */ + this.logger = Logger.getLogger( + RelayDescriptorDatabaseImporter.class.getName()); + + if (connectionURL != null) { + try { + /* Connect to database. */ + this.conn = DriverManager.getConnection(connectionURL); + + /* Turn autocommit off */ + this.conn.setAutoCommit(false); + + /* Prepare statements. */ + this.psSs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM statusentry WHERE validafter = ?"); + this.psRs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM statusentry WHERE validafter = ? AND " + + "fingerprint = ?"); + this.psDs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM descriptor WHERE descriptor = ?"); + this.psCs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM consensus WHERE validafter = ?"); + this.psQs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM dirreq_stats WHERE source = ? AND statsend = ?"); + this.psR = conn.prepareStatement("INSERT INTO statusentry " + + "(validafter, nickname, fingerprint, descriptor, " + + "published, address, orport, dirport, isauthority, " + + "isbadexit, isbaddirectory, isexit, isfast, isguard, " + + "ishsdir, isnamed, isstable, isrunning, isunnamed, " + + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, " + + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); + this.psD = conn.prepareStatement("INSERT INTO descriptor " + + "(descriptor, nickname, address, orport, dirport, " + + "fingerprint, bandwidthavg, bandwidthburst, " + + "bandwidthobserved, platform, published, uptime, " + + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + + "?)"); + this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, " + + "?)}"); + this.psC = conn.prepareStatement("INSERT INTO consensus " + + "(validafter) VALUES (?)"); + this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats " + + "(source, statsend, seconds, country, requests) VALUES " + + "(?, ?, ?, ?, ?)"); + this.psU = conn.prepareStatement("INSERT INTO scheduled_updates " + + "(date) VALUES (?)"); + this.scheduledUpdates = new HashSet<Long>(); + this.importIntoDatabase = true; + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not connect to database or " + + "prepare statements.", e); + } + + /* Initialize set of fingerprints to remember which status entries + * we already imported. */ + this.insertedStatusEntries = new HashSet<String>(); + } + + /* Remember where we want to write raw import files. */ + if (rawFilesDirectory != null) { + this.rawFilesDirectory = rawFilesDirectory; + this.writeRawImportFiles = true; + } + + /* Initialize date format, so that we can format timestamps. 
*/ + this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + private void addDateToScheduledUpdates(long timestamp) + throws SQLException { + if (!this.importIntoDatabase) { + return; + } + long dateMillis = 0L; + try { + dateMillis = this.dateTimeFormat.parse( + this.dateTimeFormat.format(timestamp).substring(0, 10) + + " 00:00:00").getTime(); + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Internal parsing error.", e); + return; + } + if (!this.scheduledUpdates.contains(dateMillis)) { + this.psU.setDate(1, new java.sql.Date(dateMillis)); + this.psU.execute(); + this.scheduledUpdates.add(dateMillis); + } + } + + /** + * Insert network status consensus entry into database. + */ + public void addStatusEntry(long validAfter, String nickname, + String fingerprint, String descriptor, long published, + String address, long orPort, long dirPort, + SortedSet<String> flags, String version, long bandwidth, + String ports, byte[] rawDescriptor) { + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(validAfter); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp validAfterTimestamp = new Timestamp(validAfter); + if (lastCheckedStatusEntries != validAfter) { + this.psSs.setTimestamp(1, validAfterTimestamp, cal); + ResultSet rs = psSs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + separateStatusEntryCheckNecessary = false; + insertedStatusEntries.clear(); + } else { + separateStatusEntryCheckNecessary = true; + } + rs.close(); + lastCheckedStatusEntries = validAfter; + } + boolean alreadyContained = false; + if (separateStatusEntryCheckNecessary || + insertedStatusEntries.contains(fingerprint)) { + this.psRs.setTimestamp(1, validAfterTimestamp, cal); + this.psRs.setString(2, fingerprint); + ResultSet rs = psRs.executeQuery(); + rs.next(); + if (rs.getInt(1) > 0) { + alreadyContained = true; + } + rs.close(); + } else { + insertedStatusEntries.add(fingerprint); + } + if (!alreadyContained) { + this.psR.clearParameters(); + this.psR.setTimestamp(1, validAfterTimestamp, cal); + this.psR.setString(2, nickname); + this.psR.setString(3, fingerprint); + this.psR.setString(4, descriptor); + this.psR.setTimestamp(5, new Timestamp(published), cal); + this.psR.setString(6, address); + this.psR.setLong(7, orPort); + this.psR.setLong(8, dirPort); + this.psR.setBoolean(9, flags.contains("Authority")); + this.psR.setBoolean(10, flags.contains("BadExit")); + this.psR.setBoolean(11, flags.contains("BadDirectory")); + this.psR.setBoolean(12, flags.contains("Exit")); + this.psR.setBoolean(13, flags.contains("Fast")); + this.psR.setBoolean(14, flags.contains("Guard")); + this.psR.setBoolean(15, flags.contains("HSDir")); + this.psR.setBoolean(16, flags.contains("Named")); + this.psR.setBoolean(17, flags.contains("Stable")); + this.psR.setBoolean(18, flags.contains("Running")); + this.psR.setBoolean(19, flags.contains("Unnamed")); + this.psR.setBoolean(20, flags.contains("Valid")); + this.psR.setBoolean(21, flags.contains("V2Dir")); + this.psR.setBoolean(22, flags.contains("V3Dir")); + this.psR.setString(23, version); + this.psR.setLong(24, bandwidth); + this.psR.setString(25, ports); + this.psR.setBytes(26, rawDescriptor); + this.psR.executeUpdate(); + rrsCount++; + if (rrsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add network status " + + "consensus entry. 
We won't make any further SQL requests " + + "in this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.statusentryOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.statusentryOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/statusentry.sql")); + this.statusentryOut.write(" COPY statusentry (validafter, " + + "nickname, fingerprint, descriptor, published, address, " + + "orport, dirport, isauthority, isbadExit, " + + "isbaddirectory, isexit, isfast, isguard, ishsdir, " + + "isnamed, isstable, isrunning, isunnamed, isvalid, " + + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) " + + "FROM stdin;\n"); + } + this.statusentryOut.write( + this.dateTimeFormat.format(validAfter) + "\t" + nickname + + "\t" + fingerprint.toLowerCase() + "\t" + + descriptor.toLowerCase() + "\t" + + this.dateTimeFormat.format(published) + "\t" + address + + "\t" + orPort + "\t" + dirPort + "\t" + + (flags.contains("Authority") ? "t" : "f") + "\t" + + (flags.contains("BadExit") ? "t" : "f") + "\t" + + (flags.contains("BadDirectory") ? "t" : "f") + "\t" + + (flags.contains("Exit") ? "t" : "f") + "\t" + + (flags.contains("Fast") ? "t" : "f") + "\t" + + (flags.contains("Guard") ? "t" : "f") + "\t" + + (flags.contains("HSDir") ? "t" : "f") + "\t" + + (flags.contains("Named") ? "t" : "f") + "\t" + + (flags.contains("Stable") ? "t" : "f") + "\t" + + (flags.contains("Running") ? "t" : "f") + "\t" + + (flags.contains("Unnamed") ? "t" : "f") + "\t" + + (flags.contains("Valid") ? "t" : "f") + "\t" + + (flags.contains("V2Dir") ? "t" : "f") + "\t" + + (flags.contains("V3Dir") ? "t" : "f") + "\t" + + (version != null ? version : "\N") + "\t" + + (bandwidth >= 0 ? bandwidth : "\N") + "\t" + + (ports != null ? ports : "\N") + "\t"); + this.statusentryOut.write(PGbytea.toPGString(rawDescriptor). + replaceAll("\\", "\\\\") + "\n"); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "consensus entry to raw database import file. We won't " + + "make any further attempts to write raw import files in " + + "this execution.", e); + this.writeRawImportFiles = false; + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "consensus entry to raw database import file. We won't " + + "make any further attempts to write raw import files in " + + "this execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Insert server descriptor into database. 
+ */ + public void addServerDescriptor(String descriptor, String nickname, + String address, int orPort, int dirPort, String relayIdentifier, + long bandwidthAvg, long bandwidthBurst, long bandwidthObserved, + String platform, long published, long uptime, + String extraInfoDigest) { + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(published); + this.addDateToScheduledUpdates( + published + 24L * 60L * 60L * 1000L); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + this.psDs.setString(1, descriptor); + ResultSet rs = psDs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psD.clearParameters(); + this.psD.setString(1, descriptor); + this.psD.setString(2, nickname); + this.psD.setString(3, address); + this.psD.setInt(4, orPort); + this.psD.setInt(5, dirPort); + this.psD.setString(6, relayIdentifier); + this.psD.setLong(7, bandwidthAvg); + this.psD.setLong(8, bandwidthBurst); + this.psD.setLong(9, bandwidthObserved); + /* Remove all non-ASCII characters from the platform string, or + * we'll make Postgres unhappy. Sun's JDK and OpenJDK behave + * differently when creating a new String with a given encoding. + * That's what the regexp below is for. */ + this.psD.setString(10, new String(platform.getBytes(), + "US-ASCII").replaceAll("[^\p{ASCII}]","")); + this.psD.setTimestamp(11, new Timestamp(published), cal); + this.psD.setLong(12, uptime); + this.psD.setString(13, extraInfoDigest); + this.psD.executeUpdate(); + rdsCount++; + if (rdsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (UnsupportedEncodingException e) { + // US-ASCII is supported for sure + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add server " + + "descriptor. We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.descriptorOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.descriptorOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/descriptor.sql")); + this.descriptorOut.write(" COPY descriptor (descriptor, " + + "nickname, address, orport, dirport, fingerprint, " + + "bandwidthavg, bandwidthburst, bandwidthobserved, " + + "platform, published, uptime, extrainfo) FROM stdin;\n"); + } + this.descriptorOut.write(descriptor.toLowerCase() + "\t" + + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort + + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t" + + bandwidthBurst + "\t" + bandwidthObserved + "\t" + + (platform != null && platform.length() > 0 + ? new String(platform.getBytes(), "US-ASCII") : "\N") + + "\t" + this.dateTimeFormat.format(published) + "\t" + + (uptime >= 0 ? uptime : "\N") + "\t" + + (extraInfoDigest != null ? extraInfoDigest : "\N") + + "\n"); + } catch (UnsupportedEncodingException e) { + // US-ASCII is supported for sure + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write server " + + "descriptor to raw database import file. We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Insert extra-info descriptor into database. 
+ */ + public void addExtraInfoDescriptor(String extraInfoDigest, + String nickname, String fingerprint, long published, + List<String> bandwidthHistoryLines) { + if (!bandwidthHistoryLines.isEmpty()) { + this.addBandwidthHistory(fingerprint.toLowerCase(), published, + bandwidthHistoryLines); + } + } + + private static class BigIntArray implements java.sql.Array { + + private final String stringValue; + + public BigIntArray(long[] array, int offset) { + if (array == null) { + this.stringValue = "[-1:-1]={0}"; + } else { + StringBuilder sb = new StringBuilder("[" + offset + ":" + + (offset + array.length - 1) + "]={"); + for (int i = 0; i < array.length; i++) { + sb.append((i > 0 ? "," : "") + array[i]); + } + sb.append('}'); + this.stringValue = sb.toString(); + } + } + + public String toString() { + return stringValue; + } + + public String getBaseTypeName() { + return "int8"; + } + + /* The other methods are never called; no need to implement them. */ + public void free() { + throw new UnsupportedOperationException(); + } + public Object getArray() { + throw new UnsupportedOperationException(); + } + public Object getArray(long index, int count) { + throw new UnsupportedOperationException(); + } + public Object getArray(long index, int count, + Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + public Object getArray(Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + public int getBaseType() { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet() { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet(long index, int count) { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet(long index, int count, + Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet(Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + } + + public void addBandwidthHistory(String fingerprint, long published, + List<String> bandwidthHistoryStrings) { + + /* Split history lines by date and rewrite them so that the date + * comes first. */ + SortedSet<String> historyLinesByDate = new TreeSet<String>(); + for (String bandwidthHistoryString : bandwidthHistoryStrings) { + String[] parts = bandwidthHistoryString.split(" "); + if (parts.length != 6) { + this.logger.finer("Bandwidth history line does not have expected " + + "number of elements. Ignoring this line."); + continue; + } + long intervalLength = 0L; + try { + intervalLength = Long.parseLong(parts[3].substring(1)); + } catch (NumberFormatException e) { + this.logger.fine("Bandwidth history line does not have valid " + + "interval length '" + parts[3] + " " + parts[4] + "'. " + + "Ignoring this line."); + continue; + } + if (intervalLength != 900L) { + this.logger.fine("Bandwidth history line does not consist of " + + "15-minute intervals. Ignoring this line."); + continue; + } + String type = parts[0]; + String intervalEndTime = parts[1] + " " + parts[2]; + long intervalEnd, dateStart; + try { + intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime(); + dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00"). + getTime(); + } catch (ParseException e) { + this.logger.fine("Parse exception while parsing timestamp in " + + "bandwidth history line. 
Ignoring this line."); + continue; + } + if (Math.abs(published - intervalEnd) > + 7L * 24L * 60L * 60L * 1000L) { + this.logger.fine("Extra-info descriptor publication time " + + dateTimeFormat.format(published) + " and last interval " + + "time " + intervalEndTime + " in " + type + " line differ " + + "by more than 7 days! Not adding this line!"); + continue; + } + long currentIntervalEnd = intervalEnd; + StringBuilder sb = new StringBuilder(); + String[] values = parts[5].split(","); + SortedSet<String> newHistoryLines = new TreeSet<String>(); + try { + for (int i = values.length - 1; i >= -1; i--) { + if (i == -1 || currentIntervalEnd < dateStart) { + sb.insert(0, intervalEndTime + " " + type + " (" + + intervalLength + " s) "); + sb.setLength(sb.length() - 1); + String historyLine = sb.toString(); + newHistoryLines.add(historyLine); + sb = new StringBuilder(); + dateStart -= 24L * 60L * 60L * 1000L; + intervalEndTime = dateTimeFormat.format(currentIntervalEnd); + } + if (i == -1) { + break; + } + Long.parseLong(values[i]); + sb.insert(0, values[i] + ","); + currentIntervalEnd -= intervalLength * 1000L; + } + } catch (NumberFormatException e) { + this.logger.fine("Number format exception while parsing " + + "bandwidth history line. Ignoring this line."); + continue; + } + historyLinesByDate.addAll(newHistoryLines); + } + + /* Add split history lines to database. */ + String lastDate = null; + historyLinesByDate.add("EOL"); + long[] readArray = null, writtenArray = null, dirreadArray = null, + dirwrittenArray = null; + int readOffset = 0, writtenOffset = 0, dirreadOffset = 0, + dirwrittenOffset = 0; + for (String historyLine : historyLinesByDate) { + String[] parts = historyLine.split(" "); + String currentDate = parts[0]; + if (lastDate != null && (historyLine.equals("EOL") || + !currentDate.equals(lastDate))) { + BigIntArray readIntArray = new BigIntArray(readArray, + readOffset); + BigIntArray writtenIntArray = new BigIntArray(writtenArray, + writtenOffset); + BigIntArray dirreadIntArray = new BigIntArray(dirreadArray, + dirreadOffset); + BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray, + dirwrittenOffset); + if (this.importIntoDatabase) { + try { + long dateMillis = dateTimeFormat.parse(lastDate + + " 00:00:00").getTime(); + this.addDateToScheduledUpdates(dateMillis); + this.csH.setString(1, fingerprint); + this.csH.setDate(2, new java.sql.Date(dateMillis)); + this.csH.setArray(3, readIntArray); + this.csH.setArray(4, writtenIntArray); + this.csH.setArray(5, dirreadIntArray); + this.csH.setArray(6, dirwrittenIntArray); + this.csH.addBatch(); + rhsCount++; + if (rhsCount % autoCommitCount == 0) { + this.csH.executeBatch(); + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not insert bandwidth " + + "history line into database. We won't make any " + + "further SQL requests in this execution.", e); + this.importIntoDatabase = false; + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Could not insert bandwidth " + + "history line into database. 
We won't make any " + + "further SQL requests in this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.bwhistOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.bwhistOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/bwhist.sql")); + } + this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint + + "','" + lastDate + "','" + readIntArray.toString() + + "','" + writtenIntArray.toString() + "','" + + dirreadIntArray.toString() + "','" + + dirwrittenIntArray.toString() + "');\n"); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write bandwidth " + + "history to raw database import file. We won't make " + + "any further attempts to write raw import files in " + + "this execution.", e); + this.writeRawImportFiles = false; + } + } + readArray = writtenArray = dirreadArray = dirwrittenArray = null; + } + if (historyLine.equals("EOL")) { + break; + } + long lastIntervalTime; + try { + lastIntervalTime = dateTimeFormat.parse(parts[0] + " " + + parts[1]).getTime() - dateTimeFormat.parse(parts[0] + + " 00:00:00").getTime(); + } catch (ParseException e) { + continue; + } + String[] stringValues = parts[5].split(","); + long[] longValues = new long[stringValues.length]; + for (int i = 0; i < longValues.length; i++) { + longValues[i] = Long.parseLong(stringValues[i]); + } + + int offset = (int) (lastIntervalTime / (15L * 60L * 1000L)) + - longValues.length + 1; + String type = parts[2]; + if (type.equals("read-history")) { + readArray = longValues; + readOffset = offset; + } else if (type.equals("write-history")) { + writtenArray = longValues; + writtenOffset = offset; + } else if (type.equals("dirreq-read-history")) { + dirreadArray = longValues; + dirreadOffset = offset; + } else if (type.equals("dirreq-write-history")) { + dirwrittenArray = longValues; + dirwrittenOffset = offset; + } + lastDate = currentDate; + } + } + + /** + * Insert network status consensus into database. + */ + public void addConsensus(long validAfter) { + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(validAfter); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp validAfterTimestamp = new Timestamp(validAfter); + this.psCs.setTimestamp(1, validAfterTimestamp, cal); + ResultSet rs = psCs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psC.clearParameters(); + this.psC.setTimestamp(1, validAfterTimestamp, cal); + this.psC.executeUpdate(); + rcsCount++; + if (rcsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add network status " + + "consensus. We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.consensusOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.consensusOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/consensus.sql")); + this.consensusOut.write(" COPY consensus (validafter) " + + "FROM stdin;\n"); + } + String validAfterString = this.dateTimeFormat.format(validAfter); + this.consensusOut.write(validAfterString + "\n"); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "consensus to raw database import file. 
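The offset used above is plain slot arithmetic: a UTC day consists of 96 fifteen-minute slots, and the first reported slot is the slot index of the last interval end minus the number of reported values plus one. A sketch of that calculation with example numbers:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

/* Sketch of the offset arithmetic for a per-day bandwidth history line:
 * a UTC day has 96 slots of 15 minutes, and the offset is the slot
 * index of the first reported interval. Timestamp and value count are
 * examples only. */
public class BwhistOffsetSketch {

  public static void main(String[] args) throws ParseException {
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    format.setTimeZone(TimeZone.getTimeZone("UTC"));
    long intervalEnd = format.parse("2014-01-21 06:00:00").getTime();
    long dayStart = format.parse("2014-01-21 00:00:00").getTime();
    int reportedValues = 8;  // eight 15-minute intervals in this line
    long lastIntervalTime = intervalEnd - dayStart;
    int offset = (int) (lastIntervalTime / (15L * 60L * 1000L))
        - reportedValues + 1;
    System.out.println("first slot index: " + offset);  // prints 17
  }
}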
We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Adds observations on the number of directory requests by country as + * seen on a directory at a given date to the database. + */ + public void addDirReqStats(String source, long statsEndMillis, + long seconds, Map<String, String> dirReqsPerCountry) { + String statsEnd = this.dateTimeFormat.format(statsEndMillis); + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(statsEndMillis); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp statsEndTimestamp = new Timestamp(statsEndMillis); + this.psQs.setString(1, source); + this.psQs.setTimestamp(2, statsEndTimestamp, cal); + ResultSet rs = psQs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + for (Map.Entry<String, String> e : + dirReqsPerCountry.entrySet()) { + this.psQ.clearParameters(); + this.psQ.setString(1, source); + this.psQ.setTimestamp(2, statsEndTimestamp, cal); + this.psQ.setLong(3, seconds); + this.psQ.setString(4, e.getKey()); + this.psQ.setLong(5, Long.parseLong(e.getValue())); + this.psQ.executeUpdate(); + rqsCount++; + if (rqsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add dirreq stats. We " + + "won't make any further SQL requests in this execution.", + e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.dirReqOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.dirReqOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/dirreq_stats.sql")); + this.dirReqOut.write(" COPY dirreq_stats (source, statsend, " + + "seconds, country, requests) FROM stdin;\n"); + } + for (Map.Entry<String, String> e : + dirReqsPerCountry.entrySet()) { + this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds + + "\t" + e.getKey() + "\t" + e.getValue() + "\n"); + } + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write dirreq stats to " + + "raw database import file. 
We won't make any further " + + "attempts to write raw import files in this execution.", e); + this.writeRawImportFiles = false; + } + } + } + + public void importRelayDescriptors() { + if (archivesDirectory.exists()) { + logger.fine("Importing files in directory " + archivesDirectory + + "/..."); + DescriptorReader reader = + DescriptorSourceFactory.createDescriptorReader(); + reader.addDirectory(archivesDirectory); + if (keepImportHistory) { + reader.setExcludeFiles(new File(statsDirectory, + "database-importer-relay-descriptor-history")); + } + Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + if (descriptorFile.getDescriptors() != null) { + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof RelayNetworkStatusConsensus) { + this.addRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } else if (descriptor instanceof ServerDescriptor) { + this.addServerDescriptor((ServerDescriptor) descriptor); + } else if (descriptor instanceof ExtraInfoDescriptor) { + this.addExtraInfoDescriptor( + (ExtraInfoDescriptor) descriptor); + } + } + } + } + } + + logger.info("Finished importing relay descriptors."); + } + + private void addRelayNetworkStatusConsensus( + RelayNetworkStatusConsensus consensus) { + for (NetworkStatusEntry statusEntry : + consensus.getStatusEntries().values()) { + this.addStatusEntry(consensus.getValidAfterMillis(), + statusEntry.getNickname(), + statusEntry.getFingerprint().toLowerCase(), + statusEntry.getDescriptor().toLowerCase(), + statusEntry.getPublishedMillis(), statusEntry.getAddress(), + statusEntry.getOrPort(), statusEntry.getDirPort(), + statusEntry.getFlags(), statusEntry.getVersion(), + statusEntry.getBandwidth(), statusEntry.getPortList(), + statusEntry.getStatusEntryBytes()); + } + this.addConsensus(consensus.getValidAfterMillis()); + } + + private void addServerDescriptor(ServerDescriptor descriptor) { + this.addServerDescriptor(descriptor.getServerDescriptorDigest(), + descriptor.getNickname(), descriptor.getAddress(), + descriptor.getOrPort(), descriptor.getDirPort(), + descriptor.getFingerprint(), descriptor.getBandwidthRate(), + descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(), + descriptor.getPlatform(), descriptor.getPublishedMillis(), + descriptor.getUptime(), descriptor.getExtraInfoDigest()); + } + + private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { + if (descriptor.getDirreqV3Reqs() != null) { + int allUsers = 0; + Map<String, String> obs = new HashMap<String, String>(); + for (Map.Entry<String, Integer> e : + descriptor.getDirreqV3Reqs().entrySet()) { + String country = e.getKey(); + int users = e.getValue() - 4; + allUsers += users; + obs.put(country, "" + users); + } + obs.put("zy", "" + allUsers); + this.addDirReqStats(descriptor.getFingerprint(), + descriptor.getDirreqStatsEndMillis(), + descriptor.getDirreqStatsIntervalLength(), obs); + } + List<String> bandwidthHistoryLines = new ArrayList<String>(); + if (descriptor.getWriteHistory() != null) { + bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine()); + } + if (descriptor.getReadHistory() != null) { + bandwidthHistoryLines.add(descriptor.getReadHistory().getLine()); + } + if (descriptor.getDirreqWriteHistory() != null) { + bandwidthHistoryLines.add( + descriptor.getDirreqWriteHistory().getLine()); + } + if (descriptor.getDirreqReadHistory() != null) { + 
bandwidthHistoryLines.add( + descriptor.getDirreqReadHistory().getLine()); + } + this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(), + descriptor.getNickname(), + descriptor.getFingerprint().toLowerCase(), + descriptor.getPublishedMillis(), bandwidthHistoryLines); + } + + /** + * Close the relay descriptor database connection. + */ + public void closeConnection() { + + /* Log stats about imported descriptors. */ + this.logger.info(String.format("Finished importing relay " + + "descriptors: %d consensuses, %d network status entries, %d " + + "votes, %d server descriptors, %d extra-info descriptors, %d " + + "bandwidth history elements, and %d dirreq stats elements", + rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount, + rqsCount)); + + /* Insert scheduled updates a second time, just in case the refresh + * run has started since inserting them the first time in which case + * it will miss the data inserted afterwards. We cannot, however, + * insert them only now, because if a Java execution fails at a random + * point, we might have added data, but not the corresponding dates to + * update statistics. */ + if (this.importIntoDatabase) { + try { + for (long dateMillis : this.scheduledUpdates) { + this.psU.setDate(1, new java.sql.Date(dateMillis)); + this.psU.execute(); + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add scheduled dates " + + "for the next refresh run.", e); + } + } + + /* Commit any stragglers before closing. */ + if (this.conn != null) { + try { + this.csH.executeBatch(); + + this.conn.commit(); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not commit final records to " + + "database", e); + } + try { + this.conn.close(); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not close database " + + "connection.", e); + } + } + + /* Close raw import files. 
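closeConnection() ends by replaying the collected dates through psU (a statement prepared earlier in this file) so that the next aggregation run, presumably the refresh_all() call in shared/bin/10-legacy.sh further down in this commit, knows which days need their statistics recomputed. The sketch below mimics that bookkeeping; the truncation to midnight UTC is an assumption about addDateToScheduledUpdates(), which is defined earlier in the file and not shown here.

import java.text.SimpleDateFormat;
import java.util.SortedSet;
import java.util.TimeZone;
import java.util.TreeSet;

/* Sketch of the scheduled-updates bookkeeping: every date touched by an
 * import is remembered (assumed normalized to midnight UTC) and later
 * written to a table the aggregation run reads to decide which days to
 * recompute. The timestamps below are examples. */
public class ScheduledUpdatesSketch {

  private static final SortedSet<Long> scheduledUpdates =
      new TreeSet<Long>();

  static void addDateToScheduledUpdates(long millis) {
    scheduledUpdates.add(millis - (millis % (24L * 60L * 60L * 1000L)));
  }

  public static void main(String[] args) {
    addDateToScheduledUpdates(1390305600000L);  // 2014-01-21 12:00:00 UTC
    addDateToScheduledUpdates(1390310000000L);  // same day, deduplicated
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
    for (long dateMillis : scheduledUpdates) {
      System.out.println("would schedule update for "
          + dateFormat.format(dateMillis));
    }
  }
}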
*/ + try { + if (this.statusentryOut != null) { + this.statusentryOut.write("\.\n"); + this.statusentryOut.close(); + } + if (this.descriptorOut != null) { + this.descriptorOut.write("\.\n"); + this.descriptorOut.close(); + } + if (this.bwhistOut != null) { + this.bwhistOut.write("\.\n"); + this.bwhistOut.close(); + } + if (this.consensusOut != null) { + this.consensusOut.write("\.\n"); + this.consensusOut.close(); + } + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not close one or more raw " + + "database import files.", e); + } + } +} + diff --git a/modules/legacy/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java b/modules/legacy/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java new file mode 100644 index 0000000..d5cae37 --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java @@ -0,0 +1,380 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron.network; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.torproject.descriptor.BridgeNetworkStatus; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.NetworkStatusEntry; + +/** + * Generates statistics on the average number of relays and bridges per + * day. Accepts parse results from <code>RelayDescriptorParser</code> and + * <code>BridgeDescriptorParser</code> and stores them in intermediate + * result files <code>stats/consensus-stats-raw</code> and + * <code>stats/bridge-consensus-stats-raw</code>. Writes final results to + * <code>stats/consensus-stats</code> for all days for which at least half + * of the expected consensuses or statuses are known. + */ +public class ConsensusStatsFileHandler { + + /** + * Intermediate results file holding the number of running bridges per + * bridge status. + */ + private File bridgeConsensusStatsRawFile; + + /** + * Number of running bridges in a given bridge status. Map keys are + * bridge status times formatted as "yyyy-MM-dd HH:mm:ss", map values + * are lines as read from <code>stats/bridge-consensus-stats-raw</code>. + */ + private SortedMap<String, String> bridgesRaw; + + /** + * Average number of running bridges per day. Map keys are dates + * formatted as "yyyy-MM-dd", map values are the last column as written + * to <code>stats/consensus-stats</code>. + */ + private SortedMap<String, String> bridgesPerDay; + + /** + * Logger for this class. + */ + private Logger logger; + + private int bridgeResultsAdded = 0; + + /* Database connection string. 
*/ + private String connectionURL = null; + + private SimpleDateFormat dateTimeFormat; + + private File bridgesDir; + + private File statsDirectory; + + private boolean keepImportHistory; + + /** + * Initializes this class, including reading in intermediate results + * files <code>stats/consensus-stats-raw</code> and + * <code>stats/bridge-consensus-stats-raw</code> and final results file + * <code>stats/consensus-stats</code>. + */ + public ConsensusStatsFileHandler(String connectionURL, + File bridgesDir, File statsDirectory, + boolean keepImportHistory) { + + if (bridgesDir == null || statsDirectory == null) { + throw new IllegalArgumentException(); + } + this.bridgesDir = bridgesDir; + this.statsDirectory = statsDirectory; + this.keepImportHistory = keepImportHistory; + + /* Initialize local data structures to hold intermediate and final + * results. */ + this.bridgesPerDay = new TreeMap<String, String>(); + this.bridgesRaw = new TreeMap<String, String>(); + + /* Initialize file names for intermediate and final results files. */ + this.bridgeConsensusStatsRawFile = new File( + "stats/bridge-consensus-stats-raw"); + + /* Initialize database connection string. */ + this.connectionURL = connectionURL; + + this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + + /* Initialize logger. */ + this.logger = Logger.getLogger( + ConsensusStatsFileHandler.class.getName()); + + /* Read in number of running bridges per bridge status. */ + if (this.bridgeConsensusStatsRawFile.exists()) { + try { + this.logger.fine("Reading file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "..."); + BufferedReader br = new BufferedReader(new FileReader( + this.bridgeConsensusStatsRawFile)); + String line = null; + while ((line = br.readLine()) != null) { + if (line.startsWith("date")) { + /* Skip headers. */ + continue; + } + String[] parts = line.split(","); + String dateTime = parts[0]; + if (parts.length == 2) { + this.bridgesRaw.put(dateTime, line + ",0"); + } else if (parts.length == 3) { + this.bridgesRaw.put(dateTime, line); + } else { + this.logger.warning("Corrupt line '" + line + "' in file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + + "! Aborting to read this file!"); + break; + } + } + br.close(); + this.logger.fine("Finished reading file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to read file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!", + e); + } + } + } + + /** + * Adds the intermediate results of the number of running bridges in a + * given bridge status to the existing observations. + */ + public void addBridgeConsensusResults(long publishedMillis, int running, + int runningEc2Bridges) { + String published = dateTimeFormat.format(publishedMillis); + String line = published + "," + running + "," + runningEc2Bridges; + if (!this.bridgesRaw.containsKey(published)) { + this.logger.finer("Adding new bridge numbers: " + line); + this.bridgesRaw.put(published, line); + this.bridgeResultsAdded++; + } else if (!line.equals(this.bridgesRaw.get(published))) { + this.logger.warning("The numbers of running bridges we were just " + + "given (" + line + ") are different from what we learned " + + "before (" + this.bridgesRaw.get(published) + ")! 
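The reader above tolerates two on-disk layouts of stats/bridge-consensus-stats-raw: older two-column lines without an EC2 bridge count are padded with ",0", three-column lines are kept as-is, and anything else is treated as corrupt. A sketch of that normalization with made-up lines:

/* Sketch of the line normalization applied while reading
 * stats/bridge-consensus-stats-raw: two-column lines from older runs
 * get ",0" appended so that all values share the three-column
 * "datetime,running,runningec2" layout. The input lines are examples. */
public class BridgeStatsLineSketch {

  static String normalize(String line) {
    String[] parts = line.split(",");
    if (parts.length == 2) {
      return line + ",0";   // old format without EC2 bridge count
    } else if (parts.length == 3) {
      return line;          // current format
    } else {
      return null;          // corrupt line, caller aborts reading
    }
  }

  public static void main(String[] args) {
    System.out.println(normalize("2014-01-21 18:00:00,1936"));
    System.out.println(normalize("2014-01-21 19:00:00,1940,12"));
  }
}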
" + + "Overwriting!"); + this.bridgesRaw.put(published, line); + } + } + + public void importSanitizedBridges() { + if (bridgesDir.exists()) { + logger.fine("Importing files in directory " + bridgesDir + "/..."); + DescriptorReader reader = + DescriptorSourceFactory.createDescriptorReader(); + reader.addDirectory(bridgesDir); + if (keepImportHistory) { + reader.setExcludeFiles(new File(statsDirectory, + "consensus-stats-bridge-descriptor-history")); + } + Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + if (descriptorFile.getDescriptors() != null) { + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof BridgeNetworkStatus) { + this.addBridgeNetworkStatus( + (BridgeNetworkStatus) descriptor); + } + } + } + } + logger.info("Finished importing bridge descriptors."); + } + } + + private void addBridgeNetworkStatus(BridgeNetworkStatus status) { + int runningBridges = 0, runningEc2Bridges = 0; + for (NetworkStatusEntry statusEntry : + status.getStatusEntries().values()) { + if (statusEntry.getFlags().contains("Running")) { + runningBridges++; + if (statusEntry.getNickname().startsWith("ec2bridge")) { + runningEc2Bridges++; + } + } + } + this.addBridgeConsensusResults(status.getPublishedMillis(), + runningBridges, runningEc2Bridges); + } + + /** + * Aggregates the raw observations on relay and bridge numbers and + * writes both raw and aggregate observations to disk. + */ + public void writeFiles() { + + /* Go through raw observations of numbers of running bridges in bridge + * statuses, calculate averages per day, and add these averages to + * final results. */ + if (!this.bridgesRaw.isEmpty()) { + String tempDate = null; + int brunning = 0, brunningEc2 = 0, statuses = 0; + Iterator<String> it = this.bridgesRaw.values().iterator(); + boolean haveWrittenFinalLine = false; + while (it.hasNext() || !haveWrittenFinalLine) { + String next = it.hasNext() ? it.next() : null; + /* Finished reading a day or even all lines? */ + if (tempDate != null && (next == null + || !next.substring(0, 10).equals(tempDate))) { + /* Only write results if we have seen at least half of all + * statuses. */ + if (statuses >= 24) { + String line = "," + (brunning / statuses) + "," + + (brunningEc2 / statuses); + /* Are our results new? */ + if (!this.bridgesPerDay.containsKey(tempDate)) { + this.logger.finer("Adding new average bridge numbers: " + + tempDate + line); + this.bridgesPerDay.put(tempDate, line); + } else if (!line.equals(this.bridgesPerDay.get(tempDate))) { + this.logger.finer("Replacing existing average bridge " + + "numbers (" + this.bridgesPerDay.get(tempDate) + + " with new numbers: " + line); + this.bridgesPerDay.put(tempDate, line); + } + } + brunning = brunningEc2 = statuses = 0; + haveWrittenFinalLine = (next == null); + } + /* Sum up number of running bridges. */ + if (next != null) { + tempDate = next.substring(0, 10); + statuses++; + String[] parts = next.split(","); + brunning += Integer.parseInt(parts[1]); + brunningEc2 += Integer.parseInt(parts[2]); + } + } + } + + /* Write raw numbers of running bridges to disk. 
*/ + try { + this.logger.fine("Writing file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "..."); + this.bridgeConsensusStatsRawFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter( + new FileWriter(this.bridgeConsensusStatsRawFile)); + bw.append("datetime,brunning,brunningec2\n"); + for (String line : this.bridgesRaw.values()) { + bw.append(line + "\n"); + } + bw.close(); + this.logger.fine("Finished writing file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to write file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!", + e); + } + + /* Add average number of bridges per day to the database. */ + if (connectionURL != null) { + try { + Map<String, String> insertRows = new HashMap<String, String>(), + updateRows = new HashMap<String, String>(); + insertRows.putAll(this.bridgesPerDay); + Connection conn = DriverManager.getConnection(connectionURL); + conn.setAutoCommit(false); + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery( + "SELECT date, avg_running, avg_running_ec2 " + + "FROM bridge_network_size"); + while (rs.next()) { + String date = rs.getDate(1).toString(); + if (insertRows.containsKey(date)) { + String insertRow = insertRows.remove(date); + String[] parts = insertRow.substring(1).split(","); + long newAvgRunning = Long.parseLong(parts[0]); + long newAvgRunningEc2 = Long.parseLong(parts[1]); + long oldAvgRunning = rs.getLong(2); + long oldAvgRunningEc2 = rs.getLong(3); + if (newAvgRunning != oldAvgRunning || + newAvgRunningEc2 != oldAvgRunningEc2) { + updateRows.put(date, insertRow); + } + } + } + rs.close(); + PreparedStatement psU = conn.prepareStatement( + "UPDATE bridge_network_size SET avg_running = ?, " + + "avg_running_ec2 = ? WHERE date = ?"); + for (Map.Entry<String, String> e : updateRows.entrySet()) { + java.sql.Date date = java.sql.Date.valueOf(e.getKey()); + String[] parts = e.getValue().substring(1).split(","); + long avgRunning = Long.parseLong(parts[0]); + long avgRunningEc2 = Long.parseLong(parts[1]); + psU.clearParameters(); + psU.setLong(1, avgRunning); + psU.setLong(2, avgRunningEc2); + psU.setDate(3, date); + psU.executeUpdate(); + } + PreparedStatement psI = conn.prepareStatement( + "INSERT INTO bridge_network_size (avg_running, " + + "avg_running_ec2, date) VALUES (?, ?, ?)"); + for (Map.Entry<String, String> e : insertRows.entrySet()) { + java.sql.Date date = java.sql.Date.valueOf(e.getKey()); + String[] parts = e.getValue().substring(1).split(","); + long avgRunning = Long.parseLong(parts[0]); + long avgRunningEc2 = Long.parseLong(parts[1]); + psI.clearParameters(); + psI.setLong(1, avgRunning); + psI.setLong(2, avgRunningEc2); + psI.setDate(3, date); + psI.executeUpdate(); + } + conn.commit(); + conn.close(); + } catch (SQLException e) { + logger.log(Level.WARNING, "Failed to add average bridge numbers " + + "to database.", e); + } + } + + /* Write stats. 
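The database write in writeFiles() is a manual insert-or-update: every computed day starts out as an insert candidate, days that already exist with identical values are dropped, days with changed values become updates, and whatever remains is inserted. The same shape is used again for torperf_stats further down. A condensed sketch with plain strings standing in for result sets:

import java.util.HashMap;
import java.util.Map;

/* Condensed sketch of the insert-or-update logic used for
 * bridge_network_size (and later torperf_stats): computed rows start as
 * insert candidates, unchanged rows are dropped, changed rows become
 * updates, and the remainder is inserted. Values are made up. */
public class InsertOrUpdateSketch {

  public static void main(String[] args) {
    Map<String, String> computed = new HashMap<String, String>();
    computed.put("2014-01-20", "1936,12");  // unchanged in the database
    computed.put("2014-01-21", "1940,13");  // differs from the database
    computed.put("2014-01-22", "1944,13");  // not in the database yet
    Map<String, String> existing = new HashMap<String, String>();
    existing.put("2014-01-20", "1936,12");
    existing.put("2014-01-21", "1939,13");

    Map<String, String> insertRows =
        new HashMap<String, String>(computed);
    Map<String, String> updateRows = new HashMap<String, String>();
    for (Map.Entry<String, String> e : existing.entrySet()) {
      String newValue = insertRows.remove(e.getKey());
      if (newValue != null && !newValue.equals(e.getValue())) {
        updateRows.put(e.getKey(), newValue);
      }
    }
    System.out.println("UPDATE: " + updateRows);  // {2014-01-21=1940,13}
    System.out.println("INSERT: " + insertRows);  // {2014-01-22=1944,13}
  }
}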
*/ + StringBuilder dumpStats = new StringBuilder("Finished writing " + + "statistics on bridge network statuses to disk.\nAdded " + + this.bridgeResultsAdded + " bridge network status(es) in this " + + "execution."); + long now = System.currentTimeMillis(); + SimpleDateFormat dateTimeFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + if (this.bridgesRaw.isEmpty()) { + dumpStats.append("\nNo bridge status known yet."); + } else { + dumpStats.append("\nLast known bridge status was published " + + this.bridgesRaw.lastKey() + "."); + try { + if (now - 6L * 60L * 60L * 1000L > dateTimeFormat.parse( + this.bridgesRaw.lastKey()).getTime()) { + logger.warning("Last known bridge status is more than 6 hours " + + "old: " + this.bridgesRaw.lastKey()); + } + } catch (ParseException e) { + /* Can't parse the timestamp? Whatever. */ + } + } + logger.info(dumpStats.toString()); + } +} + diff --git a/modules/legacy/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java b/modules/legacy/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java new file mode 100644 index 0000000..815b37f --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java @@ -0,0 +1,271 @@ +/* Copyright 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron.performance; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Iterator; +import java.util.TimeZone; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.ExtraInfoDescriptor; + +public class PerformanceStatsImporter { + + /** + * How many records to commit with each database transaction. + */ + private final long autoCommitCount = 500; + + /** + * Keep track of the number of records committed before each transaction + */ + private int rbsCount = 0; + + /** + * Relay descriptor database connection. + */ + private Connection conn; + + /** + * Prepared statement to check whether a given conn-bi-direct stats + * string has been imported into the database before. + */ + private PreparedStatement psBs; + + /** + * Prepared statement to insert a conn-bi-direct stats string into the + * database. + */ + private PreparedStatement psB; + + /** + * Logger for this class. + */ + private Logger logger; + + /** + * Directory for writing raw import files. + */ + private String rawFilesDirectory; + + /** + * Raw import file containing conn-bi-direct stats strings. + */ + private BufferedWriter connBiDirectOut; + + /** + * Date format to parse timestamps. + */ + private SimpleDateFormat dateTimeFormat; + + private boolean importIntoDatabase; + private boolean writeRawImportFiles; + + private File archivesDirectory; + private File statsDirectory; + private boolean keepImportHistory; + + /** + * Initialize database importer by connecting to the database and + * preparing statements. 
+ */ + public PerformanceStatsImporter(String connectionURL, + String rawFilesDirectory, File archivesDirectory, + File statsDirectory, boolean keepImportHistory) { + + if (archivesDirectory == null || + statsDirectory == null) { + throw new IllegalArgumentException(); + } + this.archivesDirectory = archivesDirectory; + this.statsDirectory = statsDirectory; + this.keepImportHistory = keepImportHistory; + + /* Initialize logger. */ + this.logger = Logger.getLogger( + PerformanceStatsImporter.class.getName()); + + if (connectionURL != null) { + try { + /* Connect to database. */ + this.conn = DriverManager.getConnection(connectionURL); + + /* Turn autocommit off */ + this.conn.setAutoCommit(false); + + /* Prepare statements. */ + this.psBs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM connbidirect WHERE source = ? AND statsend = ?"); + this.psB = conn.prepareStatement("INSERT INTO connbidirect " + + "(source, statsend, seconds, belownum, readnum, writenum, " + + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)"); + this.importIntoDatabase = true; + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not connect to database or " + + "prepare statements.", e); + } + } + + /* Remember where we want to write raw import files. */ + if (rawFilesDirectory != null) { + this.rawFilesDirectory = rawFilesDirectory; + this.writeRawImportFiles = true; + } + + /* Initialize date format, so that we can format timestamps. */ + this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + /** + * Insert a conn-bi-direct stats string into the database. + */ + private void addConnBiDirect(String source, long statsEndMillis, + long seconds, long below, long read, long write, long both) { + String statsEnd = this.dateTimeFormat.format(statsEndMillis); + if (this.importIntoDatabase) { + try { + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp statsEndTimestamp = new Timestamp(statsEndMillis); + this.psBs.setString(1, source); + this.psBs.setTimestamp(2, statsEndTimestamp, cal); + ResultSet rs = psBs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psB.clearParameters(); + this.psB.setString(1, source); + this.psB.setTimestamp(2, statsEndTimestamp, cal); + this.psB.setLong(3, seconds); + this.psB.setLong(4, below); + this.psB.setLong(5, read); + this.psB.setLong(6, write); + this.psB.setLong(7, both); + this.psB.executeUpdate(); + rbsCount++; + if (rbsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add conn-bi-direct " + + "stats string. We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.connBiDirectOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.connBiDirectOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/connbidirect.sql")); + this.connBiDirectOut.write(" COPY connbidirect (source, " + + "statsend, seconds, belownum, readnum, writenum, " + + "bothnum) FROM stdin;\n"); + } + this.connBiDirectOut.write(source + "\t" + statsEnd + "\t" + + seconds + "\t" + below + "\t" + read + "\t" + write + "\t" + + both + "\n"); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write conn-bi-direct " + + "stats string to raw database import file. 
We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + public void importRelayDescriptors() { + if (archivesDirectory.exists()) { + logger.fine("Importing files in directory " + archivesDirectory + + "/..."); + DescriptorReader reader = + DescriptorSourceFactory.createDescriptorReader(); + reader.addDirectory(archivesDirectory); + if (keepImportHistory) { + reader.setExcludeFiles(new File(statsDirectory, + "performance-stats-relay-descriptor-history")); + } + Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + if (descriptorFile.getDescriptors() != null) { + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof ExtraInfoDescriptor) { + this.addExtraInfoDescriptor( + (ExtraInfoDescriptor) descriptor); + } + } + } + } + } + + logger.info("Finished importing relay descriptors."); + } + + private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { + if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) { + this.addConnBiDirect(descriptor.getFingerprint(), + descriptor.getConnBiDirectStatsEndMillis(), + descriptor.getConnBiDirectStatsIntervalLength(), + descriptor.getConnBiDirectBelow(), + descriptor.getConnBiDirectRead(), + descriptor.getConnBiDirectWrite(), + descriptor.getConnBiDirectBoth()); + } + } + + /** + * Close the relay descriptor database connection. + */ + public void closeConnection() { + + /* Log stats about imported descriptors. */ + this.logger.info(String.format("Finished importing relay " + + "descriptors: %d conn-bi-direct stats lines", rbsCount)); + + /* Commit any stragglers before closing. */ + if (this.conn != null) { + try { + this.conn.commit(); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not commit final records " + + "to database", e); + } + try { + this.conn.close(); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not close database " + + "connection.", e); + } + } + + /* Close raw import files. 
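All of the importers in this commit drive metrics-lib the same way: create a DescriptorReader, point it at a directory, optionally register an exclude-files history so already-parsed files are skipped, and iterate over the returned DescriptorFiles. A condensed sketch using only the calls that appear in this diff; the directory and history file names are placeholders:

import java.io.File;
import java.util.Iterator;

import org.torproject.descriptor.Descriptor;
import org.torproject.descriptor.DescriptorFile;
import org.torproject.descriptor.DescriptorReader;
import org.torproject.descriptor.DescriptorSourceFactory;
import org.torproject.descriptor.ExtraInfoDescriptor;

/* Condensed sketch of the descriptor-reading loop shared by the
 * importers above, limited to the metrics-lib calls visible in this
 * commit. Directory and history file names are placeholders. */
public class DescriptorLoopSketch {

  public static void main(String[] args) {
    DescriptorReader reader =
        DescriptorSourceFactory.createDescriptorReader();
    reader.addDirectory(new File("in/example-descriptors"));
    /* Skip files already parsed in earlier runs. */
    reader.setExcludeFiles(new File("stats", "example-import-history"));
    Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
    while (descriptorFiles.hasNext()) {
      DescriptorFile descriptorFile = descriptorFiles.next();
      if (descriptorFile.getDescriptors() == null) {
        continue;
      }
      for (Descriptor descriptor : descriptorFile.getDescriptors()) {
        if (descriptor instanceof ExtraInfoDescriptor) {
          System.out.println("extra-info descriptor from "
              + ((ExtraInfoDescriptor) descriptor).getNickname());
        }
      }
    }
  }
}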
*/ + try { + if (this.connBiDirectOut != null) { + this.connBiDirectOut.write("\.\n"); + this.connBiDirectOut.close(); + } + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not close one or more raw " + + "database import files.", e); + } + } +} diff --git a/modules/legacy/src/org/torproject/ernie/cron/performance/TorperfProcessor.java b/modules/legacy/src/org/torproject/ernie/cron/performance/TorperfProcessor.java new file mode 100644 index 0000000..d7322db --- /dev/null +++ b/modules/legacy/src/org/torproject/ernie/cron/performance/TorperfProcessor.java @@ -0,0 +1,374 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron.performance; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TimeZone; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.TorperfResult; + +public class TorperfProcessor { + public TorperfProcessor(File torperfDirectory, File statsDirectory, + String connectionURL) { + + if (torperfDirectory == null || statsDirectory == null) { + throw new IllegalArgumentException(); + } + + Logger logger = Logger.getLogger(TorperfProcessor.class.getName()); + File rawFile = new File(statsDirectory, "torperf-raw"); + File statsFile = new File(statsDirectory, "torperf-stats"); + SortedMap<String, String> rawObs = new TreeMap<String, String>(); + SortedMap<String, String> stats = new TreeMap<String, String>(); + int addedRawObs = 0; + SimpleDateFormat formatter = + new SimpleDateFormat("yyyy-MM-dd,HH:mm:ss"); + formatter.setTimeZone(TimeZone.getTimeZone("UTC")); + try { + if (rawFile.exists()) { + logger.fine("Reading file " + rawFile.getAbsolutePath() + "..."); + BufferedReader br = new BufferedReader(new FileReader(rawFile)); + String line = br.readLine(); // ignore header + while ((line = br.readLine()) != null) { + if (line.split(",").length != 4) { + logger.warning("Corrupt line in " + rawFile.getAbsolutePath() + + "!"); + break; + } + String key = line.substring(0, line.lastIndexOf(",")); + rawObs.put(key, line); + } + br.close(); + logger.fine("Finished reading file " + rawFile.getAbsolutePath() + + "."); + } + if (statsFile.exists()) { + logger.fine("Reading file " + statsFile.getAbsolutePath() + + "..."); + BufferedReader br = new BufferedReader(new FileReader(statsFile)); + String line = br.readLine(); // ignore header + while ((line = br.readLine()) != null) { + String key = line.split(",")[0] + "," + line.split(",")[1]; + stats.put(key, line); + } + br.close(); + logger.fine("Finished reading file " + statsFile.getAbsolutePath() + + "."); + } + if (torperfDirectory.exists()) { + logger.fine("Importing files in " + 
torperfDirectory + "/..."); + DescriptorReader descriptorReader = + DescriptorSourceFactory.createDescriptorReader(); + descriptorReader.addDirectory(torperfDirectory); + descriptorReader.setExcludeFiles(new File(statsDirectory, + "torperf-history")); + Iterator<DescriptorFile> descriptorFiles = + descriptorReader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + if (descriptorFile.getException() != null) { + logger.log(Level.FINE, "Error parsing file.", + descriptorFile.getException()); + continue; + } + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (!(descriptor instanceof TorperfResult)) { + continue; + } + TorperfResult result = (TorperfResult) descriptor; + String source = result.getSource(); + long fileSize = result.getFileSize(); + if (fileSize == 51200) { + source += "-50kb"; + } else if (fileSize == 1048576) { + source += "-1mb"; + } else if (fileSize == 5242880) { + source += "-5mb"; + } else { + logger.fine("Unexpected file size '" + fileSize + + "'. Skipping."); + continue; + } + String dateTime = formatter.format(result.getStartMillis()); + long completeMillis = result.getDataCompleteMillis() + - result.getStartMillis(); + String key = source + "," + dateTime; + String value = key; + if ((result.didTimeout() == null && + result.getDataCompleteMillis() < 1) || + (result.didTimeout() != null && result.didTimeout())) { + value += ",-2"; // -2 for timeout + } else if (result.getReadBytes() < fileSize) { + value += ",-1"; // -1 for failure + } else { + value += "," + completeMillis; + } + if (!rawObs.containsKey(key)) { + rawObs.put(key, value); + addedRawObs++; + } + } + } + logger.fine("Finished importing files in " + torperfDirectory + + "/."); + } + if (rawObs.size() > 0) { + logger.fine("Writing file " + rawFile.getAbsolutePath() + "..."); + rawFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter(rawFile)); + bw.append("source,date,start,completemillis\n"); + String tempSourceDate = null; + Iterator<Map.Entry<String, String>> it = + rawObs.entrySet().iterator(); + List<Long> dlTimes = new ArrayList<Long>(); + boolean haveWrittenFinalLine = false; + SortedMap<String, List<Long>> dlTimesAllSources = + new TreeMap<String, List<Long>>(); + SortedMap<String, long[]> statusesAllSources = + new TreeMap<String, long[]>(); + long failures = 0, timeouts = 0, requests = 0; + while (it.hasNext() || !haveWrittenFinalLine) { + Map.Entry<String, String> next = it.hasNext() ? 
it.next() : null; + if (tempSourceDate != null + && (next == null || !(next.getValue().split(",")[0] + "," + + next.getValue().split(",")[1]).equals(tempSourceDate))) { + if (dlTimes.size() > 4) { + Collections.sort(dlTimes); + long q1 = dlTimes.get(dlTimes.size() / 4 - 1); + long md = dlTimes.get(dlTimes.size() / 2 - 1); + long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1); + stats.put(tempSourceDate, tempSourceDate + "," + q1 + "," + + md + "," + q3 + "," + timeouts + "," + failures + "," + + requests); + String allSourceDate = "all" + tempSourceDate.substring( + tempSourceDate.indexOf("-")); + if (dlTimesAllSources.containsKey(allSourceDate)) { + dlTimesAllSources.get(allSourceDate).addAll(dlTimes); + } else { + dlTimesAllSources.put(allSourceDate, dlTimes); + } + if (statusesAllSources.containsKey(allSourceDate)) { + long[] status = statusesAllSources.get(allSourceDate); + status[0] += timeouts; + status[1] += failures; + status[2] += requests; + } else { + long[] status = new long[3]; + status[0] = timeouts; + status[1] = failures; + status[2] = requests; + statusesAllSources.put(allSourceDate, status); + } + } + dlTimes = new ArrayList<Long>(); + failures = timeouts = requests = 0; + if (next == null) { + haveWrittenFinalLine = true; + } + } + if (next != null) { + bw.append(next.getValue() + "\n"); + String[] parts = next.getValue().split(","); + tempSourceDate = parts[0] + "," + parts[1]; + long completeMillis = Long.parseLong(parts[3]); + if (completeMillis == -2L) { + timeouts++; + } else if (completeMillis == -1L) { + failures++; + } else { + dlTimes.add(Long.parseLong(parts[3])); + } + requests++; + } + } + bw.close(); + for (Map.Entry<String, List<Long>> e : + dlTimesAllSources.entrySet()) { + String allSourceDate = e.getKey(); + dlTimes = e.getValue(); + Collections.sort(dlTimes); + long q1 = dlTimes.get(dlTimes.size() / 4 - 1); + long md = dlTimes.get(dlTimes.size() / 2 - 1); + long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1); + long[] status = statusesAllSources.get(allSourceDate); + timeouts = status[0]; + failures = status[1]; + requests = status[2]; + stats.put(allSourceDate, allSourceDate + "," + q1 + "," + md + + "," + q3 + "," + timeouts + "," + failures + "," + + requests); + } + logger.fine("Finished writing file " + rawFile.getAbsolutePath() + + "."); + } + if (stats.size() > 0) { + logger.fine("Writing file " + statsFile.getAbsolutePath() + + "..."); + statsFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter(statsFile)); + bw.append("source,date,q1,md,q3,timeouts,failures,requests\n"); + for (String s : stats.values()) { + bw.append(s + "\n"); + } + bw.close(); + logger.fine("Finished writing file " + statsFile.getAbsolutePath() + + "."); + } + } catch (IOException e) { + logger.log(Level.WARNING, "Failed writing " + + rawFile.getAbsolutePath() + " or " + + statsFile.getAbsolutePath() + "!", e); + } + + /* Write stats. 
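Once a source and day has more than four measurements, the sorted completion times yield the first quartile, median, and third quartile using the index arithmetic shown above (size/4, size/2, and 3*size/4, taken one-based). A sketch with made-up download times in milliseconds:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/* Sketch of the quartile computation used for torperf-stats: with more
 * than four sorted download times, q1, md, and q3 are taken at size/4,
 * size/2, and size*3/4 (1-based). The completion times are made up. */
public class TorperfQuartilesSketch {

  public static void main(String[] args) {
    List<Long> dlTimes = new ArrayList<Long>(Arrays.asList(
        4200L, 3900L, 5100L, 4700L, 4400L, 6100L, 3800L, 4900L));
    if (dlTimes.size() > 4) {
      Collections.sort(dlTimes);
      long q1 = dlTimes.get(dlTimes.size() / 4 - 1);
      long md = dlTimes.get(dlTimes.size() / 2 - 1);
      long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1);
      System.out.println("q1=" + q1 + " md=" + md + " q3=" + q3);
    }
  }
}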
*/ + StringBuilder dumpStats = new StringBuilder("Finished writing " + + "statistics on torperf results.\nAdded " + addedRawObs + + " new observations in this execution.\n" + + "Last known obserations by source and file size are:"); + String lastSource = null; + String lastLine = null; + for (String s : rawObs.keySet()) { + String[] parts = s.split(","); + if (lastSource == null) { + lastSource = parts[0]; + } else if (!parts[0].equals(lastSource)) { + String lastKnownObservation = lastLine.split(",")[1] + " " + + lastLine.split(",")[2]; + dumpStats.append("\n" + lastSource + " " + lastKnownObservation); + lastSource = parts[0]; + } + lastLine = s; + } + if (lastSource != null) { + String lastKnownObservation = lastLine.split(",")[1] + " " + + lastLine.split(",")[2]; + dumpStats.append("\n" + lastSource + " " + lastKnownObservation); + } + logger.info(dumpStats.toString()); + + /* Write results to database. */ + if (connectionURL != null) { + try { + Map<String, String> insertRows = new HashMap<String, String>(); + insertRows.putAll(stats); + Set<String> updateRows = new HashSet<String>(); + Connection conn = DriverManager.getConnection(connectionURL); + conn.setAutoCommit(false); + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery( + "SELECT date, source, q1, md, q3, timeouts, failures, " + + "requests FROM torperf_stats"); + while (rs.next()) { + String date = rs.getDate(1).toString(); + String source = rs.getString(2); + String key = source + "," + date; + if (insertRows.containsKey(key)) { + String insertRow = insertRows.remove(key); + String[] newStats = insertRow.split(","); + long newQ1 = Long.parseLong(newStats[2]); + long newMd = Long.parseLong(newStats[3]); + long newQ3 = Long.parseLong(newStats[4]); + long newTimeouts = Long.parseLong(newStats[5]); + long newFailures = Long.parseLong(newStats[6]); + long newRequests = Long.parseLong(newStats[7]); + long oldQ1 = rs.getLong(3); + long oldMd = rs.getLong(4); + long oldQ3 = rs.getLong(5); + long oldTimeouts = rs.getLong(6); + long oldFailures = rs.getLong(7); + long oldRequests = rs.getLong(8); + if (newQ1 != oldQ1 || newMd != oldMd || newQ3 != oldQ3 || + newTimeouts != oldTimeouts || + newFailures != oldFailures || + newRequests != oldRequests) { + updateRows.add(insertRow); + } + } + } + PreparedStatement psU = conn.prepareStatement( + "UPDATE torperf_stats SET q1 = ?, md = ?, q3 = ?, " + + "timeouts = ?, failures = ?, requests = ? " + + "WHERE date = ? 
AND source = ?"); + for (String row : updateRows) { + String[] newStats = row.split(","); + String source = newStats[0]; + java.sql.Date date = java.sql.Date.valueOf(newStats[1]); + long q1 = Long.parseLong(newStats[2]); + long md = Long.parseLong(newStats[3]); + long q3 = Long.parseLong(newStats[4]); + long timeouts = Long.parseLong(newStats[5]); + long failures = Long.parseLong(newStats[6]); + long requests = Long.parseLong(newStats[7]); + psU.clearParameters(); + psU.setLong(1, q1); + psU.setLong(2, md); + psU.setLong(3, q3); + psU.setLong(4, timeouts); + psU.setLong(5, failures); + psU.setLong(6, requests); + psU.setDate(7, date); + psU.setString(8, source); + psU.executeUpdate(); + } + PreparedStatement psI = conn.prepareStatement( + "INSERT INTO torperf_stats (q1, md, q3, timeouts, failures, " + + "requests, date, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"); + for (String row : insertRows.values()) { + String[] newStats = row.split(","); + String source = newStats[0]; + java.sql.Date date = java.sql.Date.valueOf(newStats[1]); + long q1 = Long.parseLong(newStats[2]); + long md = Long.parseLong(newStats[3]); + long q3 = Long.parseLong(newStats[4]); + long timeouts = Long.parseLong(newStats[5]); + long failures = Long.parseLong(newStats[6]); + long requests = Long.parseLong(newStats[7]); + psI.clearParameters(); + psI.setLong(1, q1); + psI.setLong(2, md); + psI.setLong(3, q3); + psI.setLong(4, timeouts); + psI.setLong(5, failures); + psI.setLong(6, requests); + psI.setDate(7, date); + psI.setString(8, source); + psI.executeUpdate(); + } + conn.commit(); + conn.close(); + } catch (SQLException e) { + logger.log(Level.WARNING, "Failed to add torperf stats to " + + "database.", e); + } + } + } +} + diff --git a/run-web.sh b/run-web.sh new file mode 100755 index 0000000..5ee88d8 --- /dev/null +++ b/run-web.sh @@ -0,0 +1,3 @@ +#!/bin/sh +for i in $(ls shared/bin/[0-9]* | sort); do ./$i; done + diff --git a/run.sh b/run.sh deleted file mode 100755 index f8ac867..0000000 --- a/run.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/sh - -# TODO is there a better way to suppress Ant's output? 
-ant | grep "[java]" - diff --git a/shared/bin/01-rsync-descriptors.sh b/shared/bin/01-rsync-descriptors.sh new file mode 100755 index 0000000..649675f --- /dev/null +++ b/shared/bin/01-rsync-descriptors.sh @@ -0,0 +1,3 @@ +#!/bin/sh +rsync -arz --delete --exclude 'relay-descriptors/votes' metrics.torproject.org::metrics-recent shared/in + diff --git a/shared/bin/10-legacy.sh b/shared/bin/10-legacy.sh new file mode 100755 index 0000000..cd9131f --- /dev/null +++ b/shared/bin/10-legacy.sh @@ -0,0 +1,10 @@ +#!/bin/sh +cd modules/legacy/ +ant | grep "[java]" +psql -U metrics tordir -c 'SELECT * FROM refresh_all();' +psql -c 'COPY (SELECT * FROM stats_servers) TO STDOUT WITH CSV HEADER;' tordir > stats/servers.csv +psql -c 'COPY (SELECT * FROM stats_bandwidth) TO STDOUT WITH CSV HEADER;' tordir > stats/bandwidth.csv +psql -c 'COPY (SELECT * FROM stats_torperf) TO STDOUT WITH CSV HEADER;' tordir > stats/torperf.csv +psql -c 'COPY (SELECT * FROM stats_connbidirect) TO STDOUT WITH CSV HEADER;' tordir > stats/connbidirect.csv +cd ../../ + diff --git a/shared/bin/99-copy-stats-files.sh b/shared/bin/99-copy-stats-files.sh new file mode 100755 index 0000000..6dce205 --- /dev/null +++ b/shared/bin/99-copy-stats-files.sh @@ -0,0 +1,4 @@ +#!/bin/sh +mkdir -p shared/stats +cp -a modules/legacy/stats/*.csv shared/stats/ + diff --git a/src/org/torproject/ernie/cron/Configuration.java b/src/org/torproject/ernie/cron/Configuration.java deleted file mode 100644 index 878e882..0000000 --- a/src/org/torproject/ernie/cron/Configuration.java +++ /dev/null @@ -1,163 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.net.MalformedURLException; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Initialize configuration with hard-coded defaults, overwrite with - * configuration in config file, if exists, and answer Main.java about our - * configuration. - */ -public class Configuration { - private boolean importDirectoryArchives = false; - private String directoryArchivesDirectory = "in/relay-descriptors/"; - private boolean keepDirectoryArchiveImportHistory = false; - private boolean importSanitizedBridges = false; - private String sanitizedBridgesDirectory = "in/bridge-descriptors/"; - private boolean keepSanitizedBridgesImportHistory = false; - private boolean writeRelayDescriptorDatabase = false; - private String relayDescriptorDatabaseJdbc = - "jdbc:postgresql://localhost/tordir?user=metrics&password=password"; - private boolean writeRelayDescriptorsRawFiles = false; - private String relayDescriptorRawFilesDirectory = "pg-import/"; - private boolean writeBridgeStats = false; - private boolean importWriteTorperfStats = false; - private String torperfDirectory = "in/torperf/"; - private String exoneraTorDatabaseJdbc = "jdbc:postgresql:" - + "//localhost/exonerator?user=metrics&password=password"; - private String exoneraTorImportDirectory = "exonerator-import/"; - - public Configuration() { - - /* Initialize logger. */ - Logger logger = Logger.getLogger(Configuration.class.getName()); - - /* Read config file, if present. 
*/ - File configFile = new File("config"); - if (!configFile.exists()) { - logger.warning("Could not find config file."); - return; - } - String line = null; - try { - BufferedReader br = new BufferedReader(new FileReader(configFile)); - while ((line = br.readLine()) != null) { - if (line.startsWith("#") || line.length() < 1) { - continue; - } else if (line.startsWith("ImportDirectoryArchives")) { - this.importDirectoryArchives = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("DirectoryArchivesDirectory")) { - this.directoryArchivesDirectory = line.split(" ")[1]; - } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) { - this.keepDirectoryArchiveImportHistory = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("ImportSanitizedBridges")) { - this.importSanitizedBridges = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("SanitizedBridgesDirectory")) { - this.sanitizedBridgesDirectory = line.split(" ")[1]; - } else if (line.startsWith("KeepSanitizedBridgesImportHistory")) { - this.keepSanitizedBridgesImportHistory = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("WriteRelayDescriptorDatabase")) { - this.writeRelayDescriptorDatabase = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) { - this.relayDescriptorDatabaseJdbc = line.split(" ")[1]; - } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) { - this.writeRelayDescriptorsRawFiles = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) { - this.relayDescriptorRawFilesDirectory = line.split(" ")[1]; - } else if (line.startsWith("WriteBridgeStats")) { - this.writeBridgeStats = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("ImportWriteTorperfStats")) { - this.importWriteTorperfStats = Integer.parseInt( - line.split(" ")[1]) != 0; - } else if (line.startsWith("TorperfDirectory")) { - this.torperfDirectory = line.split(" ")[1]; - } else if (line.startsWith("ExoneraTorDatabaseJdbc")) { - this.exoneraTorDatabaseJdbc = line.split(" ")[1]; - } else if (line.startsWith("ExoneraTorImportDirectory")) { - this.exoneraTorImportDirectory = line.split(" ")[1]; - } else { - logger.severe("Configuration file contains unrecognized " - + "configuration key in line '" + line + "'! Exiting!"); - System.exit(1); - } - } - br.close(); - } catch (ArrayIndexOutOfBoundsException e) { - logger.severe("Configuration file contains configuration key " - + "without value in line '" + line + "'. Exiting!"); - System.exit(1); - } catch (MalformedURLException e) { - logger.severe("Configuration file contains illegal URL or IP:port " - + "pair in line '" + line + "'. Exiting!"); - System.exit(1); - } catch (NumberFormatException e) { - logger.severe("Configuration file contains illegal value in line '" - + line + "' with legal values being 0 or 1. Exiting!"); - System.exit(1); - } catch (IOException e) { - logger.log(Level.SEVERE, "Unknown problem while reading config " - + "file! 
Exiting!", e); - System.exit(1); - } - } - public boolean getImportDirectoryArchives() { - return this.importDirectoryArchives; - } - public String getDirectoryArchivesDirectory() { - return this.directoryArchivesDirectory; - } - public boolean getKeepDirectoryArchiveImportHistory() { - return this.keepDirectoryArchiveImportHistory; - } - public boolean getWriteRelayDescriptorDatabase() { - return this.writeRelayDescriptorDatabase; - } - public boolean getImportSanitizedBridges() { - return this.importSanitizedBridges; - } - public String getSanitizedBridgesDirectory() { - return this.sanitizedBridgesDirectory; - } - public boolean getKeepSanitizedBridgesImportHistory() { - return this.keepSanitizedBridgesImportHistory; - } - public String getRelayDescriptorDatabaseJDBC() { - return this.relayDescriptorDatabaseJdbc; - } - public boolean getWriteRelayDescriptorsRawFiles() { - return this.writeRelayDescriptorsRawFiles; - } - public String getRelayDescriptorRawFilesDirectory() { - return this.relayDescriptorRawFilesDirectory; - } - public boolean getWriteBridgeStats() { - return this.writeBridgeStats; - } - public boolean getImportWriteTorperfStats() { - return this.importWriteTorperfStats; - } - public String getTorperfDirectory() { - return this.torperfDirectory; - } - public String getExoneraTorDatabaseJdbc() { - return this.exoneraTorDatabaseJdbc; - } - public String getExoneraTorImportDirectory() { - return this.exoneraTorImportDirectory; - } -} - diff --git a/src/org/torproject/ernie/cron/LockFile.java b/src/org/torproject/ernie/cron/LockFile.java deleted file mode 100644 index 4de8da0..0000000 --- a/src/org/torproject/ernie/cron/LockFile.java +++ /dev/null @@ -1,52 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.util.logging.Logger; - -public class LockFile { - - private File lockFile; - private Logger logger; - - public LockFile() { - this.lockFile = new File("lock"); - this.logger = Logger.getLogger(LockFile.class.getName()); - } - - public boolean acquireLock() { - this.logger.fine("Trying to acquire lock..."); - try { - if (this.lockFile.exists()) { - BufferedReader br = new BufferedReader(new FileReader("lock")); - long runStarted = Long.parseLong(br.readLine()); - br.close(); - if (System.currentTimeMillis() - runStarted < 55L * 60L * 1000L) { - return false; - } - } - BufferedWriter bw = new BufferedWriter(new FileWriter("lock")); - bw.append("" + System.currentTimeMillis() + "\n"); - bw.close(); - this.logger.fine("Acquired lock."); - return true; - } catch (IOException e) { - this.logger.warning("Caught exception while trying to acquire " - + "lock!"); - return false; - } - } - - public void releaseLock() { - this.logger.fine("Releasing lock..."); - this.lockFile.delete(); - this.logger.fine("Released lock."); - } -} - diff --git a/src/org/torproject/ernie/cron/LoggingConfiguration.java b/src/org/torproject/ernie/cron/LoggingConfiguration.java deleted file mode 100644 index c261d95..0000000 --- a/src/org/torproject/ernie/cron/LoggingConfiguration.java +++ /dev/null @@ -1,94 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import 
java.util.TimeZone; -import java.util.logging.ConsoleHandler; -import java.util.logging.FileHandler; -import java.util.logging.Formatter; -import java.util.logging.Handler; -import java.util.logging.Level; -import java.util.logging.LogRecord; -import java.util.logging.Logger; - -/** - * Initialize logging configuration. - * - * Log levels used by ERNIE: - * - * - SEVERE: An event made it impossible to continue program execution. - * - WARNING: A potential problem occurred that requires the operator to - * look after the otherwise unattended setup - * - INFO: Messages on INFO level are meant to help the operator in making - * sure that operation works as expected. - * - FINE: Debug messages that are used to identify problems and which are - * turned on by default. - * - FINER: More detailed debug messages to investigate problems in more - * detail. Not turned on by default. Increase log file limit when using - * FINER. - * - FINEST: Most detailed debug messages. Not used. - */ -public class LoggingConfiguration { - - public LoggingConfiguration() { - - /* Remove default console handler. */ - for (Handler h : Logger.getLogger("").getHandlers()) { - Logger.getLogger("").removeHandler(h); - } - - /* Disable logging of internal Sun classes. */ - Logger.getLogger("sun").setLevel(Level.OFF); - - /* Set minimum log level we care about from INFO to FINER. */ - Logger.getLogger("").setLevel(Level.FINER); - - /* Create log handler that writes messages on WARNING or higher to the - * console. */ - final SimpleDateFormat dateTimeFormat = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - Formatter cf = new Formatter() { - public String format(LogRecord record) { - return dateTimeFormat.format(new Date(record.getMillis())) + " " - + record.getMessage() + "\n"; - } - }; - Handler ch = new ConsoleHandler(); - ch.setFormatter(cf); - ch.setLevel(Level.WARNING); - Logger.getLogger("").addHandler(ch); - - /* Initialize own logger for this class. */ - Logger logger = Logger.getLogger( - LoggingConfiguration.class.getName()); - - /* Create log handler that writes all messages on FINE or higher to a - * local file. */ - Formatter ff = new Formatter() { - public String format(LogRecord record) { - return dateTimeFormat.format(new Date(record.getMillis())) + " " - + record.getLevel() + " " + record.getSourceClassName() + " " - + record.getSourceMethodName() + " " + record.getMessage() - + (record.getThrown() != null ? " " + record.getThrown() : "") - + "\n"; - } - }; - try { - FileHandler fh = new FileHandler("log", 5000000, 5, true); - fh.setFormatter(ff); - fh.setLevel(Level.FINE); - Logger.getLogger("").addHandler(fh); - } catch (SecurityException e) { - logger.log(Level.WARNING, "No permission to create log file. " - + "Logging to file is disabled.", e); - } catch (IOException e) { - logger.log(Level.WARNING, "Could not write to log file. 
Logging to " - + "file is disabled.", e); - } - } -} - diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java deleted file mode 100644 index 5d561a6..0000000 --- a/src/org/torproject/ernie/cron/Main.java +++ /dev/null @@ -1,100 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron; - -import java.io.File; -import java.util.logging.Logger; - -import org.torproject.ernie.cron.network.ConsensusStatsFileHandler; -import org.torproject.ernie.cron.performance.PerformanceStatsImporter; -import org.torproject.ernie.cron.performance.TorperfProcessor; - -/** - * Coordinate downloading and parsing of descriptors and extraction of - * statistically relevant data for later processing with R. - */ -public class Main { - public static void main(String[] args) { - - /* Initialize logging configuration. */ - new LoggingConfiguration(); - - Logger logger = Logger.getLogger(Main.class.getName()); - logger.info("Starting ERNIE."); - - // Initialize configuration - Configuration config = new Configuration(); - - // Use lock file to avoid overlapping runs - LockFile lf = new LockFile(); - if (!lf.acquireLock()) { - logger.severe("Warning: ERNIE is already running or has not exited " - + "cleanly! Exiting!"); - System.exit(1); - } - - // Define stats directory for temporary files - File statsDirectory = new File("stats"); - - // Import relay descriptors - if (config.getImportDirectoryArchives()) { - RelayDescriptorDatabaseImporter rddi = - config.getWriteRelayDescriptorDatabase() || - config.getWriteRelayDescriptorsRawFiles() ? - new RelayDescriptorDatabaseImporter( - config.getWriteRelayDescriptorDatabase() ? - config.getRelayDescriptorDatabaseJDBC() : null, - config.getWriteRelayDescriptorsRawFiles() ? - config.getRelayDescriptorRawFilesDirectory() : null, - new File(config.getDirectoryArchivesDirectory()), - statsDirectory, - config.getKeepDirectoryArchiveImportHistory()) : null; - if (rddi != null) { - rddi.importRelayDescriptors(); - } - rddi.closeConnection(); - - // Import conn-bi-direct statistics. - PerformanceStatsImporter psi = new PerformanceStatsImporter( - config.getWriteRelayDescriptorDatabase() ? - config.getRelayDescriptorDatabaseJDBC() : null, - config.getWriteRelayDescriptorsRawFiles() ? - config.getRelayDescriptorRawFilesDirectory() : null, - new File(config.getDirectoryArchivesDirectory()), - statsDirectory, - config.getKeepDirectoryArchiveImportHistory()); - psi.importRelayDescriptors(); - psi.closeConnection(); - } - - // Prepare consensus stats file handler (used for stats on running - // bridges only) - ConsensusStatsFileHandler csfh = config.getWriteBridgeStats() ? 
- new ConsensusStatsFileHandler( - config.getRelayDescriptorDatabaseJDBC(), - new File(config.getSanitizedBridgesDirectory()), - statsDirectory, config.getKeepSanitizedBridgesImportHistory()) : - null; - - // Import sanitized bridges and write updated stats files to disk - if (csfh != null) { - if (config.getImportSanitizedBridges()) { - csfh.importSanitizedBridges(); - } - csfh.writeFiles(); - csfh = null; - } - - // Import and process torperf stats - if (config.getImportWriteTorperfStats()) { - new TorperfProcessor(new File(config.getTorperfDirectory()), - statsDirectory, config.getRelayDescriptorDatabaseJDBC()); - } - - // Remove lock file - lf.releaseLock(); - - logger.info("Terminating ERNIE."); - } -} - diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java deleted file mode 100644 index a51092e..0000000 --- a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java +++ /dev/null @@ -1,1077 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.sql.CallableStatement; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TimeZone; -import java.util.TreeSet; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.postgresql.util.PGbytea; -import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; -import org.torproject.descriptor.DescriptorReader; -import org.torproject.descriptor.DescriptorSourceFactory; -import org.torproject.descriptor.ExtraInfoDescriptor; -import org.torproject.descriptor.NetworkStatusEntry; -import org.torproject.descriptor.RelayNetworkStatusConsensus; -import org.torproject.descriptor.ServerDescriptor; - -/** - * Parse directory data. - */ - -/* TODO Split up this class and move its parts to cron.network, - * cron.users, and status.relaysearch packages. Requires extensive - * changes to the database schema though. */ -public final class RelayDescriptorDatabaseImporter { - - /** - * How many records to commit with each database transaction. - */ - private final long autoCommitCount = 500; - - /** - * Keep track of the number of records committed before each transaction - */ - private int rdsCount = 0; - private int resCount = 0; - private int rhsCount = 0; - private int rrsCount = 0; - private int rcsCount = 0; - private int rvsCount = 0; - private int rqsCount = 0; - - /** - * Relay descriptor database connection. - */ - private Connection conn; - - /** - * Prepared statement to check whether any network status consensus - * entries matching a given valid-after time have been imported into the - * database before. - */ - private PreparedStatement psSs; - - /** - * Prepared statement to check whether a given network status consensus - * entry has been imported into the database before. 
- */ - private PreparedStatement psRs; - - /** - * Prepared statement to check whether a given server descriptor has - * been imported into the database before. - */ - private PreparedStatement psDs; - - /** - * Prepared statement to check whether a given network status consensus - * has been imported into the database before. - */ - private PreparedStatement psCs; - - /** - * Prepared statement to check whether a given dirreq stats string has - * been imported into the database before. - */ - private PreparedStatement psQs; - - /** - * Set of dates that have been inserted into the database for being - * included in the next refresh run. - */ - private Set<Long> scheduledUpdates; - - /** - * Prepared statement to insert a date into the database that shall be - * included in the next refresh run. - */ - private PreparedStatement psU; - - /** - * Prepared statement to insert a network status consensus entry into - * the database. - */ - private PreparedStatement psR; - - /** - * Prepared statement to insert a server descriptor into the database. - */ - private PreparedStatement psD; - - /** - * Callable statement to insert the bandwidth history of an extra-info - * descriptor into the database. - */ - private CallableStatement csH; - - /** - * Prepared statement to insert a network status consensus into the - * database. - */ - private PreparedStatement psC; - - /** - * Prepared statement to insert a given dirreq stats string into the - * database. - */ - private PreparedStatement psQ; - - /** - * Logger for this class. - */ - private Logger logger; - - /** - * Directory for writing raw import files. - */ - private String rawFilesDirectory; - - /** - * Raw import file containing status entries. - */ - private BufferedWriter statusentryOut; - - /** - * Raw import file containing server descriptors. - */ - private BufferedWriter descriptorOut; - - /** - * Raw import file containing bandwidth histories. - */ - private BufferedWriter bwhistOut; - - /** - * Raw import file containing consensuses. - */ - private BufferedWriter consensusOut; - - /** - * Raw import file containing dirreq stats. - */ - private BufferedWriter dirReqOut; - - /** - * Date format to parse timestamps. - */ - private SimpleDateFormat dateTimeFormat; - - /** - * The last valid-after time for which we checked whether they have been - * any network status entries in the database. - */ - private long lastCheckedStatusEntries; - - /** - * Set of fingerprints that we imported for the valid-after time in - * <code>lastCheckedStatusEntries</code>. - */ - private Set<String> insertedStatusEntries; - - /** - * Flag that tells us whether we need to check whether a network status - * entry is already contained in the database or not. - */ - private boolean separateStatusEntryCheckNecessary; - - private boolean importIntoDatabase; - private boolean writeRawImportFiles; - - private File archivesDirectory; - private File statsDirectory; - private boolean keepImportHistory; - - /** - * Initialize database importer by connecting to the database and - * preparing statements. - */ - public RelayDescriptorDatabaseImporter(String connectionURL, - String rawFilesDirectory, File archivesDirectory, - File statsDirectory, boolean keepImportHistory) { - - if (archivesDirectory == null || - statsDirectory == null) { - throw new IllegalArgumentException(); - } - this.archivesDirectory = archivesDirectory; - this.statsDirectory = statsDirectory; - this.keepImportHistory = keepImportHistory; - - /* Initialize logger. 
*/ - this.logger = Logger.getLogger( - RelayDescriptorDatabaseImporter.class.getName()); - - if (connectionURL != null) { - try { - /* Connect to database. */ - this.conn = DriverManager.getConnection(connectionURL); - - /* Turn autocommit off */ - this.conn.setAutoCommit(false); - - /* Prepare statements. */ - this.psSs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM statusentry WHERE validafter = ?"); - this.psRs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM statusentry WHERE validafter = ? AND " - + "fingerprint = ?"); - this.psDs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM descriptor WHERE descriptor = ?"); - this.psCs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM consensus WHERE validafter = ?"); - this.psQs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM dirreq_stats WHERE source = ? AND statsend = ?"); - this.psR = conn.prepareStatement("INSERT INTO statusentry " - + "(validafter, nickname, fingerprint, descriptor, " - + "published, address, orport, dirport, isauthority, " - + "isbadexit, isbaddirectory, isexit, isfast, isguard, " - + "ishsdir, isnamed, isstable, isrunning, isunnamed, " - + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, " - + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " - + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); - this.psD = conn.prepareStatement("INSERT INTO descriptor " - + "(descriptor, nickname, address, orport, dirport, " - + "fingerprint, bandwidthavg, bandwidthburst, " - + "bandwidthobserved, platform, published, uptime, " - + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " - + "?)"); - this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, " - + "?)}"); - this.psC = conn.prepareStatement("INSERT INTO consensus " - + "(validafter) VALUES (?)"); - this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats " - + "(source, statsend, seconds, country, requests) VALUES " - + "(?, ?, ?, ?, ?)"); - this.psU = conn.prepareStatement("INSERT INTO scheduled_updates " - + "(date) VALUES (?)"); - this.scheduledUpdates = new HashSet<Long>(); - this.importIntoDatabase = true; - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not connect to database or " - + "prepare statements.", e); - } - - /* Initialize set of fingerprints to remember which status entries - * we already imported. */ - this.insertedStatusEntries = new HashSet<String>(); - } - - /* Remember where we want to write raw import files. */ - if (rawFilesDirectory != null) { - this.rawFilesDirectory = rawFilesDirectory; - this.writeRawImportFiles = true; - } - - /* Initialize date format, so that we can format timestamps. */ - this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - } - - private void addDateToScheduledUpdates(long timestamp) - throws SQLException { - if (!this.importIntoDatabase) { - return; - } - long dateMillis = 0L; - try { - dateMillis = this.dateTimeFormat.parse( - this.dateTimeFormat.format(timestamp).substring(0, 10) - + " 00:00:00").getTime(); - } catch (ParseException e) { - this.logger.log(Level.WARNING, "Internal parsing error.", e); - return; - } - if (!this.scheduledUpdates.contains(dateMillis)) { - this.psU.setDate(1, new java.sql.Date(dateMillis)); - this.psU.execute(); - this.scheduledUpdates.add(dateMillis); - } - } - - /** - * Insert network status consensus entry into database. 
- */ - public void addStatusEntry(long validAfter, String nickname, - String fingerprint, String descriptor, long published, - String address, long orPort, long dirPort, - SortedSet<String> flags, String version, long bandwidth, - String ports, byte[] rawDescriptor) { - if (this.importIntoDatabase) { - try { - this.addDateToScheduledUpdates(validAfter); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - Timestamp validAfterTimestamp = new Timestamp(validAfter); - if (lastCheckedStatusEntries != validAfter) { - this.psSs.setTimestamp(1, validAfterTimestamp, cal); - ResultSet rs = psSs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - separateStatusEntryCheckNecessary = false; - insertedStatusEntries.clear(); - } else { - separateStatusEntryCheckNecessary = true; - } - rs.close(); - lastCheckedStatusEntries = validAfter; - } - boolean alreadyContained = false; - if (separateStatusEntryCheckNecessary || - insertedStatusEntries.contains(fingerprint)) { - this.psRs.setTimestamp(1, validAfterTimestamp, cal); - this.psRs.setString(2, fingerprint); - ResultSet rs = psRs.executeQuery(); - rs.next(); - if (rs.getInt(1) > 0) { - alreadyContained = true; - } - rs.close(); - } else { - insertedStatusEntries.add(fingerprint); - } - if (!alreadyContained) { - this.psR.clearParameters(); - this.psR.setTimestamp(1, validAfterTimestamp, cal); - this.psR.setString(2, nickname); - this.psR.setString(3, fingerprint); - this.psR.setString(4, descriptor); - this.psR.setTimestamp(5, new Timestamp(published), cal); - this.psR.setString(6, address); - this.psR.setLong(7, orPort); - this.psR.setLong(8, dirPort); - this.psR.setBoolean(9, flags.contains("Authority")); - this.psR.setBoolean(10, flags.contains("BadExit")); - this.psR.setBoolean(11, flags.contains("BadDirectory")); - this.psR.setBoolean(12, flags.contains("Exit")); - this.psR.setBoolean(13, flags.contains("Fast")); - this.psR.setBoolean(14, flags.contains("Guard")); - this.psR.setBoolean(15, flags.contains("HSDir")); - this.psR.setBoolean(16, flags.contains("Named")); - this.psR.setBoolean(17, flags.contains("Stable")); - this.psR.setBoolean(18, flags.contains("Running")); - this.psR.setBoolean(19, flags.contains("Unnamed")); - this.psR.setBoolean(20, flags.contains("Valid")); - this.psR.setBoolean(21, flags.contains("V2Dir")); - this.psR.setBoolean(22, flags.contains("V3Dir")); - this.psR.setString(23, version); - this.psR.setLong(24, bandwidth); - this.psR.setString(25, ports); - this.psR.setBytes(26, rawDescriptor); - this.psR.executeUpdate(); - rrsCount++; - if (rrsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not add network status " - + "consensus entry. 
We won't make any further SQL requests " - + "in this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.statusentryOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.statusentryOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/statusentry.sql")); - this.statusentryOut.write(" COPY statusentry (validafter, " - + "nickname, fingerprint, descriptor, published, address, " - + "orport, dirport, isauthority, isbadExit, " - + "isbaddirectory, isexit, isfast, isguard, ishsdir, " - + "isnamed, isstable, isrunning, isunnamed, isvalid, " - + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) " - + "FROM stdin;\n"); - } - this.statusentryOut.write( - this.dateTimeFormat.format(validAfter) + "\t" + nickname - + "\t" + fingerprint.toLowerCase() + "\t" - + descriptor.toLowerCase() + "\t" - + this.dateTimeFormat.format(published) + "\t" + address - + "\t" + orPort + "\t" + dirPort + "\t" - + (flags.contains("Authority") ? "t" : "f") + "\t" - + (flags.contains("BadExit") ? "t" : "f") + "\t" - + (flags.contains("BadDirectory") ? "t" : "f") + "\t" - + (flags.contains("Exit") ? "t" : "f") + "\t" - + (flags.contains("Fast") ? "t" : "f") + "\t" - + (flags.contains("Guard") ? "t" : "f") + "\t" - + (flags.contains("HSDir") ? "t" : "f") + "\t" - + (flags.contains("Named") ? "t" : "f") + "\t" - + (flags.contains("Stable") ? "t" : "f") + "\t" - + (flags.contains("Running") ? "t" : "f") + "\t" - + (flags.contains("Unnamed") ? "t" : "f") + "\t" - + (flags.contains("Valid") ? "t" : "f") + "\t" - + (flags.contains("V2Dir") ? "t" : "f") + "\t" - + (flags.contains("V3Dir") ? "t" : "f") + "\t" - + (version != null ? version : "\N") + "\t" - + (bandwidth >= 0 ? bandwidth : "\N") + "\t" - + (ports != null ? ports : "\N") + "\t"); - this.statusentryOut.write(PGbytea.toPGString(rawDescriptor). - replaceAll("\\", "\\\\") + "\n"); - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not write network status " - + "consensus entry to raw database import file. We won't " - + "make any further attempts to write raw import files in " - + "this execution.", e); - this.writeRawImportFiles = false; - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not write network status " - + "consensus entry to raw database import file. We won't " - + "make any further attempts to write raw import files in " - + "this execution.", e); - this.writeRawImportFiles = false; - } - } - } - - /** - * Insert server descriptor into database. 
- */ - public void addServerDescriptor(String descriptor, String nickname, - String address, int orPort, int dirPort, String relayIdentifier, - long bandwidthAvg, long bandwidthBurst, long bandwidthObserved, - String platform, long published, long uptime, - String extraInfoDigest) { - if (this.importIntoDatabase) { - try { - this.addDateToScheduledUpdates(published); - this.addDateToScheduledUpdates( - published + 24L * 60L * 60L * 1000L); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - this.psDs.setString(1, descriptor); - ResultSet rs = psDs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - this.psD.clearParameters(); - this.psD.setString(1, descriptor); - this.psD.setString(2, nickname); - this.psD.setString(3, address); - this.psD.setInt(4, orPort); - this.psD.setInt(5, dirPort); - this.psD.setString(6, relayIdentifier); - this.psD.setLong(7, bandwidthAvg); - this.psD.setLong(8, bandwidthBurst); - this.psD.setLong(9, bandwidthObserved); - /* Remove all non-ASCII characters from the platform string, or - * we'll make Postgres unhappy. Sun's JDK and OpenJDK behave - * differently when creating a new String with a given encoding. - * That's what the regexp below is for. */ - this.psD.setString(10, new String(platform.getBytes(), - "US-ASCII").replaceAll("[^\p{ASCII}]","")); - this.psD.setTimestamp(11, new Timestamp(published), cal); - this.psD.setLong(12, uptime); - this.psD.setString(13, extraInfoDigest); - this.psD.executeUpdate(); - rdsCount++; - if (rdsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } catch (UnsupportedEncodingException e) { - // US-ASCII is supported for sure - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not add server " - + "descriptor. We won't make any further SQL requests in " - + "this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.descriptorOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.descriptorOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/descriptor.sql")); - this.descriptorOut.write(" COPY descriptor (descriptor, " - + "nickname, address, orport, dirport, fingerprint, " - + "bandwidthavg, bandwidthburst, bandwidthobserved, " - + "platform, published, uptime, extrainfo) FROM stdin;\n"); - } - this.descriptorOut.write(descriptor.toLowerCase() + "\t" - + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort - + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t" - + bandwidthBurst + "\t" + bandwidthObserved + "\t" - + (platform != null && platform.length() > 0 - ? new String(platform.getBytes(), "US-ASCII") : "\N") - + "\t" + this.dateTimeFormat.format(published) + "\t" - + (uptime >= 0 ? uptime : "\N") + "\t" - + (extraInfoDigest != null ? extraInfoDigest : "\N") - + "\n"); - } catch (UnsupportedEncodingException e) { - // US-ASCII is supported for sure - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not write server " - + "descriptor to raw database import file. We won't make " - + "any further attempts to write raw import files in this " - + "execution.", e); - this.writeRawImportFiles = false; - } - } - } - - /** - * Insert extra-info descriptor into database. 
- */ - public void addExtraInfoDescriptor(String extraInfoDigest, - String nickname, String fingerprint, long published, - List<String> bandwidthHistoryLines) { - if (!bandwidthHistoryLines.isEmpty()) { - this.addBandwidthHistory(fingerprint.toLowerCase(), published, - bandwidthHistoryLines); - } - } - - private static class BigIntArray implements java.sql.Array { - - private final String stringValue; - - public BigIntArray(long[] array, int offset) { - if (array == null) { - this.stringValue = "[-1:-1]={0}"; - } else { - StringBuilder sb = new StringBuilder("[" + offset + ":" - + (offset + array.length - 1) + "]={"); - for (int i = 0; i < array.length; i++) { - sb.append((i > 0 ? "," : "") + array[i]); - } - sb.append('}'); - this.stringValue = sb.toString(); - } - } - - public String toString() { - return stringValue; - } - - public String getBaseTypeName() { - return "int8"; - } - - /* The other methods are never called; no need to implement them. */ - public void free() { - throw new UnsupportedOperationException(); - } - public Object getArray() { - throw new UnsupportedOperationException(); - } - public Object getArray(long index, int count) { - throw new UnsupportedOperationException(); - } - public Object getArray(long index, int count, - Map<String, Class<?>> map) { - throw new UnsupportedOperationException(); - } - public Object getArray(Map<String, Class<?>> map) { - throw new UnsupportedOperationException(); - } - public int getBaseType() { - throw new UnsupportedOperationException(); - } - public ResultSet getResultSet() { - throw new UnsupportedOperationException(); - } - public ResultSet getResultSet(long index, int count) { - throw new UnsupportedOperationException(); - } - public ResultSet getResultSet(long index, int count, - Map<String, Class<?>> map) { - throw new UnsupportedOperationException(); - } - public ResultSet getResultSet(Map<String, Class<?>> map) { - throw new UnsupportedOperationException(); - } - } - - public void addBandwidthHistory(String fingerprint, long published, - List<String> bandwidthHistoryStrings) { - - /* Split history lines by date and rewrite them so that the date - * comes first. */ - SortedSet<String> historyLinesByDate = new TreeSet<String>(); - for (String bandwidthHistoryString : bandwidthHistoryStrings) { - String[] parts = bandwidthHistoryString.split(" "); - if (parts.length != 6) { - this.logger.finer("Bandwidth history line does not have expected " - + "number of elements. Ignoring this line."); - continue; - } - long intervalLength = 0L; - try { - intervalLength = Long.parseLong(parts[3].substring(1)); - } catch (NumberFormatException e) { - this.logger.fine("Bandwidth history line does not have valid " - + "interval length '" + parts[3] + " " + parts[4] + "'. " - + "Ignoring this line."); - continue; - } - if (intervalLength != 900L) { - this.logger.fine("Bandwidth history line does not consist of " - + "15-minute intervals. Ignoring this line."); - continue; - } - String type = parts[0]; - String intervalEndTime = parts[1] + " " + parts[2]; - long intervalEnd, dateStart; - try { - intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime(); - dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00"). - getTime(); - } catch (ParseException e) { - this.logger.fine("Parse exception while parsing timestamp in " - + "bandwidth history line. 
Ignoring this line."); - continue; - } - if (Math.abs(published - intervalEnd) > - 7L * 24L * 60L * 60L * 1000L) { - this.logger.fine("Extra-info descriptor publication time " - + dateTimeFormat.format(published) + " and last interval " - + "time " + intervalEndTime + " in " + type + " line differ " - + "by more than 7 days! Not adding this line!"); - continue; - } - long currentIntervalEnd = intervalEnd; - StringBuilder sb = new StringBuilder(); - String[] values = parts[5].split(","); - SortedSet<String> newHistoryLines = new TreeSet<String>(); - try { - for (int i = values.length - 1; i >= -1; i--) { - if (i == -1 || currentIntervalEnd < dateStart) { - sb.insert(0, intervalEndTime + " " + type + " (" - + intervalLength + " s) "); - sb.setLength(sb.length() - 1); - String historyLine = sb.toString(); - newHistoryLines.add(historyLine); - sb = new StringBuilder(); - dateStart -= 24L * 60L * 60L * 1000L; - intervalEndTime = dateTimeFormat.format(currentIntervalEnd); - } - if (i == -1) { - break; - } - Long.parseLong(values[i]); - sb.insert(0, values[i] + ","); - currentIntervalEnd -= intervalLength * 1000L; - } - } catch (NumberFormatException e) { - this.logger.fine("Number format exception while parsing " - + "bandwidth history line. Ignoring this line."); - continue; - } - historyLinesByDate.addAll(newHistoryLines); - } - - /* Add split history lines to database. */ - String lastDate = null; - historyLinesByDate.add("EOL"); - long[] readArray = null, writtenArray = null, dirreadArray = null, - dirwrittenArray = null; - int readOffset = 0, writtenOffset = 0, dirreadOffset = 0, - dirwrittenOffset = 0; - for (String historyLine : historyLinesByDate) { - String[] parts = historyLine.split(" "); - String currentDate = parts[0]; - if (lastDate != null && (historyLine.equals("EOL") || - !currentDate.equals(lastDate))) { - BigIntArray readIntArray = new BigIntArray(readArray, - readOffset); - BigIntArray writtenIntArray = new BigIntArray(writtenArray, - writtenOffset); - BigIntArray dirreadIntArray = new BigIntArray(dirreadArray, - dirreadOffset); - BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray, - dirwrittenOffset); - if (this.importIntoDatabase) { - try { - long dateMillis = dateTimeFormat.parse(lastDate - + " 00:00:00").getTime(); - this.addDateToScheduledUpdates(dateMillis); - this.csH.setString(1, fingerprint); - this.csH.setDate(2, new java.sql.Date(dateMillis)); - this.csH.setArray(3, readIntArray); - this.csH.setArray(4, writtenIntArray); - this.csH.setArray(5, dirreadIntArray); - this.csH.setArray(6, dirwrittenIntArray); - this.csH.addBatch(); - rhsCount++; - if (rhsCount % autoCommitCount == 0) { - this.csH.executeBatch(); - } - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not insert bandwidth " - + "history line into database. We won't make any " - + "further SQL requests in this execution.", e); - this.importIntoDatabase = false; - } catch (ParseException e) { - this.logger.log(Level.WARNING, "Could not insert bandwidth " - + "history line into database. 
We won't make any " - + "further SQL requests in this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.bwhistOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.bwhistOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/bwhist.sql")); - } - this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint - + "','" + lastDate + "','" + readIntArray.toString() - + "','" + writtenIntArray.toString() + "','" - + dirreadIntArray.toString() + "','" - + dirwrittenIntArray.toString() + "');\n"); - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not write bandwidth " - + "history to raw database import file. We won't make " - + "any further attempts to write raw import files in " - + "this execution.", e); - this.writeRawImportFiles = false; - } - } - readArray = writtenArray = dirreadArray = dirwrittenArray = null; - } - if (historyLine.equals("EOL")) { - break; - } - long lastIntervalTime; - try { - lastIntervalTime = dateTimeFormat.parse(parts[0] + " " - + parts[1]).getTime() - dateTimeFormat.parse(parts[0] - + " 00:00:00").getTime(); - } catch (ParseException e) { - continue; - } - String[] stringValues = parts[5].split(","); - long[] longValues = new long[stringValues.length]; - for (int i = 0; i < longValues.length; i++) { - longValues[i] = Long.parseLong(stringValues[i]); - } - - int offset = (int) (lastIntervalTime / (15L * 60L * 1000L)) - - longValues.length + 1; - String type = parts[2]; - if (type.equals("read-history")) { - readArray = longValues; - readOffset = offset; - } else if (type.equals("write-history")) { - writtenArray = longValues; - writtenOffset = offset; - } else if (type.equals("dirreq-read-history")) { - dirreadArray = longValues; - dirreadOffset = offset; - } else if (type.equals("dirreq-write-history")) { - dirwrittenArray = longValues; - dirwrittenOffset = offset; - } - lastDate = currentDate; - } - } - - /** - * Insert network status consensus into database. - */ - public void addConsensus(long validAfter) { - if (this.importIntoDatabase) { - try { - this.addDateToScheduledUpdates(validAfter); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - Timestamp validAfterTimestamp = new Timestamp(validAfter); - this.psCs.setTimestamp(1, validAfterTimestamp, cal); - ResultSet rs = psCs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - this.psC.clearParameters(); - this.psC.setTimestamp(1, validAfterTimestamp, cal); - this.psC.executeUpdate(); - rcsCount++; - if (rcsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not add network status " - + "consensus. We won't make any further SQL requests in " - + "this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.consensusOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.consensusOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/consensus.sql")); - this.consensusOut.write(" COPY consensus (validafter) " - + "FROM stdin;\n"); - } - String validAfterString = this.dateTimeFormat.format(validAfter); - this.consensusOut.write(validAfterString + "\n"); - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not write network status " - + "consensus to raw database import file. 
We won't make " - + "any further attempts to write raw import files in this " - + "execution.", e); - this.writeRawImportFiles = false; - } - } - } - - /** - * Adds observations on the number of directory requests by country as - * seen on a directory at a given date to the database. - */ - public void addDirReqStats(String source, long statsEndMillis, - long seconds, Map<String, String> dirReqsPerCountry) { - String statsEnd = this.dateTimeFormat.format(statsEndMillis); - if (this.importIntoDatabase) { - try { - this.addDateToScheduledUpdates(statsEndMillis); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - Timestamp statsEndTimestamp = new Timestamp(statsEndMillis); - this.psQs.setString(1, source); - this.psQs.setTimestamp(2, statsEndTimestamp, cal); - ResultSet rs = psQs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - for (Map.Entry<String, String> e : - dirReqsPerCountry.entrySet()) { - this.psQ.clearParameters(); - this.psQ.setString(1, source); - this.psQ.setTimestamp(2, statsEndTimestamp, cal); - this.psQ.setLong(3, seconds); - this.psQ.setString(4, e.getKey()); - this.psQ.setLong(5, Long.parseLong(e.getValue())); - this.psQ.executeUpdate(); - rqsCount++; - if (rqsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not add dirreq stats. We " - + "won't make any further SQL requests in this execution.", - e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.dirReqOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.dirReqOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/dirreq_stats.sql")); - this.dirReqOut.write(" COPY dirreq_stats (source, statsend, " - + "seconds, country, requests) FROM stdin;\n"); - } - for (Map.Entry<String, String> e : - dirReqsPerCountry.entrySet()) { - this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds - + "\t" + e.getKey() + "\t" + e.getValue() + "\n"); - } - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not write dirreq stats to " - + "raw database import file. 
We won't make any further " - + "attempts to write raw import files in this execution.", e); - this.writeRawImportFiles = false; - } - } - } - - public void importRelayDescriptors() { - if (archivesDirectory.exists()) { - logger.fine("Importing files in directory " + archivesDirectory - + "/..."); - DescriptorReader reader = - DescriptorSourceFactory.createDescriptorReader(); - reader.addDirectory(archivesDirectory); - if (keepImportHistory) { - reader.setExcludeFiles(new File(statsDirectory, - "database-importer-relay-descriptor-history")); - } - Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); - while (descriptorFiles.hasNext()) { - DescriptorFile descriptorFile = descriptorFiles.next(); - if (descriptorFile.getDescriptors() != null) { - for (Descriptor descriptor : descriptorFile.getDescriptors()) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - this.addRelayNetworkStatusConsensus( - (RelayNetworkStatusConsensus) descriptor); - } else if (descriptor instanceof ServerDescriptor) { - this.addServerDescriptor((ServerDescriptor) descriptor); - } else if (descriptor instanceof ExtraInfoDescriptor) { - this.addExtraInfoDescriptor( - (ExtraInfoDescriptor) descriptor); - } - } - } - } - } - - logger.info("Finished importing relay descriptors."); - } - - private void addRelayNetworkStatusConsensus( - RelayNetworkStatusConsensus consensus) { - for (NetworkStatusEntry statusEntry : - consensus.getStatusEntries().values()) { - this.addStatusEntry(consensus.getValidAfterMillis(), - statusEntry.getNickname(), - statusEntry.getFingerprint().toLowerCase(), - statusEntry.getDescriptor().toLowerCase(), - statusEntry.getPublishedMillis(), statusEntry.getAddress(), - statusEntry.getOrPort(), statusEntry.getDirPort(), - statusEntry.getFlags(), statusEntry.getVersion(), - statusEntry.getBandwidth(), statusEntry.getPortList(), - statusEntry.getStatusEntryBytes()); - } - this.addConsensus(consensus.getValidAfterMillis()); - } - - private void addServerDescriptor(ServerDescriptor descriptor) { - this.addServerDescriptor(descriptor.getServerDescriptorDigest(), - descriptor.getNickname(), descriptor.getAddress(), - descriptor.getOrPort(), descriptor.getDirPort(), - descriptor.getFingerprint(), descriptor.getBandwidthRate(), - descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(), - descriptor.getPlatform(), descriptor.getPublishedMillis(), - descriptor.getUptime(), descriptor.getExtraInfoDigest()); - } - - private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { - if (descriptor.getDirreqV3Reqs() != null) { - int allUsers = 0; - Map<String, String> obs = new HashMap<String, String>(); - for (Map.Entry<String, Integer> e : - descriptor.getDirreqV3Reqs().entrySet()) { - String country = e.getKey(); - int users = e.getValue() - 4; - allUsers += users; - obs.put(country, "" + users); - } - obs.put("zy", "" + allUsers); - this.addDirReqStats(descriptor.getFingerprint(), - descriptor.getDirreqStatsEndMillis(), - descriptor.getDirreqStatsIntervalLength(), obs); - } - List<String> bandwidthHistoryLines = new ArrayList<String>(); - if (descriptor.getWriteHistory() != null) { - bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine()); - } - if (descriptor.getReadHistory() != null) { - bandwidthHistoryLines.add(descriptor.getReadHistory().getLine()); - } - if (descriptor.getDirreqWriteHistory() != null) { - bandwidthHistoryLines.add( - descriptor.getDirreqWriteHistory().getLine()); - } - if (descriptor.getDirreqReadHistory() != null) { - 
bandwidthHistoryLines.add( - descriptor.getDirreqReadHistory().getLine()); - } - this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(), - descriptor.getNickname(), - descriptor.getFingerprint().toLowerCase(), - descriptor.getPublishedMillis(), bandwidthHistoryLines); - } - - /** - * Close the relay descriptor database connection. - */ - public void closeConnection() { - - /* Log stats about imported descriptors. */ - this.logger.info(String.format("Finished importing relay " - + "descriptors: %d consensuses, %d network status entries, %d " - + "votes, %d server descriptors, %d extra-info descriptors, %d " - + "bandwidth history elements, and %d dirreq stats elements", - rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount, - rqsCount)); - - /* Insert scheduled updates a second time, just in case the refresh - * run has started since inserting them the first time in which case - * it will miss the data inserted afterwards. We cannot, however, - * insert them only now, because if a Java execution fails at a random - * point, we might have added data, but not the corresponding dates to - * update statistics. */ - if (this.importIntoDatabase) { - try { - for (long dateMillis : this.scheduledUpdates) { - this.psU.setDate(1, new java.sql.Date(dateMillis)); - this.psU.execute(); - } - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not add scheduled dates " - + "for the next refresh run.", e); - } - } - - /* Commit any stragglers before closing. */ - if (this.conn != null) { - try { - this.csH.executeBatch(); - - this.conn.commit(); - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not commit final records to " - + "database", e); - } - try { - this.conn.close(); - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not close database " - + "connection.", e); - } - } - - /* Close raw import files. 
*/ - try { - if (this.statusentryOut != null) { - this.statusentryOut.write("\.\n"); - this.statusentryOut.close(); - } - if (this.descriptorOut != null) { - this.descriptorOut.write("\.\n"); - this.descriptorOut.close(); - } - if (this.bwhistOut != null) { - this.bwhistOut.write("\.\n"); - this.bwhistOut.close(); - } - if (this.consensusOut != null) { - this.consensusOut.write("\.\n"); - this.consensusOut.close(); - } - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not close one or more raw " - + "database import files.", e); - } - } -} - diff --git a/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java b/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java deleted file mode 100644 index d5cae37..0000000 --- a/src/org/torproject/ernie/cron/network/ConsensusStatsFileHandler.java +++ /dev/null @@ -1,380 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron.network; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.SortedMap; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.torproject.descriptor.BridgeNetworkStatus; -import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; -import org.torproject.descriptor.DescriptorReader; -import org.torproject.descriptor.DescriptorSourceFactory; -import org.torproject.descriptor.NetworkStatusEntry; - -/** - * Generates statistics on the average number of relays and bridges per - * day. Accepts parse results from <code>RelayDescriptorParser</code> and - * <code>BridgeDescriptorParser</code> and stores them in intermediate - * result files <code>stats/consensus-stats-raw</code> and - * <code>stats/bridge-consensus-stats-raw</code>. Writes final results to - * <code>stats/consensus-stats</code> for all days for which at least half - * of the expected consensuses or statuses are known. - */ -public class ConsensusStatsFileHandler { - - /** - * Intermediate results file holding the number of running bridges per - * bridge status. - */ - private File bridgeConsensusStatsRawFile; - - /** - * Number of running bridges in a given bridge status. Map keys are - * bridge status times formatted as "yyyy-MM-dd HH:mm:ss", map values - * are lines as read from <code>stats/bridge-consensus-stats-raw</code>. - */ - private SortedMap<String, String> bridgesRaw; - - /** - * Average number of running bridges per day. Map keys are dates - * formatted as "yyyy-MM-dd", map values are the last column as written - * to <code>stats/consensus-stats</code>. - */ - private SortedMap<String, String> bridgesPerDay; - - /** - * Logger for this class. - */ - private Logger logger; - - private int bridgeResultsAdded = 0; - - /* Database connection string. 
*/ - private String connectionURL = null; - - private SimpleDateFormat dateTimeFormat; - - private File bridgesDir; - - private File statsDirectory; - - private boolean keepImportHistory; - - /** - * Initializes this class, including reading in intermediate results - * files <code>stats/consensus-stats-raw</code> and - * <code>stats/bridge-consensus-stats-raw</code> and final results file - * <code>stats/consensus-stats</code>. - */ - public ConsensusStatsFileHandler(String connectionURL, - File bridgesDir, File statsDirectory, - boolean keepImportHistory) { - - if (bridgesDir == null || statsDirectory == null) { - throw new IllegalArgumentException(); - } - this.bridgesDir = bridgesDir; - this.statsDirectory = statsDirectory; - this.keepImportHistory = keepImportHistory; - - /* Initialize local data structures to hold intermediate and final - * results. */ - this.bridgesPerDay = new TreeMap<String, String>(); - this.bridgesRaw = new TreeMap<String, String>(); - - /* Initialize file names for intermediate and final results files. */ - this.bridgeConsensusStatsRawFile = new File( - "stats/bridge-consensus-stats-raw"); - - /* Initialize database connection string. */ - this.connectionURL = connectionURL; - - this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - - /* Initialize logger. */ - this.logger = Logger.getLogger( - ConsensusStatsFileHandler.class.getName()); - - /* Read in number of running bridges per bridge status. */ - if (this.bridgeConsensusStatsRawFile.exists()) { - try { - this.logger.fine("Reading file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "..."); - BufferedReader br = new BufferedReader(new FileReader( - this.bridgeConsensusStatsRawFile)); - String line = null; - while ((line = br.readLine()) != null) { - if (line.startsWith("date")) { - /* Skip headers. */ - continue; - } - String[] parts = line.split(","); - String dateTime = parts[0]; - if (parts.length == 2) { - this.bridgesRaw.put(dateTime, line + ",0"); - } else if (parts.length == 3) { - this.bridgesRaw.put(dateTime, line); - } else { - this.logger.warning("Corrupt line '" + line + "' in file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() - + "! Aborting to read this file!"); - break; - } - } - br.close(); - this.logger.fine("Finished reading file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "."); - } catch (IOException e) { - this.logger.log(Level.WARNING, "Failed to read file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!", - e); - } - } - } - - /** - * Adds the intermediate results of the number of running bridges in a - * given bridge status to the existing observations. - */ - public void addBridgeConsensusResults(long publishedMillis, int running, - int runningEc2Bridges) { - String published = dateTimeFormat.format(publishedMillis); - String line = published + "," + running + "," + runningEc2Bridges; - if (!this.bridgesRaw.containsKey(published)) { - this.logger.finer("Adding new bridge numbers: " + line); - this.bridgesRaw.put(published, line); - this.bridgeResultsAdded++; - } else if (!line.equals(this.bridgesRaw.get(published))) { - this.logger.warning("The numbers of running bridges we were just " - + "given (" + line + ") are different from what we learned " - + "before (" + this.bridgesRaw.get(published) + ")! 
" - + "Overwriting!"); - this.bridgesRaw.put(published, line); - } - } - - public void importSanitizedBridges() { - if (bridgesDir.exists()) { - logger.fine("Importing files in directory " + bridgesDir + "/..."); - DescriptorReader reader = - DescriptorSourceFactory.createDescriptorReader(); - reader.addDirectory(bridgesDir); - if (keepImportHistory) { - reader.setExcludeFiles(new File(statsDirectory, - "consensus-stats-bridge-descriptor-history")); - } - Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); - while (descriptorFiles.hasNext()) { - DescriptorFile descriptorFile = descriptorFiles.next(); - if (descriptorFile.getDescriptors() != null) { - for (Descriptor descriptor : descriptorFile.getDescriptors()) { - if (descriptor instanceof BridgeNetworkStatus) { - this.addBridgeNetworkStatus( - (BridgeNetworkStatus) descriptor); - } - } - } - } - logger.info("Finished importing bridge descriptors."); - } - } - - private void addBridgeNetworkStatus(BridgeNetworkStatus status) { - int runningBridges = 0, runningEc2Bridges = 0; - for (NetworkStatusEntry statusEntry : - status.getStatusEntries().values()) { - if (statusEntry.getFlags().contains("Running")) { - runningBridges++; - if (statusEntry.getNickname().startsWith("ec2bridge")) { - runningEc2Bridges++; - } - } - } - this.addBridgeConsensusResults(status.getPublishedMillis(), - runningBridges, runningEc2Bridges); - } - - /** - * Aggregates the raw observations on relay and bridge numbers and - * writes both raw and aggregate observations to disk. - */ - public void writeFiles() { - - /* Go through raw observations of numbers of running bridges in bridge - * statuses, calculate averages per day, and add these averages to - * final results. */ - if (!this.bridgesRaw.isEmpty()) { - String tempDate = null; - int brunning = 0, brunningEc2 = 0, statuses = 0; - Iterator<String> it = this.bridgesRaw.values().iterator(); - boolean haveWrittenFinalLine = false; - while (it.hasNext() || !haveWrittenFinalLine) { - String next = it.hasNext() ? it.next() : null; - /* Finished reading a day or even all lines? */ - if (tempDate != null && (next == null - || !next.substring(0, 10).equals(tempDate))) { - /* Only write results if we have seen at least half of all - * statuses. */ - if (statuses >= 24) { - String line = "," + (brunning / statuses) + "," - + (brunningEc2 / statuses); - /* Are our results new? */ - if (!this.bridgesPerDay.containsKey(tempDate)) { - this.logger.finer("Adding new average bridge numbers: " - + tempDate + line); - this.bridgesPerDay.put(tempDate, line); - } else if (!line.equals(this.bridgesPerDay.get(tempDate))) { - this.logger.finer("Replacing existing average bridge " - + "numbers (" + this.bridgesPerDay.get(tempDate) - + " with new numbers: " + line); - this.bridgesPerDay.put(tempDate, line); - } - } - brunning = brunningEc2 = statuses = 0; - haveWrittenFinalLine = (next == null); - } - /* Sum up number of running bridges. */ - if (next != null) { - tempDate = next.substring(0, 10); - statuses++; - String[] parts = next.split(","); - brunning += Integer.parseInt(parts[1]); - brunningEc2 += Integer.parseInt(parts[2]); - } - } - } - - /* Write raw numbers of running bridges to disk. 
*/ - try { - this.logger.fine("Writing file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "..."); - this.bridgeConsensusStatsRawFile.getParentFile().mkdirs(); - BufferedWriter bw = new BufferedWriter( - new FileWriter(this.bridgeConsensusStatsRawFile)); - bw.append("datetime,brunning,brunningec2\n"); - for (String line : this.bridgesRaw.values()) { - bw.append(line + "\n"); - } - bw.close(); - this.logger.fine("Finished writing file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "."); - } catch (IOException e) { - this.logger.log(Level.WARNING, "Failed to write file " - + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!", - e); - } - - /* Add average number of bridges per day to the database. */ - if (connectionURL != null) { - try { - Map<String, String> insertRows = new HashMap<String, String>(), - updateRows = new HashMap<String, String>(); - insertRows.putAll(this.bridgesPerDay); - Connection conn = DriverManager.getConnection(connectionURL); - conn.setAutoCommit(false); - Statement statement = conn.createStatement(); - ResultSet rs = statement.executeQuery( - "SELECT date, avg_running, avg_running_ec2 " - + "FROM bridge_network_size"); - while (rs.next()) { - String date = rs.getDate(1).toString(); - if (insertRows.containsKey(date)) { - String insertRow = insertRows.remove(date); - String[] parts = insertRow.substring(1).split(","); - long newAvgRunning = Long.parseLong(parts[0]); - long newAvgRunningEc2 = Long.parseLong(parts[1]); - long oldAvgRunning = rs.getLong(2); - long oldAvgRunningEc2 = rs.getLong(3); - if (newAvgRunning != oldAvgRunning || - newAvgRunningEc2 != oldAvgRunningEc2) { - updateRows.put(date, insertRow); - } - } - } - rs.close(); - PreparedStatement psU = conn.prepareStatement( - "UPDATE bridge_network_size SET avg_running = ?, " - + "avg_running_ec2 = ? WHERE date = ?"); - for (Map.Entry<String, String> e : updateRows.entrySet()) { - java.sql.Date date = java.sql.Date.valueOf(e.getKey()); - String[] parts = e.getValue().substring(1).split(","); - long avgRunning = Long.parseLong(parts[0]); - long avgRunningEc2 = Long.parseLong(parts[1]); - psU.clearParameters(); - psU.setLong(1, avgRunning); - psU.setLong(2, avgRunningEc2); - psU.setDate(3, date); - psU.executeUpdate(); - } - PreparedStatement psI = conn.prepareStatement( - "INSERT INTO bridge_network_size (avg_running, " - + "avg_running_ec2, date) VALUES (?, ?, ?)"); - for (Map.Entry<String, String> e : insertRows.entrySet()) { - java.sql.Date date = java.sql.Date.valueOf(e.getKey()); - String[] parts = e.getValue().substring(1).split(","); - long avgRunning = Long.parseLong(parts[0]); - long avgRunningEc2 = Long.parseLong(parts[1]); - psI.clearParameters(); - psI.setLong(1, avgRunning); - psI.setLong(2, avgRunningEc2); - psI.setDate(3, date); - psI.executeUpdate(); - } - conn.commit(); - conn.close(); - } catch (SQLException e) { - logger.log(Level.WARNING, "Failed to add average bridge numbers " - + "to database.", e); - } - } - - /* Write stats. 
*/ - StringBuilder dumpStats = new StringBuilder("Finished writing " - + "statistics on bridge network statuses to disk.\nAdded " - + this.bridgeResultsAdded + " bridge network status(es) in this " - + "execution."); - long now = System.currentTimeMillis(); - SimpleDateFormat dateTimeFormat = - new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - if (this.bridgesRaw.isEmpty()) { - dumpStats.append("\nNo bridge status known yet."); - } else { - dumpStats.append("\nLast known bridge status was published " - + this.bridgesRaw.lastKey() + "."); - try { - if (now - 6L * 60L * 60L * 1000L > dateTimeFormat.parse( - this.bridgesRaw.lastKey()).getTime()) { - logger.warning("Last known bridge status is more than 6 hours " - + "old: " + this.bridgesRaw.lastKey()); - } - } catch (ParseException e) { - /* Can't parse the timestamp? Whatever. */ - } - } - logger.info(dumpStats.toString()); - } -} - diff --git a/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java b/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java deleted file mode 100644 index 815b37f..0000000 --- a/src/org/torproject/ernie/cron/performance/PerformanceStatsImporter.java +++ /dev/null @@ -1,271 +0,0 @@ -/* Copyright 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron.performance; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Iterator; -import java.util.TimeZone; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; -import org.torproject.descriptor.DescriptorReader; -import org.torproject.descriptor.DescriptorSourceFactory; -import org.torproject.descriptor.ExtraInfoDescriptor; - -public class PerformanceStatsImporter { - - /** - * How many records to commit with each database transaction. - */ - private final long autoCommitCount = 500; - - /** - * Keep track of the number of records committed before each transaction - */ - private int rbsCount = 0; - - /** - * Relay descriptor database connection. - */ - private Connection conn; - - /** - * Prepared statement to check whether a given conn-bi-direct stats - * string has been imported into the database before. - */ - private PreparedStatement psBs; - - /** - * Prepared statement to insert a conn-bi-direct stats string into the - * database. - */ - private PreparedStatement psB; - - /** - * Logger for this class. - */ - private Logger logger; - - /** - * Directory for writing raw import files. - */ - private String rawFilesDirectory; - - /** - * Raw import file containing conn-bi-direct stats strings. - */ - private BufferedWriter connBiDirectOut; - - /** - * Date format to parse timestamps. - */ - private SimpleDateFormat dateTimeFormat; - - private boolean importIntoDatabase; - private boolean writeRawImportFiles; - - private File archivesDirectory; - private File statsDirectory; - private boolean keepImportHistory; - - /** - * Initialize database importer by connecting to the database and - * preparing statements. 
- */ - public PerformanceStatsImporter(String connectionURL, - String rawFilesDirectory, File archivesDirectory, - File statsDirectory, boolean keepImportHistory) { - - if (archivesDirectory == null || - statsDirectory == null) { - throw new IllegalArgumentException(); - } - this.archivesDirectory = archivesDirectory; - this.statsDirectory = statsDirectory; - this.keepImportHistory = keepImportHistory; - - /* Initialize logger. */ - this.logger = Logger.getLogger( - PerformanceStatsImporter.class.getName()); - - if (connectionURL != null) { - try { - /* Connect to database. */ - this.conn = DriverManager.getConnection(connectionURL); - - /* Turn autocommit off */ - this.conn.setAutoCommit(false); - - /* Prepare statements. */ - this.psBs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM connbidirect WHERE source = ? AND statsend = ?"); - this.psB = conn.prepareStatement("INSERT INTO connbidirect " - + "(source, statsend, seconds, belownum, readnum, writenum, " - + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)"); - this.importIntoDatabase = true; - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not connect to database or " - + "prepare statements.", e); - } - } - - /* Remember where we want to write raw import files. */ - if (rawFilesDirectory != null) { - this.rawFilesDirectory = rawFilesDirectory; - this.writeRawImportFiles = true; - } - - /* Initialize date format, so that we can format timestamps. */ - this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - } - - /** - * Insert a conn-bi-direct stats string into the database. - */ - private void addConnBiDirect(String source, long statsEndMillis, - long seconds, long below, long read, long write, long both) { - String statsEnd = this.dateTimeFormat.format(statsEndMillis); - if (this.importIntoDatabase) { - try { - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - Timestamp statsEndTimestamp = new Timestamp(statsEndMillis); - this.psBs.setString(1, source); - this.psBs.setTimestamp(2, statsEndTimestamp, cal); - ResultSet rs = psBs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - this.psB.clearParameters(); - this.psB.setString(1, source); - this.psB.setTimestamp(2, statsEndTimestamp, cal); - this.psB.setLong(3, seconds); - this.psB.setLong(4, below); - this.psB.setLong(5, read); - this.psB.setLong(6, write); - this.psB.setLong(7, both); - this.psB.executeUpdate(); - rbsCount++; - if (rbsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not add conn-bi-direct " - + "stats string. We won't make any further SQL requests in " - + "this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.connBiDirectOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.connBiDirectOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/connbidirect.sql")); - this.connBiDirectOut.write(" COPY connbidirect (source, " - + "statsend, seconds, belownum, readnum, writenum, " - + "bothnum) FROM stdin;\n"); - } - this.connBiDirectOut.write(source + "\t" + statsEnd + "\t" - + seconds + "\t" + below + "\t" + read + "\t" + write + "\t" - + both + "\n"); - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not write conn-bi-direct " - + "stats string to raw database import file. 
We won't make " - + "any further attempts to write raw import files in this " - + "execution.", e); - this.writeRawImportFiles = false; - } - } - } - - public void importRelayDescriptors() { - if (archivesDirectory.exists()) { - logger.fine("Importing files in directory " + archivesDirectory - + "/..."); - DescriptorReader reader = - DescriptorSourceFactory.createDescriptorReader(); - reader.addDirectory(archivesDirectory); - if (keepImportHistory) { - reader.setExcludeFiles(new File(statsDirectory, - "performance-stats-relay-descriptor-history")); - } - Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); - while (descriptorFiles.hasNext()) { - DescriptorFile descriptorFile = descriptorFiles.next(); - if (descriptorFile.getDescriptors() != null) { - for (Descriptor descriptor : descriptorFile.getDescriptors()) { - if (descriptor instanceof ExtraInfoDescriptor) { - this.addExtraInfoDescriptor( - (ExtraInfoDescriptor) descriptor); - } - } - } - } - } - - logger.info("Finished importing relay descriptors."); - } - - private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { - if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) { - this.addConnBiDirect(descriptor.getFingerprint(), - descriptor.getConnBiDirectStatsEndMillis(), - descriptor.getConnBiDirectStatsIntervalLength(), - descriptor.getConnBiDirectBelow(), - descriptor.getConnBiDirectRead(), - descriptor.getConnBiDirectWrite(), - descriptor.getConnBiDirectBoth()); - } - } - - /** - * Close the relay descriptor database connection. - */ - public void closeConnection() { - - /* Log stats about imported descriptors. */ - this.logger.info(String.format("Finished importing relay " - + "descriptors: %d conn-bi-direct stats lines", rbsCount)); - - /* Commit any stragglers before closing. */ - if (this.conn != null) { - try { - this.conn.commit(); - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not commit final records " - + "to database", e); - } - try { - this.conn.close(); - } catch (SQLException e) { - this.logger.log(Level.WARNING, "Could not close database " - + "connection.", e); - } - } - - /* Close raw import files. 
*/ - try { - if (this.connBiDirectOut != null) { - this.connBiDirectOut.write("\.\n"); - this.connBiDirectOut.close(); - } - } catch (IOException e) { - this.logger.log(Level.WARNING, "Could not close one or more raw " - + "database import files.", e); - } - } -} diff --git a/src/org/torproject/ernie/cron/performance/TorperfProcessor.java b/src/org/torproject/ernie/cron/performance/TorperfProcessor.java deleted file mode 100644 index d7322db..0000000 --- a/src/org/torproject/ernie/cron/performance/TorperfProcessor.java +++ /dev/null @@ -1,374 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron.performance; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; -import org.torproject.descriptor.DescriptorReader; -import org.torproject.descriptor.DescriptorSourceFactory; -import org.torproject.descriptor.TorperfResult; - -public class TorperfProcessor { - public TorperfProcessor(File torperfDirectory, File statsDirectory, - String connectionURL) { - - if (torperfDirectory == null || statsDirectory == null) { - throw new IllegalArgumentException(); - } - - Logger logger = Logger.getLogger(TorperfProcessor.class.getName()); - File rawFile = new File(statsDirectory, "torperf-raw"); - File statsFile = new File(statsDirectory, "torperf-stats"); - SortedMap<String, String> rawObs = new TreeMap<String, String>(); - SortedMap<String, String> stats = new TreeMap<String, String>(); - int addedRawObs = 0; - SimpleDateFormat formatter = - new SimpleDateFormat("yyyy-MM-dd,HH:mm:ss"); - formatter.setTimeZone(TimeZone.getTimeZone("UTC")); - try { - if (rawFile.exists()) { - logger.fine("Reading file " + rawFile.getAbsolutePath() + "..."); - BufferedReader br = new BufferedReader(new FileReader(rawFile)); - String line = br.readLine(); // ignore header - while ((line = br.readLine()) != null) { - if (line.split(",").length != 4) { - logger.warning("Corrupt line in " + rawFile.getAbsolutePath() - + "!"); - break; - } - String key = line.substring(0, line.lastIndexOf(",")); - rawObs.put(key, line); - } - br.close(); - logger.fine("Finished reading file " + rawFile.getAbsolutePath() - + "."); - } - if (statsFile.exists()) { - logger.fine("Reading file " + statsFile.getAbsolutePath() - + "..."); - BufferedReader br = new BufferedReader(new FileReader(statsFile)); - String line = br.readLine(); // ignore header - while ((line = br.readLine()) != null) { - String key = line.split(",")[0] + "," + line.split(",")[1]; - stats.put(key, line); - } - br.close(); - logger.fine("Finished reading file " + statsFile.getAbsolutePath() - + "."); - } - if (torperfDirectory.exists()) { - logger.fine("Importing files in " + torperfDirectory + "/..."); - 
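The torperf-raw file read just above and rewritten further below stores one observation per line as source,date,start,completemillis, where completemillis is -2 for a timeout and -1 for a failure (the encoding appears in the import loop that follows). A minimal sketch of decoding one such line; the helper name and the example values are illustrative and not part of this codebase:

/* Illustrative only: decode a torperf-raw line such as
 * "torperf-50kb,2014-01-20,12:34:56,2500" into a readable summary.
 * The -2/-1 sentinels mirror the encoding used by TorperfProcessor. */
static String describeRawLine(String line) {
  String[] parts = line.split(",");
  String source = parts[0];                    // e.g. torperf-50kb
  String started = parts[1] + " " + parts[2];  // start date and time (UTC)
  long completeMillis = Long.parseLong(parts[3]);
  if (completeMillis == -2L) {
    return source + " timed out at " + started;
  } else if (completeMillis == -1L) {
    return source + " failed at " + started;
  } else {
    return source + " completed in " + completeMillis + " ms at " + started;
  }
}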
DescriptorReader descriptorReader = - DescriptorSourceFactory.createDescriptorReader(); - descriptorReader.addDirectory(torperfDirectory); - descriptorReader.setExcludeFiles(new File(statsDirectory, - "torperf-history")); - Iterator<DescriptorFile> descriptorFiles = - descriptorReader.readDescriptors(); - while (descriptorFiles.hasNext()) { - DescriptorFile descriptorFile = descriptorFiles.next(); - if (descriptorFile.getException() != null) { - logger.log(Level.FINE, "Error parsing file.", - descriptorFile.getException()); - continue; - } - for (Descriptor descriptor : descriptorFile.getDescriptors()) { - if (!(descriptor instanceof TorperfResult)) { - continue; - } - TorperfResult result = (TorperfResult) descriptor; - String source = result.getSource(); - long fileSize = result.getFileSize(); - if (fileSize == 51200) { - source += "-50kb"; - } else if (fileSize == 1048576) { - source += "-1mb"; - } else if (fileSize == 5242880) { - source += "-5mb"; - } else { - logger.fine("Unexpected file size '" + fileSize - + "'. Skipping."); - continue; - } - String dateTime = formatter.format(result.getStartMillis()); - long completeMillis = result.getDataCompleteMillis() - - result.getStartMillis(); - String key = source + "," + dateTime; - String value = key; - if ((result.didTimeout() == null && - result.getDataCompleteMillis() < 1) || - (result.didTimeout() != null && result.didTimeout())) { - value += ",-2"; // -2 for timeout - } else if (result.getReadBytes() < fileSize) { - value += ",-1"; // -1 for failure - } else { - value += "," + completeMillis; - } - if (!rawObs.containsKey(key)) { - rawObs.put(key, value); - addedRawObs++; - } - } - } - logger.fine("Finished importing files in " + torperfDirectory - + "/."); - } - if (rawObs.size() > 0) { - logger.fine("Writing file " + rawFile.getAbsolutePath() + "..."); - rawFile.getParentFile().mkdirs(); - BufferedWriter bw = new BufferedWriter(new FileWriter(rawFile)); - bw.append("source,date,start,completemillis\n"); - String tempSourceDate = null; - Iterator<Map.Entry<String, String>> it = - rawObs.entrySet().iterator(); - List<Long> dlTimes = new ArrayList<Long>(); - boolean haveWrittenFinalLine = false; - SortedMap<String, List<Long>> dlTimesAllSources = - new TreeMap<String, List<Long>>(); - SortedMap<String, long[]> statusesAllSources = - new TreeMap<String, long[]>(); - long failures = 0, timeouts = 0, requests = 0; - while (it.hasNext() || !haveWrittenFinalLine) { - Map.Entry<String, String> next = it.hasNext() ? 
it.next() : null; - if (tempSourceDate != null - && (next == null || !(next.getValue().split(",")[0] + "," - + next.getValue().split(",")[1]).equals(tempSourceDate))) { - if (dlTimes.size() > 4) { - Collections.sort(dlTimes); - long q1 = dlTimes.get(dlTimes.size() / 4 - 1); - long md = dlTimes.get(dlTimes.size() / 2 - 1); - long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1); - stats.put(tempSourceDate, tempSourceDate + "," + q1 + "," - + md + "," + q3 + "," + timeouts + "," + failures + "," - + requests); - String allSourceDate = "all" + tempSourceDate.substring( - tempSourceDate.indexOf("-")); - if (dlTimesAllSources.containsKey(allSourceDate)) { - dlTimesAllSources.get(allSourceDate).addAll(dlTimes); - } else { - dlTimesAllSources.put(allSourceDate, dlTimes); - } - if (statusesAllSources.containsKey(allSourceDate)) { - long[] status = statusesAllSources.get(allSourceDate); - status[0] += timeouts; - status[1] += failures; - status[2] += requests; - } else { - long[] status = new long[3]; - status[0] = timeouts; - status[1] = failures; - status[2] = requests; - statusesAllSources.put(allSourceDate, status); - } - } - dlTimes = new ArrayList<Long>(); - failures = timeouts = requests = 0; - if (next == null) { - haveWrittenFinalLine = true; - } - } - if (next != null) { - bw.append(next.getValue() + "\n"); - String[] parts = next.getValue().split(","); - tempSourceDate = parts[0] + "," + parts[1]; - long completeMillis = Long.parseLong(parts[3]); - if (completeMillis == -2L) { - timeouts++; - } else if (completeMillis == -1L) { - failures++; - } else { - dlTimes.add(Long.parseLong(parts[3])); - } - requests++; - } - } - bw.close(); - for (Map.Entry<String, List<Long>> e : - dlTimesAllSources.entrySet()) { - String allSourceDate = e.getKey(); - dlTimes = e.getValue(); - Collections.sort(dlTimes); - long q1 = dlTimes.get(dlTimes.size() / 4 - 1); - long md = dlTimes.get(dlTimes.size() / 2 - 1); - long q3 = dlTimes.get(dlTimes.size() * 3 / 4 - 1); - long[] status = statusesAllSources.get(allSourceDate); - timeouts = status[0]; - failures = status[1]; - requests = status[2]; - stats.put(allSourceDate, allSourceDate + "," + q1 + "," + md - + "," + q3 + "," + timeouts + "," + failures + "," - + requests); - } - logger.fine("Finished writing file " + rawFile.getAbsolutePath() - + "."); - } - if (stats.size() > 0) { - logger.fine("Writing file " + statsFile.getAbsolutePath() - + "..."); - statsFile.getParentFile().mkdirs(); - BufferedWriter bw = new BufferedWriter(new FileWriter(statsFile)); - bw.append("source,date,q1,md,q3,timeouts,failures,requests\n"); - for (String s : stats.values()) { - bw.append(s + "\n"); - } - bw.close(); - logger.fine("Finished writing file " + statsFile.getAbsolutePath() - + "."); - } - } catch (IOException e) { - logger.log(Level.WARNING, "Failed writing " - + rawFile.getAbsolutePath() + " or " - + statsFile.getAbsolutePath() + "!", e); - } - - /* Write stats. 
*/ - StringBuilder dumpStats = new StringBuilder("Finished writing " - + "statistics on torperf results.\nAdded " + addedRawObs - + " new observations in this execution.\n" - + "Last known obserations by source and file size are:"); - String lastSource = null; - String lastLine = null; - for (String s : rawObs.keySet()) { - String[] parts = s.split(","); - if (lastSource == null) { - lastSource = parts[0]; - } else if (!parts[0].equals(lastSource)) { - String lastKnownObservation = lastLine.split(",")[1] + " " - + lastLine.split(",")[2]; - dumpStats.append("\n" + lastSource + " " + lastKnownObservation); - lastSource = parts[0]; - } - lastLine = s; - } - if (lastSource != null) { - String lastKnownObservation = lastLine.split(",")[1] + " " - + lastLine.split(",")[2]; - dumpStats.append("\n" + lastSource + " " + lastKnownObservation); - } - logger.info(dumpStats.toString()); - - /* Write results to database. */ - if (connectionURL != null) { - try { - Map<String, String> insertRows = new HashMap<String, String>(); - insertRows.putAll(stats); - Set<String> updateRows = new HashSet<String>(); - Connection conn = DriverManager.getConnection(connectionURL); - conn.setAutoCommit(false); - Statement statement = conn.createStatement(); - ResultSet rs = statement.executeQuery( - "SELECT date, source, q1, md, q3, timeouts, failures, " - + "requests FROM torperf_stats"); - while (rs.next()) { - String date = rs.getDate(1).toString(); - String source = rs.getString(2); - String key = source + "," + date; - if (insertRows.containsKey(key)) { - String insertRow = insertRows.remove(key); - String[] newStats = insertRow.split(","); - long newQ1 = Long.parseLong(newStats[2]); - long newMd = Long.parseLong(newStats[3]); - long newQ3 = Long.parseLong(newStats[4]); - long newTimeouts = Long.parseLong(newStats[5]); - long newFailures = Long.parseLong(newStats[6]); - long newRequests = Long.parseLong(newStats[7]); - long oldQ1 = rs.getLong(3); - long oldMd = rs.getLong(4); - long oldQ3 = rs.getLong(5); - long oldTimeouts = rs.getLong(6); - long oldFailures = rs.getLong(7); - long oldRequests = rs.getLong(8); - if (newQ1 != oldQ1 || newMd != oldMd || newQ3 != oldQ3 || - newTimeouts != oldTimeouts || - newFailures != oldFailures || - newRequests != oldRequests) { - updateRows.add(insertRow); - } - } - } - PreparedStatement psU = conn.prepareStatement( - "UPDATE torperf_stats SET q1 = ?, md = ?, q3 = ?, " - + "timeouts = ?, failures = ?, requests = ? " - + "WHERE date = ? 
AND source = ?"); - for (String row : updateRows) { - String[] newStats = row.split(","); - String source = newStats[0]; - java.sql.Date date = java.sql.Date.valueOf(newStats[1]); - long q1 = Long.parseLong(newStats[2]); - long md = Long.parseLong(newStats[3]); - long q3 = Long.parseLong(newStats[4]); - long timeouts = Long.parseLong(newStats[5]); - long failures = Long.parseLong(newStats[6]); - long requests = Long.parseLong(newStats[7]); - psU.clearParameters(); - psU.setLong(1, q1); - psU.setLong(2, md); - psU.setLong(3, q3); - psU.setLong(4, timeouts); - psU.setLong(5, failures); - psU.setLong(6, requests); - psU.setDate(7, date); - psU.setString(8, source); - psU.executeUpdate(); - } - PreparedStatement psI = conn.prepareStatement( - "INSERT INTO torperf_stats (q1, md, q3, timeouts, failures, " - + "requests, date, source) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"); - for (String row : insertRows.values()) { - String[] newStats = row.split(","); - String source = newStats[0]; - java.sql.Date date = java.sql.Date.valueOf(newStats[1]); - long q1 = Long.parseLong(newStats[2]); - long md = Long.parseLong(newStats[3]); - long q3 = Long.parseLong(newStats[4]); - long timeouts = Long.parseLong(newStats[5]); - long failures = Long.parseLong(newStats[6]); - long requests = Long.parseLong(newStats[7]); - psI.clearParameters(); - psI.setLong(1, q1); - psI.setLong(2, md); - psI.setLong(3, q3); - psI.setLong(4, timeouts); - psI.setLong(5, failures); - psI.setLong(6, requests); - psI.setDate(7, date); - psI.setString(8, source); - psI.executeUpdate(); - } - conn.commit(); - conn.close(); - } catch (SQLException e) { - logger.log(Level.WARNING, "Failed to add torperf stats to " - + "database.", e); - } - } - } -} -
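For reference, the per-day torperf statistics written to the torperf-stats file and the torperf_stats table above reduce each source/date bucket of download times to a lower quartile, median, and upper quartile, plus counts of timeouts, failures, and requests. A self-contained sketch of just the quartile step, using a hypothetical helper class that is not part of this commit; it follows the same order-statistic choice as the loop above (elements at positions size/4-1, size/2-1, and 3*size/4-1 of the sorted list, and only when a bucket holds more than four observations):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class TorperfQuartiles {

  /* Returns {q1, md, q3} for a bucket of download times in milliseconds,
   * or null if the bucket is too small to be aggregated, mirroring the
   * dlTimes.size() > 4 guard in TorperfProcessor. */
  static long[] quartiles(List<Long> dlTimes) {
    if (dlTimes.size() <= 4) {
      return null;
    }
    List<Long> sorted = new ArrayList<Long>(dlTimes);
    Collections.sort(sorted);
    long q1 = sorted.get(sorted.size() / 4 - 1);
    long md = sorted.get(sorted.size() / 2 - 1);
    long q3 = sorted.get(sorted.size() * 3 / 4 - 1);
    return new long[] { q1, md, q3 };
  }
}

Note that these are plain order statistics on the sorted sample rather than interpolated percentiles; the sketch only reproduces the index arithmetic used in the original loop.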