[metrics-tasks/master] Add new user counting code (#8462).

commit 138b6c492bb4eddb7793a0740c2199bed7ac55fc
Author: Karsten Loesing <karsten.loesing@gmx.net>
Date:   Wed Apr 24 08:54:35 2013 +0200

    Add new user counting code (#8462).
---
 task-8462/.gitignore         |    7 +
 task-8462/README             |   50 ++++
 task-8462/init-userstats.sql |  573 ++++++++++++++++++++++++++++++++++++++++++
 task-8462/run-userstats.sh   |   17 ++
 task-8462/src/Parse.java     |  449 +++++++++++++++++++++++++++++++++
 5 files changed, 1096 insertions(+), 0 deletions(-)

diff --git a/task-8462/.gitignore b/task-8462/.gitignore
new file mode 100644
index 0000000..4c1e6ed
--- /dev/null
+++ b/task-8462/.gitignore
@@ -0,0 +1,7 @@
+in/
+bin/
+lib/
+out/
+status/
+*.csv
+
diff --git a/task-8462/README b/task-8462/README
new file mode 100644
index 0000000..9547efa
--- /dev/null
+++ b/task-8462/README
@@ -0,0 +1,50 @@
++------------------------------------------------------------------------+
+| An implementation of the user counting algorithm suggested in          |
+| Tor Tech Report 2012-10-001 for later integration with metrics-web     |
++------------------------------------------------------------------------+
+
+Instructions (for Debian Squeeze):
+
+Install Java 6 for descriptor parsing and PostgreSQL 8.4 for descriptor
+data storage and aggregation:
+
+  $ sudo apt-get install openjdk-6-jdk postgresql-8.4
+
+Create a database user and database:
+
+  $ sudo -u postgres createuser -P karsten
+  $ sudo -u postgres createdb -O karsten userstats
+  $ echo "password" > ~/.pgpass
+  $ chmod 0600 ~/.pgpass
+  $ psql -f init-userstats.sql userstats
+
+Create empty bin/, lib/, in/, status/, and out/ directories.
+
+Put required .jar files into the lib/ directory. See metrics-lib.git for
+instructions:
+
+  - lib/commons-codec-1.6.jar
+  - lib/commons-compress-1.4.1.jar
+  - lib/descriptor.jar
+
+Run the run-userstats.sh script:
+
+  $ ./run-userstats.sh
+
+Be patient.
+
+Advanced stuff: the database can also be initialized using descriptor
+archives available at https://metrics.torproject.org/data.html. Only
+relay consensuses, relay extra-info descriptors, and bridge descriptors
+are required. Put them into the following directories, ideally after
+decompressing (but not extracting them) using bunzip2:
+
+  - in/relay-descriptors/ (consensuses-*.tar and extra-infos-*.tar)
+  - in/bridge-descriptors/ (bridge-descriptors-*.tar)
+
+Also comment out the rsync command in run-userstats.sh. Then run
+run-userstats.sh. After initializing the database, clean up the in/ and
+out/ directories and don't forget to put back the rsync command in
+run-userstats.sh. It may be easier to set up separate instances of this
+tool for initializing the database and for running it on a regular basis.
+
diff --git a/task-8462/init-userstats.sql b/task-8462/init-userstats.sql
new file mode 100644
index 0000000..c586285
--- /dev/null
+++ b/task-8462/init-userstats.sql
@@ -0,0 +1,573 @@
+-- Copyright 2013 The Tor Project
+-- See LICENSE for licensing information
+
+-- Use enum types for dimensions that may only change if we write new code
+-- to support them. For example, if there's a new node type beyond relay
+-- and bridge, we'll have to write code to support it. This is in
+-- contrast to dimensions like country, transport, or version which don't
+-- have their possible values hard-coded anywhere.
+CREATE TYPE node AS ENUM ('relay', 'bridge');
+CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');
+
+-- All new data first goes into the imported table. 
The import tool +-- should do some trivial checks for invalid or duplicate data, but +-- ultimately, we're going to do these checks in the database. For +-- example, the import tool could avoid importing data from the same +-- descriptor more than once, but it's fine to import the same history +-- string from distinct descriptors multiple times. The import tool must, +-- however, make sure that stats_end is not greater than 00:00:00 of the +-- day following stats_start. There are no constraints set on this table, +-- because importing data should be really, really fast. Once the newly +-- imported data is successfully processed, the imported table is emptied. +CREATE TABLE imported ( + + -- The 40-character upper-case hex string identifies a descriptor + -- uniquely and is used to join metrics (responses, bytes, status) + -- published by the same node (relay or bridge). + fingerprint CHARACTER(40) NOT NULL, + + -- The node type is used to decide the statistics that this entry will + -- be part of. + node node NOT NULL, + + -- The metric of this entry describes the stored observation type. + -- We'll want to store different metrics published by a node: + -- - 'responses' are the number of v3 network status consensus requests + -- that the node responded to; + -- - 'bytes' are the number of bytes that the node wrote when answering + -- directory requests; + -- - 'status' are the intervals when the node was listed as running in + -- the network status published by either the directory authorities or + -- bridge authority. + metric metric NOT NULL, + + -- The two-letter lower-case country code that the observation in this + -- entry can be attributed to; can be '??' if no country information is + -- known for this entry, or '' (empty string) if this entry summarizes + -- observations for all countries. + country CHARACTER VARYING(2) NOT NULL, + + -- The pluggable transport name that the observation in this entry can + -- be attributed to; can be '<OR>' if no pluggable transport was used, + -- '<??>' if an unknown pluggable transport was used, or '' (empty + -- string) if this entry summarizes observations for all transports. + transport CHARACTER VARYING(20) NOT NULL, + + -- The IP address version that the observation in this entry can be + -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry + -- summarizes observations for all IP address versions. + version CHARACTER VARYING(2) NOT NULL, + + -- The interval start of this observation. + stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + + -- The interval end of this observation. This timestamp must be greater + -- than stats_start and must not be greater than 00:00:00 of the day + -- following stats_start, which the import tool must make sure. + stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + + -- Finally, the observed value. + val DOUBLE PRECISION NOT NULL +); + +-- After importing new data into the imported table, they are merged into +-- the merged table using the merge() function. The merged table contains +-- the same data as the imported table, except: +-- (1) there are no duplicate or overlapping entries in the merged table +-- with respect to stats_start and stats_end and the same fingerprint, +-- node, metric, country, transport, and version columns; +-- (2) all subsequent intervals with the same node, metric, country, +-- transport, version, and stats_start date are compressed into a +-- single entry. 
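-- For illustration, with made-up values: two imported rows for the same
-- fingerprint, node, metric, country, transport, and version covering
-- 2013-04-01 00:00:00 to 08:00:00 with val 10 and 2013-04-01 08:00:00 to
-- 16:00:00 with val 5 end up as one merged row covering 2013-04-01
-- 00:00:00 to 16:00:00 with val 15, while an imported row whose interval
-- overlaps an entry already in this table is skipped.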
+CREATE TABLE merged ( + + -- The unique key that is only used when merging newly imported data + -- into this table. + id SERIAL PRIMARY KEY, + + -- All other columns have the same meaning as in the imported table. + fingerprint CHARACTER(40) NOT NULL, + node node NOT NULL, + metric metric NOT NULL, + country CHARACTER VARYING(2) NOT NULL, + transport CHARACTER VARYING(20) NOT NULL, + version CHARACTER VARYING(2) NOT NULL, + stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + val DOUBLE PRECISION NOT NULL +); + +-- After merging new data into the merged table, they are aggregated to +-- daily user number estimates using the aggregate() function. Only dates +-- with new data in the imported table will be recomputed in the +-- aggregated table. The aggregated components follow the algorithm +-- proposed in Tor Tech Report 2012-10-001. +CREATE TABLE aggregated ( + + -- The date of these aggregated observations. + date DATE NOT NULL, + + -- The node, country, transport, and version columns all have the same + -- meaning as in the imported table. + node node NOT NULL, + country CHARACTER VARYING(2) NOT NULL DEFAULT '', + transport CHARACTER VARYING(20) NOT NULL DEFAULT '', + version CHARACTER VARYING(2) NOT NULL DEFAULT '', + + -- Total number of reported responses, possibly broken down by country, + -- transport, or version if either of them is not ''. See r(R) in the + -- tech report. + rrx DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Total number of seconds of nodes reporting responses, possibly broken + -- down by country, transport, or version if either of them is not ''. + -- This would be referred to as n(R) in the tech report, though it's not + -- used there. + nrx DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Total number of reported bytes. See h(H) in the tech report. + hh DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Total number of seconds of nodes in the status. See n(N) in the tech + -- report. + nn DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Number of reported bytes of nodes that reported both responses and + -- bytes. See h(R intersect H) in the tech report. + hrh DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Number of seconds of nodes reporting bytes. See n(H) in the tech + -- report. + nh DOUBLE PRECISION NOT NULL DEFAULT 0, + + -- Number of seconds of nodes reporting responses but no bytes. See + -- n(R \ H) in the tech report. + nrh DOUBLE PRECISION NOT NULL DEFAULT 0 +); + +CREATE LANGUAGE plpgsql; + +-- Merge new entries from the imported table into the merged table, and +-- compress them while doing so. This function first executes a query to +-- match all entries in the imported table with adjacent or even +-- overlapping entries in the merged table. It then loops over query +-- results and either inserts or updates entries in the merged table. The +-- idea is to leave query optimization to the database and only touch +-- as few entries as possible while running this function. +CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$ +DECLARE + + -- The current record that we're handling in the loop body. + cur RECORD; + + -- Various information about the last record we processed, so that we + -- can merge the current record with the last one if possible. 
+ last_fingerprint CHARACTER(40) := NULL; + last_node node; + last_metric metric; + last_country CHARACTER VARYING(2); + last_transport CHARACTER VARYING(20); + last_version CHARACTER VARYING(2); + last_start TIMESTAMP WITHOUT TIME ZONE; + last_end TIMESTAMP WITHOUT TIME ZONE; + last_id INTEGER; + last_val DOUBLE PRECISION; + + -- Interval end and value of the last record before updating them in the + -- last loop step. In a few edge cases, we may update an entry and + -- learn in the next loop step that the updated entry overlaps with the + -- subsequent entry. In these cases we'll have to undo the update, + -- which is why we're storing the updated values. + undo_end TIMESTAMP WITHOUT TIME ZONE; + undo_val DOUBLE PRECISION; + +BEGIN + RAISE NOTICE '% Starting to merge.', timeofday(); + + -- TODO Maybe we'll have to materialize a merged_part table that only + -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported) + -- and use that in the query below. + + -- Loop over results from a query that joins new entries in the imported + -- table with existing entries in the merged table. + FOR cur IN SELECT DISTINCT + + -- Select id, interval start and end, and value of the existing entry + -- in merged; all these fields may be null if the imported entry is + -- not adjacent to an existing one. + merged.id AS merged_id, + merged.stats_start AS merged_start, + merged.stats_end AS merged_end, + merged.val AS merged_val, + + -- Select interval start and end and value of the newly imported + -- entry. + imported.stats_start AS imported_start, + imported.stats_end AS imported_end, + imported.val AS imported_val, + + -- Select columns that define the group of entries that can be merged + -- in the merged table. + imported.fingerprint AS fingerprint, + imported.node AS node, + imported.metric AS metric, + imported.country AS country, + imported.transport AS transport, + imported.version AS version + + -- Select these columns from all entries in the imported table, plus + -- do an outer join on the merged table to find adjacent entries that + -- we might want to merge the new entries with. It's possible that we + -- handle the same imported entry twice, if it starts directly after + -- one existing entry and ends directly before another existing entry. + FROM imported LEFT JOIN merged + + -- First two join conditions are to find adjacent intervals. In fact, + -- we also include overlapping intervals here, so that we can skip the + -- overlapping entry in the imported table. + ON imported.stats_end >= merged.stats_start AND + imported.stats_start <= merged.stats_end AND + + -- Further join conditions are same date, fingerprint, node, etc., + -- so that we don't merge entries that don't belong together. + DATE(imported.stats_start) = DATE(merged.stats_start) AND + imported.fingerprint = merged.fingerprint AND + imported.node = merged.node AND + imported.metric = merged.metric AND + imported.country = merged.country AND + imported.transport = merged.transport AND + imported.version = merged.version + + -- Ordering is key, or our approach to merge subsequent entries is + -- going to break. + ORDER BY imported.fingerprint, imported.node, imported.metric, + imported.country, imported.transport, imported.version, + imported.stats_start, merged.stats_start, imported.stats_end + + -- Now go through the results one by one. + LOOP + + -- Log that we're done with the query and about to start merging. 
+    IF last_fingerprint IS NULL THEN
+      RAISE NOTICE '% Query returned, now merging entries.', timeofday();
+    END IF;
+
+    -- If we're processing the very first entry or if we have reached a
+    -- new group of entries that belong together, (re-)set last_*
+    -- variables.
+    IF last_fingerprint IS NULL OR
+        DATE(cur.imported_start) <> DATE(last_start) OR
+        cur.fingerprint <> last_fingerprint OR
+        cur.node <> last_node OR
+        cur.metric <> last_metric OR
+        cur.country <> last_country OR
+        cur.transport <> last_transport OR
+        cur.version <> last_version THEN
+      last_id := -1;
+      last_start := '1970-01-01 00:00:00';
+      last_end := '1970-01-01 00:00:00';
+      last_val := -1;
+    END IF;
+
+    -- Remember all fields that determine the group of which entries
+    -- belong together.
+    last_fingerprint := cur.fingerprint;
+    last_node := cur.node;
+    last_metric := cur.metric;
+    last_country := cur.country;
+    last_transport := cur.transport;
+    last_version := cur.version;
+
+    -- If the existing entry that we're currently looking at starts before
+    -- the previous entry ends, we have created two overlapping entries in
+    -- the last iteration, and that is not allowed.  Undo the previous
+    -- change.
+    IF cur.merged_start IS NOT NULL AND
+        cur.merged_start < last_end AND
+        undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
+      UPDATE merged SET stats_end = undo_end, val = undo_val
+        WHERE id = last_id;
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If there is no adjacent entry to the one we're about to merge,
+    -- insert it as new entry.
+    ELSIF cur.merged_end IS NULL THEN
+      IF cur.imported_start > last_end THEN
+        last_start := cur.imported_start;
+        last_end := cur.imported_end;
+        last_val := cur.imported_val;
+        INSERT INTO merged (fingerprint, node, metric, country, transport,
+            version, stats_start, stats_end, val)
+          VALUES (last_fingerprint, last_node, last_metric, last_country,
+              last_transport, last_version, last_start, last_end,
+              last_val)
+          RETURNING id INTO last_id;
+
+      -- If there was no adjacent entry before starting to merge, but
+      -- there is now one ending right before the new entry starts, merge
+      -- the new entry into the existing one.
+      ELSIF cur.imported_start = last_end THEN
+        last_val := last_val + cur.imported_val;
+        last_end := cur.imported_end;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+      -- There's no risk of this entry overlapping with the next.
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If the new entry ends right when an existing entry starts, but
+    -- there's a gap between when the previously processed entry ends and
+    -- when the new entry starts, merge the new entry with the existing
+    -- entry we're currently looking at.
+    ELSIF cur.imported_end = cur.merged_start THEN
+      IF cur.imported_start > last_end THEN
+        last_id := cur.merged_id;
+        last_start := cur.imported_start;
+        last_end := cur.merged_end;
+        last_val := cur.imported_val + cur.merged_val;
+        UPDATE merged SET stats_start = last_start, val = last_val
+          WHERE id = last_id;
+
+      -- If the new entry ends right when an existing entry starts and
+      -- there's no gap between when the previously processed entry ends
+      -- and when the new entry starts, merge the new entry with the other
+      -- two entries.  This happens by deleting the previous entry and
+      -- expanding the subsequent entry to cover all three entries.
+ ELSIF cur.imported_start = last_end THEN + DELETE FROM merged WHERE id = last_id; + last_id := cur.merged_id; + last_end := cur.merged_end; + last_val := last_val + cur.merged_val; + UPDATE merged SET stats_start = last_start, val = last_val + WHERE id = last_id; + END IF; + + -- There's no risk of this entry overlapping with the next. + undo_end := NULL; + undo_val := NULL; + + -- If the new entry starts right when an existing entry ends, but + -- there's a gap between the previously processed entry and the + -- existing one, extend the existing entry. There's a special case + -- when this operation is false and must be undone, which is when the + -- newly added entry overlaps with the subsequent entry. That's why + -- we have to store the old interval end and value, so that this + -- operation can be undone in the next loop iteration. + ELSIF cur.imported_start = cur.merged_end THEN + IF last_end < cur.imported_start THEN + undo_end := cur.merged_end; + undo_val := cur.merged_val; + last_id := cur.merged_id; + last_start := cur.merged_start; + last_end := cur.imported_end; + last_val := cur.merged_val + cur.imported_val; + UPDATE merged SET stats_end = last_end, val = last_val + WHERE id = last_id; + + -- If the new entry starts right when an existing entry ends and + -- there's no gap between the previously processed entry and the + -- existing entry, extend the existing entry. This is very similar + -- to the previous case. The same reasoning about possibly having + -- to undo this operation applies. + ELSE + undo_end := cur.merged_end; + undo_val := last_val; + last_end := cur.imported_end; + last_val := last_val + cur.imported_val; + UPDATE merged SET stats_end = last_end, val = last_val + WHERE id = last_id; + END IF; + + -- If none of the cases above applies, there must have been an overlap + -- between the new entry and an existing one. Skip the new entry. + ELSE + last_id := cur.merged_id; + last_start := cur.merged_start; + last_end := cur.merged_end; + last_val := cur.merged_val; + END IF; + END LOOP; + + -- That's it, we're done merging. + RAISE NOTICE '% Finishing merge.', timeofday(); + RETURN; +END; +$$ LANGUAGE plpgsql; + +-- Aggregate user estimates for all dates that have updated entries in the +-- merged table. This function first creates a temporary table with +-- new or updated observations, then removes all existing estimates for +-- the dates to be updated, and finally inserts newly computed aggregates +-- for these dates. +CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$ +BEGIN + RAISE NOTICE '% Starting aggregate step.', timeofday(); + + -- Create a new temporary table containing all relevant information + -- needed to update the aggregated table. In this table, we sum up all + -- observations of a given type by reporting node. This query is + -- (temporarily) materialized, because we need to combine its entries + -- multiple times in various ways. A (non-materialized) view would have + -- meant to re-compute this query multiple times. + CREATE TEMPORARY TABLE update AS + SELECT fingerprint, node, metric, country, transport, version, + DATE(stats_start), SUM(val) AS val, + SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start) + AS DOUBLE PRECISION)) AS seconds + FROM merged + WHERE DATE(stats_start) IN ( + SELECT DISTINCT DATE(stats_start) FROM imported) + GROUP BY fingerprint, node, metric, country, transport, version, + DATE(stats_start); + + -- Delete all entries from the aggregated table that we're about to + -- re-compute. 
+  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
+
+  -- Insert partly empty results for all existing combinations of date,
+  -- node ('relay' or 'bridge'), country, transport, and version.  Only
+  -- the rrx and nrx fields will contain number and seconds of reported
+  -- responses for the given combination of date, node, etc., while the
+  -- other fields will be updated below.
+  INSERT INTO aggregated (date, node, country, transport, version, rrx,
+      nrx)
+    SELECT date, node, country, transport, version, SUM(val) AS rrx,
+        SUM(seconds) AS nrx
+    FROM update WHERE metric = 'responses'
+    GROUP BY date, node, country, transport, version;
+
+  -- Create another temporary table with only those entries that aren't
+  -- broken down by any dimension.  This table is much smaller, so the
+  -- following operations are much faster.
+  CREATE TEMPORARY TABLE update_no_dimensions AS
+    SELECT fingerprint, node, metric, date, val, seconds FROM update
+    WHERE country = ''
+    AND transport = ''
+    AND version = '';
+
+  -- Update results in the aggregated table by setting aggregates based
+  -- on reported directory bytes.  These aggregates are only based on
+  -- date and node, so that the same values are set for all combinations
+  -- of country, transport, and version.
+  UPDATE aggregated
+    SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
+    FROM (
+      SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
+      FROM update_no_dimensions
+      WHERE metric = 'bytes'
+      GROUP BY date, node
+    ) aggregated_bytes
+    WHERE aggregated.date = aggregated_bytes.date
+    AND aggregated.node = aggregated_bytes.node;
+
+  -- Update results based on nodes being contained in the network status.
+  UPDATE aggregated
+    SET nn = aggregated_status.nn
+    FROM (
+      SELECT date, node, SUM(seconds) AS nn
+      FROM update_no_dimensions
+      WHERE metric = 'status'
+      GROUP BY date, node
+    ) aggregated_status
+    WHERE aggregated.date = aggregated_status.date
+    AND aggregated.node = aggregated_status.node;
+
+  -- Update results based on nodes reporting both bytes and responses.
+  UPDATE aggregated
+    SET hrh = aggregated_bytes_responses.hrh
+    FROM (
+      SELECT bytes.date, bytes.node,
+          SUM((LEAST(bytes.seconds, responses.seconds)
+          * bytes.val) / bytes.seconds) AS hrh
+      FROM update_no_dimensions bytes
+      LEFT JOIN update_no_dimensions responses
+      ON bytes.date = responses.date
+      AND bytes.fingerprint = responses.fingerprint
+      AND bytes.node = responses.node
+      WHERE bytes.metric = 'bytes'
+      AND responses.metric = 'responses'
+      GROUP BY bytes.date, bytes.node
+    ) aggregated_bytes_responses
+    WHERE aggregated.date = aggregated_bytes_responses.date
+    AND aggregated.node = aggregated_bytes_responses.node;
+
+  -- Update results based on nodes reporting responses but no bytes.
+  UPDATE aggregated
+    SET nrh = aggregated_responses_bytes.nrh
+    FROM (
+      SELECT responses.date, responses.node,
+          SUM(GREATEST(0, responses.seconds
+          - COALESCE(bytes.seconds, 0))) AS nrh
+      FROM update_no_dimensions responses
+      LEFT JOIN update_no_dimensions bytes
+      ON responses.date = bytes.date
+      AND responses.fingerprint = bytes.fingerprint
+      AND responses.node = bytes.node
+      WHERE responses.metric = 'responses'
+      AND bytes.metric = 'bytes'
+      GROUP BY responses.date, responses.node
+    ) aggregated_responses_bytes
+    WHERE aggregated.date = aggregated_responses_bytes.date
+    AND aggregated.node = aggregated_responses_bytes.node;
+
+  -- We're done aggregating new data. 
+  RAISE NOTICE '% Finishing aggregate step.', timeofday();
+  RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- User-friendly view on the aggregated table that implements the
+-- algorithm proposed in Tor Tech Report 2012-10-001.  This view returns
+-- user number estimates for both relay and bridge statistics, possibly
+-- broken down by country or transport or version.
+CREATE OR REPLACE VIEW estimated AS SELECT
+
+  -- The date of this user number estimate.
+  a.date,
+
+  -- The node type, which is either 'relay' or 'bridge'.
+  a.node,
+
+  -- The two-letter lower-case country code of this estimate; can be '??'
+  -- for an estimate of users that could not be resolved to any country,
+  -- or '' (empty string) for an estimate of all users, regardless of
+  -- country.
+  a.country,
+
+  -- The pluggable transport name of this estimate; can be '<OR>' for an
+  -- estimate of users that did not use any pluggable transport, '<??>'
+  -- for unknown pluggable transports, or '' (empty string) for an
+  -- estimate of all users, regardless of transport.
+  a.transport,
+
+  -- The IP address version of this estimate; can be 'v4' or 'v6', or ''
+  -- (empty string) for an estimate of all users, regardless of IP address
+  -- version.
+  a.version,
+
+  -- Estimated fraction of nodes reporting directory requests, which is
+  -- used to extrapolate observed requests to estimated total requests in
+  -- the network.  The closer this fraction is to 1.0, the more precise
+  -- the estimation.
+  CAST(a.frac * 100 AS INTEGER) AS frac,
+
+  -- Finally, the estimated number of users.
+  CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users
+
+  -- Implement the estimation method in a subquery, so that the ugly
+  -- formula only has to be written once.
+  FROM (
+    SELECT date, node, country, transport, version, rrx, nrx,
+        (hrh * nh + hh * nrh) / (hh * nn) AS frac
+    FROM aggregated WHERE hh * nn > 0.0) a
+
+  -- Only include estimates with at least 10% of nodes reporting directory
+  -- request statistics.
+  WHERE a.frac BETWEEN 0.1 AND 1.0
+
+  -- Order results.
+  ORDER BY date DESC, node, version, transport, country;
+
diff --git a/task-8462/run-userstats.sh b/task-8462/run-userstats.sh
new file mode 100644
index 0000000..9a759ee
--- /dev/null
+++ b/task-8462/run-userstats.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+set -e
+echo `date` "Starting."
+echo `date` "Downloading descriptors."
+rsync -arz --delete --exclude 'relay-descriptors/votes' metrics.torproject.org::metrics-recent in
+echo `date` "Parsing descriptors."
+javac -d bin/ -cp lib/commons-codec-1.6.jar:lib/commons-compress-1.4.1.jar:lib/descriptor.jar src/Parse.java
+java -cp bin/:lib/commons-codec-1.6.jar:lib/commons-compress-1.4.1.jar:lib/descriptor.jar Parse
+for i in $(ls out/*.sql)
+do
+  echo `date` "Importing $i."
+  psql -f $i userstats
+done
+echo `date` "Exporting results."
+psql -c 'COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;' userstats > userstats.csv
+echo `date` "Terminating."
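
For reference, once run-userstats.sh has completed, the same estimates that
end up in userstats.csv can also be queried directly from the estimated
view. A minimal sketch, with the node type and country code ('bridge',
'de') chosen arbitrarily as an example:

  SELECT date, frac, users
  FROM estimated
  WHERE node = 'bridge'
    AND country = 'de'
    AND transport = ''
    AND version = ''
  ORDER BY date DESC;
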
+ diff --git a/task-8462/src/Parse.java b/task-8462/src/Parse.java new file mode 100644 index 0000000..fdf9bf2 --- /dev/null +++ b/task-8462/src/Parse.java @@ -0,0 +1,449 @@ +/* Copyright 2013 The Tor Project + * See LICENSE for licensing information */ + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.Stack; +import java.util.TimeZone; +import java.util.TreeMap; + +import org.torproject.descriptor.BandwidthHistory; +import org.torproject.descriptor.BridgeNetworkStatus; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.ExtraInfoDescriptor; +import org.torproject.descriptor.NetworkStatusEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; + +public class Parse { + + public static void main(String[] args) throws Exception { + detectBulkOrRegular(); + parseRelayDescriptors(); + parseBridgeDescriptors(); + closeOutputFiles(); + } + + private static boolean isBulkImport = false; + private static void detectBulkOrRegular() { + Stack<File> inFiles = new Stack<File>(); + inFiles.add(new File("in")); + while (!inFiles.isEmpty()) { + File file = inFiles.pop(); + if (file.isDirectory()) { + inFiles.addAll(Arrays.asList(file.listFiles())); + } else if (file.getName().endsWith(".tar") || + file.getName().endsWith(".tar.bz2")) { + isBulkImport = true; + break; + } else { + isBulkImport = false; + break; + } + } + } + + private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L, + ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS, + ONE_WEEK_MILLIS = 7L * ONE_DAY_MILLIS; + + private static void parseRelayDescriptors() throws Exception { + DescriptorReader descriptorReader = + DescriptorSourceFactory.createDescriptorReader(); + descriptorReader.setExcludeFiles(new File( + "status/relay-descriptors")); + descriptorReader.addDirectory(new File("in/relay-descriptors/")); + Iterator<DescriptorFile> descriptorFiles = + descriptorReader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof ExtraInfoDescriptor) { + parseRelayExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); + } else if (descriptor instanceof RelayNetworkStatusConsensus) { + parseRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } + } + } + } + + private static void parseRelayExtraInfoDescriptor( + ExtraInfoDescriptor descriptor) throws IOException { + long publishedMillis = descriptor.getPublishedMillis(); + String fingerprint = descriptor.getFingerprint(). 
+ toUpperCase(); + long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis(); + long dirreqStatsIntervalLengthMillis = + descriptor.getDirreqStatsIntervalLength() * 1000L; + SortedMap<String, Integer> requests = descriptor.getDirreqV3Reqs(); + BandwidthHistory dirreqWriteHistory = + descriptor.getDirreqWriteHistory(); + parseRelayDirreqV3Reqs(fingerprint, publishedMillis, + dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, requests); + parseRelayDirreqWriteHistory(fingerprint, publishedMillis, + dirreqWriteHistory); + } + + private static void parseRelayDirreqV3Reqs(String fingerprint, + long publishedMillis, long dirreqStatsEndMillis, + long dirreqStatsIntervalLengthMillis, + SortedMap<String, Integer> requests) throws IOException { + if (requests == null || + publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS || + dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) { + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. */ + return; + } + long statsStartMillis = dirreqStatsEndMillis + - dirreqStatsIntervalLengthMillis; + long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS) + * ONE_DAY_MILLIS; + for (int i = 0; i < 2; i++) { + long fromMillis = i == 0 ? statsStartMillis + : utcBreakMillis; + long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis; + if (fromMillis >= toMillis) { + continue; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) dirreqStatsIntervalLengthMillis); + double sum = 0L; + for (Map.Entry<String, Integer> e : requests.entrySet()) { + String country = e.getKey(); + double reqs = ((double) e.getValue()) - 4.0; + sum += reqs; + writeOutputLine(fingerprint, "relay", "responses", country, + "", "", fromMillis, toMillis, reqs * intervalFraction); + } + writeOutputLine(fingerprint, "relay", "responses", "", "", + "", fromMillis, toMillis, sum * intervalFraction); + } + } + + private static void parseRelayDirreqWriteHistory(String fingerprint, + long publishedMillis, BandwidthHistory dirreqWriteHistory) + throws IOException { + if (dirreqWriteHistory == null || + publishedMillis - dirreqWriteHistory.getHistoryEndMillis() + > ONE_WEEK_MILLIS) { + return; + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. 
*/ + } + long intervalLengthMillis = + dirreqWriteHistory.getIntervalLength() * 1000L; + for (Map.Entry<Long, Long> e : + dirreqWriteHistory.getBandwidthValues().entrySet()) { + long intervalEndMillis = e.getKey(); + long intervalStartMillis = + intervalEndMillis - intervalLengthMillis; + for (int i = 0; i < 2; i++) { + long fromMillis = intervalStartMillis; + long toMillis = intervalEndMillis; + double writtenBytes = (double) e.getValue(); + if (intervalStartMillis / ONE_DAY_MILLIS < + intervalEndMillis / ONE_DAY_MILLIS) { + long utcBreakMillis = (intervalEndMillis + / ONE_DAY_MILLIS) * ONE_DAY_MILLIS; + if (i == 0) { + toMillis = utcBreakMillis; + } else if (i == 1) { + fromMillis = utcBreakMillis; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) intervalLengthMillis); + writtenBytes *= intervalFraction; + } else if (i == 1) { + break; + } + writeOutputLine(fingerprint, "relay", "bytes", "", "", "", + fromMillis, toMillis, writtenBytes); + } + } + } + + private static void parseRelayNetworkStatusConsensus( + RelayNetworkStatusConsensus consensus) throws IOException { + long fromMillis = consensus.getValidAfterMillis(); + long toMillis = consensus.getFreshUntilMillis(); + for (NetworkStatusEntry statusEntry : + consensus.getStatusEntries().values()) { + String fingerprint = statusEntry.getFingerprint(). + toUpperCase(); + if (statusEntry.getFlags().contains("Running")) { + writeOutputLine(fingerprint, "relay", "status", "", "", "", + fromMillis, toMillis, 0.0); + } + } + } + + private static void parseBridgeDescriptors() throws Exception { + DescriptorReader descriptorReader = + DescriptorSourceFactory.createDescriptorReader(); + descriptorReader.setExcludeFiles(new File( + "status/bridge-descriptors")); + descriptorReader.addDirectory(new File( + "in/bridge-descriptors/")); + Iterator<DescriptorFile> descriptorFiles = + descriptorReader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof ExtraInfoDescriptor) { + parseBridgeExtraInfoDescriptor( + (ExtraInfoDescriptor) descriptor); + } else if (descriptor instanceof BridgeNetworkStatus) { + parseBridgeNetworkStatus((BridgeNetworkStatus) descriptor); + } + } + } + } + + private static void parseBridgeExtraInfoDescriptor( + ExtraInfoDescriptor descriptor) throws IOException { + String fingerprint = descriptor.getFingerprint().toUpperCase(); + long publishedMillis = descriptor.getPublishedMillis(); + long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis(); + long dirreqStatsIntervalLengthMillis = + descriptor.getDirreqStatsIntervalLength() * 1000L; + parseBridgeDirreqV3Resp(fingerprint, publishedMillis, + dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, + descriptor.getDirreqV3Resp(), + descriptor.getBridgeIps(), + descriptor.getBridgeIpTransports(), + descriptor.getBridgeIpVersions()); + + parseBridgeDirreqWriteHistory(fingerprint, publishedMillis, + descriptor.getDirreqWriteHistory()); + } + + private static void parseBridgeDirreqV3Resp(String fingerprint, + long publishedMillis, long dirreqStatsEndMillis, + long dirreqStatsIntervalLengthMillis, + SortedMap<String, Integer> responses, + SortedMap<String, Integer> bridgeIps, + SortedMap<String, Integer> bridgeIpTransports, + SortedMap<String, Integer> bridgeIpVersions) throws IOException { + if (responses == null || + publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS || + 
dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) { + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. */ + return; + } + long statsStartMillis = dirreqStatsEndMillis + - dirreqStatsIntervalLengthMillis; + long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS) + * ONE_DAY_MILLIS; + double resp = ((double) responses.get("ok")) - 4.0; + if (resp > 0.0) { + for (int i = 0; i < 2; i++) { + long fromMillis = i == 0 ? statsStartMillis + : utcBreakMillis; + long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis; + if (fromMillis >= toMillis) { + continue; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) dirreqStatsIntervalLengthMillis); + writeOutputLine(fingerprint, "bridge", "responses", "", "", + "", fromMillis, toMillis, resp * intervalFraction); + parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp, + dirreqStatsIntervalLengthMillis, "country", bridgeIps); + parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp, + dirreqStatsIntervalLengthMillis, "transport", + bridgeIpTransports); + parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp, + dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions); + } + } + } + + private static void parseBridgeRespByCategory(String fingerprint, + long fromMillis, long toMillis, double resp, + long dirreqStatsIntervalLengthMillis, String category, + SortedMap<String, Integer> frequencies) throws IOException { + double total = 0.0; + SortedMap<String, Double> frequenciesCopy = + new TreeMap<String, Double>(); + if (frequencies != null) { + for (Map.Entry<String, Integer> e : frequencies.entrySet()) { + if (e.getValue() < 4.0) { + continue; + } + double r = ((double) e.getValue()) - 4.0; + frequenciesCopy.put(e.getKey(), r); + total += r; + } + } + /* If we're not told any frequencies, or at least none of them are + * greater than 4, put in a default that we'll attribute all responses + * to. */ + if (total == 0) { + if (category.equals("country")) { + frequenciesCopy.put("??", 4.0); + } else if (category.equals("transport")) { + frequenciesCopy.put("<OR>", 4.0); + } else if (category.equals("version")) { + frequenciesCopy.put("v4", 4.0); + } + total = 4.0; + } + for (Map.Entry<String, Double> e : frequenciesCopy.entrySet()) { + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) dirreqStatsIntervalLengthMillis); + double val = resp * intervalFraction * e.getValue() / total; + if (category.equals("country")) { + writeOutputLine(fingerprint, "bridge", "responses", e.getKey(), + "", "", fromMillis, toMillis, val); + } else if (category.equals("transport")) { + writeOutputLine(fingerprint, "bridge", "responses", "", + e.getKey(), "", fromMillis, toMillis, val); + } else if (category.equals("version")) { + writeOutputLine(fingerprint, "bridge", "responses", "", "", + e.getKey(), fromMillis, toMillis, val); + } + } + } + + private static void parseBridgeDirreqWriteHistory(String fingerprint, + long publishedMillis, BandwidthHistory dirreqWriteHistory) + throws IOException { + if (dirreqWriteHistory == null || + publishedMillis - dirreqWriteHistory.getHistoryEndMillis() + > ONE_WEEK_MILLIS) { + /* Cut off all observations that are one week older than + * the descriptor publication time, or we'll have to update + * weeks of aggregate values every hour. 
*/ + return; + } + long intervalLengthMillis = + dirreqWriteHistory.getIntervalLength() * 1000L; + for (Map.Entry<Long, Long> e : + dirreqWriteHistory.getBandwidthValues().entrySet()) { + long intervalEndMillis = e.getKey(); + long intervalStartMillis = + intervalEndMillis - intervalLengthMillis; + for (int i = 0; i < 2; i++) { + long fromMillis = intervalStartMillis; + long toMillis = intervalEndMillis; + double writtenBytes = (double) e.getValue(); + if (intervalStartMillis / ONE_DAY_MILLIS < + intervalEndMillis / ONE_DAY_MILLIS) { + long utcBreakMillis = (intervalEndMillis + / ONE_DAY_MILLIS) * ONE_DAY_MILLIS; + if (i == 0) { + toMillis = utcBreakMillis; + } else if (i == 1) { + fromMillis = utcBreakMillis; + } + double intervalFraction = ((double) (toMillis - fromMillis)) + / ((double) intervalLengthMillis); + writtenBytes *= intervalFraction; + } else if (i == 1) { + break; + } + writeOutputLine(fingerprint, "bridge", "bytes", "", + "", "", fromMillis, toMillis, writtenBytes); + } + } + } + + private static void parseBridgeNetworkStatus(BridgeNetworkStatus status) + throws IOException { + if (status.getPublishedMillis() % ONE_HOUR_MILLIS + > ONE_HOUR_MILLIS / 2) { + return; + } + long fromMillis = (status.getPublishedMillis() + / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS; + long toMillis = fromMillis + ONE_HOUR_MILLIS; + for (NetworkStatusEntry statusEntry : + status.getStatusEntries().values()) { + String fingerprint = statusEntry.getFingerprint(). + toUpperCase(); + if (statusEntry.getFlags().contains("Running")) { + writeOutputLine(fingerprint, "bridge", "status", "", "", "", + fromMillis, toMillis, 0.0); + } + } + } + + private static Map<String, BufferedWriter> openOutputFiles = + new HashMap<String, BufferedWriter>(); + private static void writeOutputLine(String fingerprint, String node, + String metric, String country, String transport, String version, + long fromMillis, long toMillis, double val) throws IOException { + if (fromMillis > toMillis) { + return; + } + String fromDateTime = formatDateTimeMillis(fromMillis); + String toDateTime = formatDateTimeMillis(toMillis); + BufferedWriter bw = getOutputFile(fromDateTime); + bw.write(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n", + fingerprint, node, metric, country, transport, version, + fromDateTime, toDateTime, val)); + } + + private static SimpleDateFormat dateTimeFormat = null; + private static String formatDateTimeMillis(long millis) { + if (dateTimeFormat == null) { + dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeFormat.setLenient(false); + dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + return dateTimeFormat.format(millis); + } + + private static BufferedWriter getOutputFile(String fromDateTime) + throws IOException { + String outputFileName = isBulkImport + ? 
"out/userstats-" + fromDateTime.substring(0, 10) + ".sql" + : "out/userstats.sql"; + BufferedWriter bw = openOutputFiles.get(outputFileName); + if (bw == null) { + bw = openOutputFile(outputFileName); + openOutputFiles.put(outputFileName, bw); + } + return bw; + } + + private static BufferedWriter openOutputFile(String outputFileName) + throws IOException { + BufferedWriter bw = new BufferedWriter(new FileWriter( + outputFileName)); + bw.write("BEGIN;\n"); + bw.write("LOCK TABLE imported NOWAIT;\n"); + bw.write("COPY imported (fingerprint, node, metric, country, " + + "transport, version, stats_start, stats_end, val) FROM " + + "stdin;\n"); + return bw; + } + + private static void closeOutputFiles() throws IOException { + for (BufferedWriter bw : openOutputFiles.values()) { + bw.write("\\.\n"); + bw.write("SELECT merge();\n"); + bw.write("SELECT aggregate();\n"); + bw.write("TRUNCATE imported;\n"); + bw.write("COMMIT;\n"); + bw.close(); + } + } +} +