[tor-commits] [metrics-tasks/master] Add new user counting code (#8462).

karsten at torproject.org karsten at torproject.org
Wed Apr 24 07:00:29 UTC 2013


commit 138b6c492bb4eddb7793a0740c2199bed7ac55fc
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Wed Apr 24 08:54:35 2013 +0200

    Add new user counting code (#8462).
---
 task-8462/.gitignore         |    7 +
 task-8462/README             |   50 ++++
 task-8462/init-userstats.sql |  573 ++++++++++++++++++++++++++++++++++++++++++
 task-8462/run-userstats.sh   |   17 ++
 task-8462/src/Parse.java     |  449 +++++++++++++++++++++++++++++++++
 5 files changed, 1096 insertions(+), 0 deletions(-)

diff --git a/task-8462/.gitignore b/task-8462/.gitignore
new file mode 100644
index 0000000..4c1e6ed
--- /dev/null
+++ b/task-8462/.gitignore
@@ -0,0 +1,7 @@
+in/
+bin/
+lib/
+out/
+status/
+*.csv
+
diff --git a/task-8462/README b/task-8462/README
new file mode 100644
index 0000000..9547efa
--- /dev/null
+++ b/task-8462/README
@@ -0,0 +1,50 @@
++------------------------------------------------------------------------+
+|     An implementation of the user counting algorithm suggested in      |
+|   Tor Tech Report 2012-10-001 for later integration with metrics-web   |
++------------------------------------------------------------------------+
+
+Instructions (for Debian Squeeze):
+
+Install Java 6 for descriptor parsing and PostgreSQL 8.4 for descriptor
+data storage and aggregation:
+
+  $ sudo apt-get install openjdk-6-jdk postgresql-8.4
+
+Create a database user and database:
+
+  $ sudo -u postgres createuser -P karsten
+  $ sudo -u postgres createdb -O karsten userstats
+  $ echo "*:*:userstats:karsten:password" > ~/.pgpass
+  $ chmod 0600 ~/.pgpass
+  $ psql -f init-userstats.sql userstats
+
+Create empty bin/, lib/, in/, status/, and out/ directories.
+
+Put required .jar files into the lib/ directory.  See metrics-lib.git for
+instructions:
+
+  - lib/commons-codec-1.6.jar
+  - lib/commons-compress-1.4.1.jar
+  - lib/descriptor.jar
+
+Run the run-userstats.sh script:
+
+  $ ./run-userstats.sh
+
+Be patient.
+
+Advanced stuff: the database can also be initialized using descriptor
+archives available at https://metrics.torproject.org/data.html.  Only
+relay consensuses, relay extra-info descriptors, and bridge descriptors
+are required.  Put them into the following directories, ideally after
+decompressing them (but not extracting them) using bunzip2:
+
+  - in/relay-descriptors/    (consensuses-*.tar and extra-infos-*.tar)
+  - in/bridge-descriptors/   (bridge-descriptors-*.tar)
+
+Also comment out the rsync command in run-userstats.sh.  Then run
+run-userstats.sh.  After initializing the database, clean up the in/ and
+out/ directories and don't forget to put back the rsync command in
+run-userstats.sh.  It may be easier to set up separate instances of this
+tool for initializing the database and for running it on a regular basis.
+
diff --git a/task-8462/init-userstats.sql b/task-8462/init-userstats.sql
new file mode 100644
index 0000000..c586285
--- /dev/null
+++ b/task-8462/init-userstats.sql
@@ -0,0 +1,573 @@
+-- Copyright 2013 The Tor Project
+-- See LICENSE for licensing information
+
+-- Use enum types for dimensions that may only change if we write new code
+-- to support them.  For example, if there's a new node type beyond relay
+-- and bridge, we'll have to write code to support it.  This is in
+-- contrast to dimensions like country, transport, or version which don't
+-- have their possible values hard-coded anywhere.
+CREATE TYPE node AS ENUM ('relay', 'bridge');  -- type of reporting node
+CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');  -- observation type
+
+-- All new data first goes into the imported table.  The import tool
+-- should do some trivial checks for invalid or duplicate data, but
+-- ultimately, we're going to do these checks in the database.  For
+-- example, the import tool could avoid importing data from the same
+-- descriptor more than once, but it's fine to import the same history
+-- string from distinct descriptors multiple times.  The import tool must,
+-- however, make sure that stats_end is not greater than 00:00:00 of the
+-- day following stats_start.  There are no constraints set on this table,
+-- because importing data should be really, really fast.  Once the newly
+-- imported data is successfully processed, the imported table is emptied.
+CREATE TABLE imported (
+
+  -- The 40-character upper-case hex string identifies a descriptor
+  -- uniquely and is used to join metrics (responses, bytes, status)
+  -- published by the same node (relay or bridge).
+  fingerprint CHARACTER(40) NOT NULL,
+
+  -- The node type is used to decide the statistics that this entry will
+  -- be part of.
+  node node NOT NULL,
+
+  -- The metric of this entry describes the stored observation type.
+  -- We'll want to store different metrics published by a node:
+  -- - 'responses' are the number of v3 network status consensus requests
+  --   that the node responded to;
+  -- - 'bytes' are the number of bytes that the node wrote when answering
+  --   directory requests;
+  -- - 'status' are the intervals when the node was listed as running in
+  --   the network status published by either the directory authorities or
+  --   bridge authority.
+  metric metric NOT NULL,
+
+  -- The two-letter lower-case country code that the observation in this
+  -- entry can be attributed to; can be '??' if no country information is
+  -- known for this entry, or '' (empty string) if this entry summarizes
+  -- observations for all countries.
+  country CHARACTER VARYING(2) NOT NULL,
+
+  -- The pluggable transport name that the observation in this entry can
+  -- be attributed to; can be '<OR>' if no pluggable transport was used,
+  -- '<??>' if an unknown pluggable transport was used, or '' (empty
+  -- string) if this entry summarizes observations for all transports.
+  transport CHARACTER VARYING(20) NOT NULL,
+
+  -- The IP address version that the observation in this entry can be
+  -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry
+  -- summarizes observations for all IP address versions.
+  version CHARACTER VARYING(2) NOT NULL,
+
+  -- The interval start of this observation.
+  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+  -- The interval end of this observation.  This timestamp must be greater
+  -- than stats_start and must not be greater than 00:00:00 of the day
+  -- following stats_start; the import tool is responsible for ensuring this.
+  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+  -- Finally, the observed value.
+  val DOUBLE PRECISION NOT NULL
+);
+
+-- After importing new data into the imported table, they are merged into
+-- the merged table using the merge() function.  The merged table contains
+-- the same data as the imported table, except:
+-- (1) there are no duplicate or overlapping entries in the merged table
+--     with respect to stats_start and stats_end and the same fingerprint,
+--     node, metric, country, transport, and version columns;
+-- (2) all subsequent intervals with the same node, metric, country,
+--     transport, version, and stats_start date are compressed into a
+--     single entry.
+CREATE TABLE merged (
+
+  -- Surrogate key; merge() uses it to update or delete an existing entry
+  -- when extending it with newly imported data.
+  id SERIAL PRIMARY KEY,
+
+  -- All other columns have the same meaning as in the imported table.
+  fingerprint CHARACTER(40) NOT NULL,
+  node node NOT NULL,
+  metric metric NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL,
+  transport CHARACTER VARYING(20) NOT NULL,
+  version CHARACTER VARYING(2) NOT NULL,
+  stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+  val DOUBLE PRECISION NOT NULL
+);
+
+-- After merging new data into the merged table, they are aggregated to
+-- daily user number estimates using the aggregate() function.  Only dates
+-- with new data in the imported table will be recomputed in the
+-- aggregated table.  The aggregated components follow the algorithm
+-- proposed in Tor Tech Report 2012-10-001.
+CREATE TABLE aggregated (
+
+  -- The date of these aggregated observations.
+  date DATE NOT NULL,
+
+  -- The node, country, transport, and version columns all have the same
+  -- meaning as in the imported table.
+  node node NOT NULL,
+  country CHARACTER VARYING(2) NOT NULL DEFAULT '',
+  transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
+  version CHARACTER VARYING(2) NOT NULL DEFAULT '',
+
+  -- Total number of reported responses, possibly broken down by country,
+  -- transport, or version if any of them is not ''.  See r(R) in the
+  -- tech report.
+  rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Total number of seconds of nodes reporting responses, possibly broken
+  -- down by country, transport, or version if any of them is not ''.
+  -- This would be referred to as n(R) in the tech report, though it's not
+  -- used there.
+  nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Total number of reported bytes.  See h(H) in the tech report.
+  hh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Total number of seconds of nodes in the status.  See n(N) in the tech
+  -- report.
+  nn DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Number of reported bytes of nodes that reported both responses and
+  -- bytes.  See h(R intersect H) in the tech report.
+  hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Number of seconds of nodes reporting bytes.  See n(H) in the tech
+  -- report.
+  nh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+  -- Number of seconds of nodes reporting responses but no bytes.  See
+  -- n(R \ H) in the tech report.
+  nrh DOUBLE PRECISION NOT NULL DEFAULT 0
+);
+
+CREATE LANGUAGE plpgsql;  -- NOTE(review): needed on 8.4; 9.0+ reportedly pre-installs it -- confirm
+
+-- Merge new entries from the imported table into the merged table, and
+-- compress them while doing so.  This function first executes a query to
+-- match all entries in the imported table with adjacent or even
+-- overlapping entries in the merged table.  It then loops over query
+-- results and either inserts or updates entries in the merged table.  The
+-- idea is to leave query optimization to the database and only touch
+-- as few entries as possible while running this function.
+CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$
+DECLARE
+
+  -- The current record that we're handling in the loop body.
+  cur RECORD;
+
+  -- Various information about the last record we processed, so that we
+  -- can merge the current record with the last one if possible.
+  last_fingerprint CHARACTER(40) := NULL;
+  last_node node;
+  last_metric metric;
+  last_country CHARACTER VARYING(2);
+  last_transport CHARACTER VARYING(20);
+  last_version CHARACTER VARYING(2);
+  last_start TIMESTAMP WITHOUT TIME ZONE;
+  last_end TIMESTAMP WITHOUT TIME ZONE;
+  last_id INTEGER;
+  last_val DOUBLE PRECISION;
+
+  -- Interval end and value of the last record before updating them in the
+  -- last loop step.  In a few edge cases, we may update an entry and
+  -- learn in the next loop step that the updated entry overlaps with the
+  -- subsequent entry.  In these cases we'll have to undo the update,
+  -- which is why we're storing the pre-update values.
+  undo_end TIMESTAMP WITHOUT TIME ZONE;
+  undo_val DOUBLE PRECISION;
+
+BEGIN
+  RAISE NOTICE '% Starting to merge.', timeofday();
+
+  -- TODO Maybe we'll have to materialize a merged_part table that only
+  -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported)
+  -- and use that in the query below.
+
+  -- Loop over results from a query that joins new entries in the imported
+  -- table with existing entries in the merged table.
+  FOR cur IN SELECT DISTINCT
+
+    -- Select id, interval start and end, and value of the existing entry
+    -- in merged; all these fields may be null if the imported entry is
+    -- not adjacent to an existing one.
+    merged.id AS merged_id,
+    merged.stats_start AS merged_start,
+    merged.stats_end AS merged_end,
+    merged.val AS merged_val,
+
+    -- Select interval start and end and value of the newly imported
+    -- entry.
+    imported.stats_start AS imported_start,
+    imported.stats_end AS imported_end,
+    imported.val AS imported_val,
+
+    -- Select columns that define the group of entries that can be merged
+    -- in the merged table.
+    imported.fingerprint AS fingerprint,
+    imported.node AS node,
+    imported.metric AS metric,
+    imported.country AS country,
+    imported.transport AS transport,
+    imported.version AS version
+
+    -- Select these columns from all entries in the imported table, plus
+    -- do an outer join on the merged table to find adjacent entries that
+    -- we might want to merge the new entries with.  It's possible that we
+    -- handle the same imported entry twice, if it starts directly after
+    -- one existing entry and ends directly before another existing entry.
+    FROM imported LEFT JOIN merged
+
+    -- First two join conditions are to find adjacent intervals.  In fact,
+    -- we also include overlapping intervals here, so that we can skip the
+    -- overlapping entry in the imported table.
+    ON imported.stats_end >= merged.stats_start AND
+       imported.stats_start <= merged.stats_end AND
+
+       -- Further join conditions are same date, fingerprint, node, etc.,
+       -- so that we don't merge entries that don't belong together.
+       DATE(imported.stats_start) = DATE(merged.stats_start) AND
+       imported.fingerprint = merged.fingerprint AND
+       imported.node = merged.node AND
+       imported.metric = merged.metric AND
+       imported.country = merged.country AND
+       imported.transport = merged.transport AND
+       imported.version = merged.version
+
+    -- Ordering is key, or our approach to merge subsequent entries is
+    -- going to break.
+    ORDER BY imported.fingerprint, imported.node, imported.metric,
+             imported.country, imported.transport, imported.version,
+             imported.stats_start, merged.stats_start, imported.stats_end
+
+  -- Now go through the results one by one.
+  LOOP
+
+    -- Log that we're done with the query and about to start merging.
+    IF last_fingerprint IS NULL THEN
+      RAISE NOTICE '% Query returned, now merging entries.', timeofday();
+    END IF;
+
+    -- If we're processing the very first entry or if we have reached a
+    -- new group of entries that belong together, (re-)set last_*
+    -- variables.
+    IF last_fingerprint IS NULL OR
+        DATE(cur.imported_start) <> DATE(last_start) OR
+        cur.fingerprint <> last_fingerprint OR
+        cur.node <> last_node OR
+        cur.metric <> last_metric OR
+        cur.country <> last_country OR
+        cur.transport <> last_transport OR
+        cur.version <> last_version THEN
+      last_id := -1;  -- placeholder: no previous entry in this group yet
+      last_start := '1970-01-01 00:00:00';
+      last_end := '1970-01-01 00:00:00';
+      last_val := -1;
+    END IF;
+
+    -- Remember all fields that determine the group of which entries
+    -- belong together.
+    last_fingerprint := cur.fingerprint;
+    last_node := cur.node;
+    last_metric := cur.metric;
+    last_country := cur.country;
+    last_transport := cur.transport;
+    last_version := cur.version;
+
+    -- If the existing entry that we're currently looking at starts before
+    -- the previous entry ends, we have created two overlapping entries in
+    -- the last iteration, and that is not allowed.  Undo the previous
+    -- change.
+    IF cur.merged_start IS NOT NULL AND
+        cur.merged_start < last_end AND
+        undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
+      UPDATE merged SET stats_end = undo_end, val = undo_val
+        WHERE id = last_id;
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If there is no adjacent entry to the one we're about to merge,
+    -- insert it as new entry.
+    ELSIF cur.merged_end IS NULL THEN
+      IF cur.imported_start > last_end THEN
+        last_start := cur.imported_start;
+        last_end := cur.imported_end;
+        last_val := cur.imported_val;
+        INSERT INTO merged (fingerprint, node, metric, country, transport,
+                            version, stats_start, stats_end, val)
+          VALUES (last_fingerprint, last_node, last_metric, last_country,
+                  last_transport, last_version, last_start, last_end,
+                  last_val)
+          RETURNING id INTO last_id;
+
+      -- If there was no adjacent entry before starting to merge, but
+      -- there is now one ending right before the new entry starts, merge
+      -- the new entry into the existing one.
+      ELSIF cur.imported_start = last_end THEN
+        last_val := last_val + cur.imported_val;
+        last_end := cur.imported_end;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+      -- There's no risk of this entry overlapping with the next.
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If the new entry ends right when an existing entry starts, but
+    -- there's a gap between when the previously processed entry ends and
+    -- when the new entry starts, merge the new entry with the existing
+    -- entry we're currently looking at.
+    ELSIF cur.imported_end = cur.merged_start THEN
+      IF cur.imported_start > last_end THEN
+        last_id := cur.merged_id;
+        last_start := cur.imported_start;
+        last_end := cur.merged_end;
+        last_val := cur.imported_val + cur.merged_val;
+        UPDATE merged SET stats_start = last_start, val = last_val
+          WHERE id = last_id;
+
+      -- If the new entry ends right when an existing entry starts and
+      -- there's no gap between when the previously processed entry ends
+      -- and when the new entry starts, merge the new entry with the other
+      -- two entries.  This happens by deleting the previous entry and
+      -- expanding the subsequent entry to cover all three entries.
+      ELSIF cur.imported_start = last_end THEN
+        DELETE FROM merged WHERE id = last_id;
+        last_id := cur.merged_id;
+        last_end := cur.merged_end;
+        last_val := last_val + cur.merged_val;
+        UPDATE merged SET stats_start = last_start, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+      -- There's no risk of this entry overlapping with the next.
+      undo_end := NULL;
+      undo_val := NULL;
+
+    -- If the new entry starts right when an existing entry ends, but
+    -- there's a gap between the previously processed entry and the
+    -- existing one, extend the existing entry.  There's a special case
+    -- when this operation is wrong and must be undone, which is when the
+    -- newly added entry overlaps with the subsequent entry.  That's why
+    -- we have to store the old interval end and value, so that this
+    -- operation can be undone in the next loop iteration.
+    ELSIF cur.imported_start = cur.merged_end THEN
+      IF last_end < cur.imported_start THEN
+        undo_end := cur.merged_end;
+        undo_val := cur.merged_val;
+        last_id := cur.merged_id;
+        last_start := cur.merged_start;
+        last_end := cur.imported_end;
+        last_val := cur.merged_val + cur.imported_val;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+
+      -- If the new entry starts right when an existing entry ends and
+      -- there's no gap between the previously processed entry and the
+      -- existing entry, extend the existing entry.  This is very similar
+      -- to the previous case.  The same reasoning about possibly having
+      -- to undo this operation applies.
+      ELSE
+        undo_end := cur.merged_end;
+        undo_val := last_val;
+        last_end := cur.imported_end;
+        last_val := last_val + cur.imported_val;
+        UPDATE merged SET stats_end = last_end, val = last_val
+          WHERE id = last_id;
+      END IF;
+
+    -- If none of the cases above applies, there must have been an overlap
+    -- between the new entry and an existing one.  Skip the new entry.
+    ELSE
+      last_id := cur.merged_id;
+      last_start := cur.merged_start;
+      last_end := cur.merged_end;
+      last_val := cur.merged_val;
+    END IF;
+  END LOOP;
+
+  -- That's it, we're done merging.
+  RAISE NOTICE '% Finishing merge.', timeofday();
+  RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Aggregate user estimates for all dates that have updated entries in the
+-- merged table.  This function first creates a temporary table with
+-- new or updated observations, then removes all existing estimates for
+-- the dates to be updated, and finally inserts newly computed aggregates
+-- for these dates.
+CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
+BEGIN
+  RAISE NOTICE '% Starting aggregate step.', timeofday();
+
+  -- Create a new temporary table containing all relevant information
+  -- needed to update the aggregated table.  In this table, we sum up all
+  -- observations of a given type by reporting node.  This query is
+  -- (temporarily) materialized, because we need to combine its entries
+  -- multiple times in various ways.  A (non-materialized) view would have
+  -- meant to re-compute this query multiple times.
+  CREATE TEMPORARY TABLE update AS
+    SELECT fingerprint, node, metric, country, transport, version,
+           DATE(stats_start), SUM(val) AS val,
+           SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
+               AS DOUBLE PRECISION)) AS seconds
+    FROM merged
+    WHERE DATE(stats_start) IN (
+          SELECT DISTINCT DATE(stats_start) FROM imported)
+    GROUP BY fingerprint, node, metric, country, transport, version,
+             DATE(stats_start);
+
+  -- Delete all entries from the aggregated table that we're about to
+  -- re-compute.
+  DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
+
+  -- Insert partly empty results for all existing combinations of date,
+  -- node ('relay' or 'bridge'), country, transport, and version.  Only
+  -- the rrx and nrx fields will contain number and seconds of reported
+  -- responses for the given combination of date, node, etc., while the
+  -- other fields will be updated below.
+  INSERT INTO aggregated (date, node, country, transport, version, rrx,
+      nrx)
+    SELECT date, node, country, transport, version, SUM(val) AS rrx,
+    SUM(seconds) AS nrx
+    FROM update WHERE metric = 'responses'
+    GROUP BY date, node, country, transport, version;
+
+  -- Create another temporary table with only those entries that aren't
+  -- broken down by any dimension.  This table is much smaller, so the
+  -- following operations are much faster.
+  CREATE TEMPORARY TABLE update_no_dimensions AS
+    SELECT fingerprint, node, metric, date, val, seconds FROM update
+    WHERE country = ''
+    AND transport = ''
+    AND version = '';
+
+  -- Update results in the aggregated table by setting aggregates based
+  -- on reported directory bytes.  These aggregates are only based on
+  -- date and node, so that the same values are set for all combinations
+  -- of country, transport, and version.
+  UPDATE aggregated
+    SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
+    FROM (
+      SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
+      FROM update_no_dimensions
+      WHERE metric = 'bytes'
+      GROUP BY date, node
+    ) aggregated_bytes
+    WHERE aggregated.date = aggregated_bytes.date
+    AND aggregated.node = aggregated_bytes.node;
+
+  -- Update results based on nodes being contained in the network status.
+  UPDATE aggregated
+    SET nn = aggregated_status.nn
+    FROM (
+      SELECT date, node, SUM(seconds) AS nn
+      FROM update_no_dimensions
+      WHERE metric = 'status'
+      GROUP BY date, node
+    ) aggregated_status
+    WHERE aggregated.date = aggregated_status.date
+    AND aggregated.node = aggregated_status.node;
+
+  -- Update results based on nodes reporting both bytes and responses.
+  UPDATE aggregated
+    SET hrh = aggregated_bytes_responses.hrh
+    FROM (
+      SELECT bytes.date, bytes.node,
+             SUM((LEAST(bytes.seconds, responses.seconds)
+                 * bytes.val) / bytes.seconds) AS hrh
+      FROM update_no_dimensions bytes
+      INNER JOIN update_no_dimensions responses  -- both metrics required
+      ON bytes.date = responses.date
+      AND bytes.fingerprint = responses.fingerprint
+      AND bytes.node = responses.node
+      WHERE bytes.metric = 'bytes'
+      AND responses.metric = 'responses'
+      GROUP BY bytes.date, bytes.node
+    ) aggregated_bytes_responses
+    WHERE aggregated.date = aggregated_bytes_responses.date
+    AND aggregated.node = aggregated_bytes_responses.node;
+
+  -- Update results based on nodes reporting responses but no bytes.
+  UPDATE aggregated
+    SET nrh = aggregated_responses_bytes.nrh
+    FROM (
+      SELECT responses.date, responses.node,
+             SUM(GREATEST(0, responses.seconds
+                             - COALESCE(bytes.seconds, 0))) AS nrh
+      FROM update_no_dimensions responses
+      LEFT JOIN update_no_dimensions bytes
+      ON responses.date = bytes.date
+      AND responses.fingerprint = bytes.fingerprint
+      AND responses.node = bytes.node
+      AND bytes.metric = 'bytes'  -- in ON, so nodes without bytes are kept
+      WHERE responses.metric = 'responses'
+      GROUP BY responses.date, responses.node
+    ) aggregated_responses_bytes
+    WHERE aggregated.date = aggregated_responses_bytes.date
+    AND aggregated.node = aggregated_responses_bytes.node;
+
+  -- We're done aggregating new data.
+  RAISE NOTICE '% Finishing aggregate step.', timeofday();
+  RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- User-friendly view on the aggregated table that implements the
+-- algorithm proposed in Tor Tech Report 2012-10-001.  This view returns
+-- user number estimates for both relay and bridge statistics, possibly
+-- broken down by country or transport or version.
+CREATE OR REPLACE VIEW estimated AS SELECT
+
+  -- The date of this user number estimate.
+  a.date,
+
+  -- The node type, which is either 'relay' or 'bridge'.
+  a.node,
+
+  -- The two-letter lower-case country code of this estimate; can be '??'
+  -- for an estimate of users that could not be resolved to any country,
+  -- or '' (empty string) for an estimate of all users, regardless of
+  -- country.
+  a.country,
+
+  -- The pluggable transport name of this estimate; can be '<OR>' for an
+  -- estimate of users that did not use any pluggable transport, '<??>'
+  -- for unknown pluggable transports, or '' (empty string) for an
+  -- estimate of all users, regardless of transport.
+  a.transport,
+
+  -- The IP address version of this estimate; can be 'v4' or 'v6', or ''
+  -- (empty string) for an estimate of all users, regardless of IP address
+  -- version.
+  a.version,
+
+  -- Estimated fraction of nodes reporting directory requests, which is
+  -- used to extrapolate observed requests to estimated total requests in
+  -- the network.  The closer this fraction is to 1.0, the more precise
+  -- the estimation.
+  CAST(a.frac * 100 AS INTEGER) AS frac,
+
+  -- Finally, the estimated number of users.
+  CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users  -- NOTE(review): 10 is presumably requests per user per day; confirm
+
+  -- Implement the estimation method in a subquery, so that the ugly
+  -- formula only has to be written once.
+  FROM (
+    SELECT date, node, country, transport, version, rrx, nrx,
+           (hrh * nh + hh * nrh) / (hh * nn) AS frac
+    FROM aggregated WHERE hh * nn > 0.0) a
+
+  -- Only include estimates with at least 10% of nodes reporting directory
+  -- request statistics.
+  WHERE a.frac BETWEEN 0.1 AND 1.0
+
+  -- Order results.
+  ORDER BY date DESC, node, version, transport, country;
+
diff --git a/task-8462/run-userstats.sh b/task-8462/run-userstats.sh
new file mode 100644
index 0000000..9a759ee
--- /dev/null
+++ b/task-8462/run-userstats.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+set -e
+echo `date` "Starting."
+echo `date` "Downloading descriptors."
+rsync -arz --delete --exclude 'relay-descriptors/votes' metrics.torproject.org::metrics-recent in
+echo `date` "Parsing descriptors."
+javac -d bin/ -cp lib/commons-codec-1.6.jar:lib/commons-compress-1.4.1.jar:lib/descriptor.jar src/Parse.java
+java -cp bin/:lib/commons-codec-1.6.jar:lib/commons-compress-1.4.1.jar:lib/descriptor.jar Parse
+for i in $(ls out/*.sql)
+do
+  echo `date` "Importing $i."
+  psql -f $i userstats
+done
+echo `date` "Exporting results."
+psql -c 'COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;' userstats > userstats.csv
+echo `date` "Terminating."
+
diff --git a/task-8462/src/Parse.java b/task-8462/src/Parse.java
new file mode 100644
index 0000000..fdf9bf2
--- /dev/null
+++ b/task-8462/src/Parse.java
@@ -0,0 +1,449 @@
+/* Copyright 2013 The Tor Project
+ * See LICENSE for licensing information */
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.Stack;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.torproject.descriptor.BandwidthHistory;
+import org.torproject.descriptor.BridgeNetworkStatus;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+
+public class Parse {
+
+  /**
+   * Runs one complete parse: detects bulk or regular import mode,
+   * converts relay and bridge descriptors into SQL import lines, and
+   * finally terminates and closes all output files.
+   */
+  public static void main(String[] args) throws Exception {
+    /* Mode detection comes first, because the mode determines the
+     * names of the output files that the parse steps write to. */
+    detectBulkOrRegular();
+    parseRelayDescriptors();
+    parseBridgeDescriptors();
+    closeOutputFiles();
+  }
+
+  /* Whether the files in in/ look like a bulk import of tarballs rather
+   * than a regular import of single descriptor files.  In bulk mode,
+   * output is split into one SQL file per date. */
+  private static boolean isBulkImport = false;
+
+  /**
+   * Walks the in/ directory tree until the first non-directory is found
+   * and sets isBulkImport depending on whether that file's name ends
+   * with .tar or .tar.bz2.  Leaves isBulkImport unchanged if the tree
+   * contains no files at all.
+   */
+  private static void detectBulkOrRegular() {
+    Stack<File> inFiles = new Stack<File>();
+    inFiles.add(new File("in"));
+    while (!inFiles.isEmpty()) {
+      File file = inFiles.pop();
+      if (file.isDirectory()) {
+        /* listFiles() returns null if the directory cannot be read;
+         * skip it in that case rather than fail with a
+         * NullPointerException. */
+        File[] children = file.listFiles();
+        if (children != null) {
+          inFiles.addAll(Arrays.asList(children));
+        }
+      } else {
+        /* The first non-directory that we come across decides. */
+        String name = file.getName();
+        isBulkImport = name.endsWith(".tar") ||
+            name.endsWith(".tar.bz2");
+        break;
+      }
+    }
+  }
+
+  /* Durations in milliseconds that are used for cutting off old
+   * observations and for splitting intervals at UTC day or hour
+   * boundaries. */
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L;
+  private static final long ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
+  private static final long ONE_WEEK_MILLIS = 7L * ONE_DAY_MILLIS;
+
+  /**
+   * Reads all descriptors below in/relay-descriptors/ and hands over
+   * extra-info descriptors and consensuses to their parse methods.  All
+   * other descriptor types are ignored.  The descriptor reader is told
+   * to use status/relay-descriptors as its exclude-files file.
+   */
+  private static void parseRelayDescriptors() throws Exception {
+    DescriptorReader reader =
+        DescriptorSourceFactory.createDescriptorReader();
+    reader.setExcludeFiles(new File("status/relay-descriptors"));
+    reader.addDirectory(new File("in/relay-descriptors/"));
+    for (Iterator<DescriptorFile> files = reader.readDescriptors();
+        files.hasNext(); ) {
+      for (Descriptor parsed : files.next().getDescriptors()) {
+        if (parsed instanceof ExtraInfoDescriptor) {
+          parseRelayExtraInfoDescriptor((ExtraInfoDescriptor) parsed);
+        } else if (parsed instanceof RelayNetworkStatusConsensus) {
+          parseRelayNetworkStatusConsensus(
+              (RelayNetworkStatusConsensus) parsed);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts the directory request statistics and the directory write
+   * history from a relay's extra-info descriptor and passes both on,
+   * together with the upper-cased fingerprint and the publication time,
+   * to the methods that turn them into output lines.
+   */
+  private static void parseRelayExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) throws IOException {
+    long published = descriptor.getPublishedMillis();
+    String relay = descriptor.getFingerprint().toUpperCase();
+    long statsEnd = descriptor.getDirreqStatsEndMillis();
+    /* Presumably the getter returns seconds, hence the conversion to
+     * milliseconds -- confirm against the descriptor library. */
+    long statsIntervalLength =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    SortedMap<String, Integer> v3Reqs = descriptor.getDirreqV3Reqs();
+    BandwidthHistory writeHistory = descriptor.getDirreqWriteHistory();
+    parseRelayDirreqV3Reqs(relay, published, statsEnd,
+        statsIntervalLength, v3Reqs);
+    parseRelayDirreqWriteHistory(relay, published, writeHistory);
+  }
+
+  /**
+   * Writes "responses" lines for a relay's directory requests by
+   * country.  The statistics interval is split at the last UTC midnight
+   * at or before its end, and each request count is attributed to the
+   * two parts in proportion to their lengths.  For every part, one line
+   * per country and one line with the sum over all countries (empty
+   * country) are written.
+   *
+   * Nothing is written if there are no requests, if the statistics
+   * ended more than one week before the descriptor was published, or if
+   * the statistics interval is not exactly one day long.
+   */
+  private static void parseRelayDirreqV3Reqs(String fingerprint,
+      long publishedMillis, long dirreqStatsEndMillis,
+      long dirreqStatsIntervalLengthMillis,
+      SortedMap<String, Integer> requests) throws IOException {
+    if (requests == null) {
+      return;
+    }
+    /* Cut off all observations that are one week older than the
+     * descriptor publication time, or we'll have to update weeks of
+     * aggregate values every hour. */
+    if (publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS) {
+      return;
+    }
+    if (dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+      return;
+    }
+    long statsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = dirreqStatsEndMillis
+        - dirreqStatsEndMillis % ONE_DAY_MILLIS;
+    long[][] parts = new long[][] {
+        { statsStartMillis, utcBreakMillis },
+        { utcBreakMillis, dirreqStatsEndMillis } };
+    for (long[] part : parts) {
+      long fromMillis = part[0];
+      long toMillis = part[1];
+      if (fromMillis >= toMillis) {
+        /* Empty part, e.g., when the interval ends at midnight. */
+        continue;
+      }
+      double intervalFraction = ((double) (toMillis - fromMillis))
+          / ((double) dirreqStatsIntervalLengthMillis);
+      double allCountries = 0.0;
+      for (Map.Entry<String, Integer> request : requests.entrySet()) {
+        /* NOTE(review): subtracting 4 presumably compensates for
+         * reported counts being rounded up -- confirm against the
+         * directory specification. */
+        double reqs = request.getValue() - 4.0;
+        allCountries += reqs;
+        writeOutputLine(fingerprint, "relay", "responses",
+            request.getKey(), "", "", fromMillis, toMillis,
+            reqs * intervalFraction);
+      }
+      writeOutputLine(fingerprint, "relay", "responses", "", "",
+          "", fromMillis, toMillis, allCountries * intervalFraction);
+    }
+  }
+
+  /**
+   * Writes one "bytes" line per interval of a relay's directory write
+   * history.  An interval that contains a UTC midnight strictly between
+   * its start and its end is split there, and its bytes are attributed
+   * to the two parts in proportion to their lengths.  An interval that
+   * merely ends at midnight is written as a single line; no zero-length
+   * line is written for the following day.
+   *
+   * Nothing is written if there is no history or if the history ended
+   * more than one week before the descriptor was published.
+   */
+  private static void parseRelayDirreqWriteHistory(String fingerprint,
+      long publishedMillis, BandwidthHistory dirreqWriteHistory)
+      throws IOException {
+    if (dirreqWriteHistory == null ||
+        publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+        > ONE_WEEK_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    long intervalLengthMillis =
+        dirreqWriteHistory.getIntervalLength() * 1000L;
+    for (Map.Entry<Long, Long> e :
+        dirreqWriteHistory.getBandwidthValues().entrySet()) {
+      long intervalEndMillis = e.getKey();
+      long intervalStartMillis =
+          intervalEndMillis - intervalLengthMillis;
+      double writtenBytes = (double) e.getValue();
+      /* Last UTC midnight at or before the interval end. */
+      long utcBreakMillis = (intervalEndMillis
+          / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
+      if (intervalStartMillis < utcBreakMillis &&
+          utcBreakMillis < intervalEndMillis) {
+        double fractionBeforeBreak =
+            ((double) (utcBreakMillis - intervalStartMillis))
+            / ((double) intervalLengthMillis);
+        double fractionAfterBreak =
+            ((double) (intervalEndMillis - utcBreakMillis))
+            / ((double) intervalLengthMillis);
+        writeOutputLine(fingerprint, "relay", "bytes", "", "", "",
+            intervalStartMillis, utcBreakMillis,
+            writtenBytes * fractionBeforeBreak);
+        writeOutputLine(fingerprint, "relay", "bytes", "", "", "",
+            utcBreakMillis, intervalEndMillis,
+            writtenBytes * fractionAfterBreak);
+      } else {
+        writeOutputLine(fingerprint, "relay", "bytes", "", "", "",
+            intervalStartMillis, intervalEndMillis, writtenBytes);
+      }
+    }
+  }
+
+  /**
+   * Writes one "status" line with value 0.0 for every relay that has the
+   * Running flag in the given consensus.  The line covers the time from
+   * the consensus' valid-after to its fresh-until time.
+   */
+  private static void parseRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) throws IOException {
+    long validAfterMillis = consensus.getValidAfterMillis();
+    long freshUntilMillis = consensus.getFreshUntilMillis();
+    for (NetworkStatusEntry entry :
+        consensus.getStatusEntries().values()) {
+      String fingerprint = entry.getFingerprint().toUpperCase();
+      if (!entry.getFlags().contains("Running")) {
+        continue;
+      }
+      writeOutputLine(fingerprint, "relay", "status", "", "", "",
+          validAfterMillis, freshUntilMillis, 0.0);
+    }
+  }
+
+  /**
+   * Reads all descriptors below in/bridge-descriptors/ and hands over
+   * extra-info descriptors and bridge network statuses to their parse
+   * methods.  All other descriptor types are ignored.  The descriptor
+   * reader is told to use status/bridge-descriptors as its exclude-files
+   * file.
+   */
+  private static void parseBridgeDescriptors() throws Exception {
+    DescriptorReader reader =
+        DescriptorSourceFactory.createDescriptorReader();
+    reader.setExcludeFiles(new File("status/bridge-descriptors"));
+    reader.addDirectory(new File("in/bridge-descriptors/"));
+    for (Iterator<DescriptorFile> files = reader.readDescriptors();
+        files.hasNext(); ) {
+      for (Descriptor parsed : files.next().getDescriptors()) {
+        if (parsed instanceof ExtraInfoDescriptor) {
+          parseBridgeExtraInfoDescriptor((ExtraInfoDescriptor) parsed);
+        } else if (parsed instanceof BridgeNetworkStatus) {
+          parseBridgeNetworkStatus((BridgeNetworkStatus) parsed);
+        }
+      }
+    }
+  }
+
+  /**
+   * Extracts the directory response statistics, the bridge client
+   * statistics by country, transport, and IP version, and the directory
+   * write history from a bridge's extra-info descriptor and passes them
+   * on to the methods that turn them into output lines.
+   */
+  private static void parseBridgeExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) throws IOException {
+    String bridge = descriptor.getFingerprint().toUpperCase();
+    long published = descriptor.getPublishedMillis();
+    long statsEnd = descriptor.getDirreqStatsEndMillis();
+    /* Presumably the getter returns seconds, hence the conversion to
+     * milliseconds -- confirm against the descriptor library. */
+    long statsIntervalLength =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    SortedMap<String, Integer> v3Resp = descriptor.getDirreqV3Resp();
+    SortedMap<String, Integer> ips = descriptor.getBridgeIps();
+    SortedMap<String, Integer> ipTransports =
+        descriptor.getBridgeIpTransports();
+    SortedMap<String, Integer> ipVersions =
+        descriptor.getBridgeIpVersions();
+    parseBridgeDirreqV3Resp(bridge, published, statsEnd,
+        statsIntervalLength, v3Resp, ips, ipTransports, ipVersions);
+    BandwidthHistory writeHistory = descriptor.getDirreqWriteHistory();
+    parseBridgeDirreqWriteHistory(bridge, published, writeHistory);
+  }
+
+  /**
+   * Writes "responses" lines for a bridge's successful ("ok") directory
+   * responses.  The statistics interval is split at the last UTC
+   * midnight at or before its end.  For each non-empty part, one total
+   * line is written, followed by lines that break the responses down by
+   * country, by transport, and by IP version.
+   *
+   * Nothing is written if there are no response statistics, if they
+   * contain no "ok" entry, if the adjusted "ok" count is not positive,
+   * if the statistics ended more than one week before the descriptor
+   * was published, or if the statistics interval is not exactly one day
+   * long.
+   */
+  private static void parseBridgeDirreqV3Resp(String fingerprint,
+      long publishedMillis, long dirreqStatsEndMillis,
+      long dirreqStatsIntervalLengthMillis,
+      SortedMap<String, Integer> responses,
+      SortedMap<String, Integer> bridgeIps,
+      SortedMap<String, Integer> bridgeIpTransports,
+      SortedMap<String, Integer> bridgeIpVersions) throws IOException {
+    if (responses == null ||
+        publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS ||
+        dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    /* A descriptor without an "ok" entry must not abort the whole run
+     * with a NullPointerException; treat it like no statistics. */
+    Integer okResponses = responses.get("ok");
+    if (okResponses == null) {
+      return;
+    }
+    /* NOTE(review): subtracting 4 presumably compensates for reported
+     * counts being rounded up -- confirm against the directory
+     * specification. */
+    double resp = okResponses.doubleValue() - 4.0;
+    if (resp <= 0.0) {
+      return;
+    }
+    long statsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+        * ONE_DAY_MILLIS;
+    for (int i = 0; i < 2; i++) {
+      long fromMillis = i == 0 ? statsStartMillis : utcBreakMillis;
+      long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+      if (fromMillis >= toMillis) {
+        /* Empty part, e.g., when the interval ends at midnight. */
+        continue;
+      }
+      double intervalFraction = ((double) (toMillis - fromMillis))
+          / ((double) dirreqStatsIntervalLengthMillis);
+      writeOutputLine(fingerprint, "bridge", "responses", "", "",
+          "", fromMillis, toMillis, resp * intervalFraction);
+      parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+          dirreqStatsIntervalLengthMillis, "country", bridgeIps);
+      parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+          dirreqStatsIntervalLengthMillis, "transport",
+          bridgeIpTransports);
+      parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+          dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions);
+    }
+  }
+
+  /**
+   * Distributes a bridge's responses over the keys of one category
+   * ("country", "transport", or "version") in proportion to the given
+   * frequencies and writes one "responses" line per key.  Frequencies
+   * below 4 are ignored, and 4 is subtracted from all others.  If the
+   * remaining frequencies sum up to zero, all responses are attributed
+   * to a default key: "??" for countries, "<OR>" for transports, and
+   * "v4" for versions.  Nothing is written for any other category.
+   */
+  private static void parseBridgeRespByCategory(String fingerprint,
+      long fromMillis, long toMillis, double resp,
+      long dirreqStatsIntervalLengthMillis, String category,
+      SortedMap<String, Integer> frequencies) throws IOException {
+    SortedMap<String, Double> weights = new TreeMap<String, Double>();
+    double weightSum = 0.0;
+    if (frequencies != null) {
+      for (Map.Entry<String, Integer> frequency :
+          frequencies.entrySet()) {
+        double weight = frequency.getValue() - 4.0;
+        if (weight >= 0.0) {
+          weights.put(frequency.getKey(), weight);
+          weightSum += weight;
+        }
+      }
+    }
+    if (weightSum == 0) {
+      /* We were not told any frequencies, or none of them is greater
+       * than 4, so fall back to a single default key. */
+      String defaultKey = null;
+      if (category.equals("country")) {
+        defaultKey = "??";
+      } else if (category.equals("transport")) {
+        defaultKey = "<OR>";
+      } else if (category.equals("version")) {
+        defaultKey = "v4";
+      }
+      if (defaultKey != null) {
+        weights.put(defaultKey, 4.0);
+      }
+      weightSum = 4.0;
+    }
+    double intervalFraction = ((double) (toMillis - fromMillis))
+        / ((double) dirreqStatsIntervalLengthMillis);
+    for (Map.Entry<String, Double> weight : weights.entrySet()) {
+      double val = resp * intervalFraction * weight.getValue()
+          / weightSum;
+      /* Put the key into the one column that matches the category and
+       * leave the other two columns empty. */
+      String country = "", transport = "", version = "";
+      if (category.equals("country")) {
+        country = weight.getKey();
+      } else if (category.equals("transport")) {
+        transport = weight.getKey();
+      } else if (category.equals("version")) {
+        version = weight.getKey();
+      } else {
+        continue;
+      }
+      writeOutputLine(fingerprint, "bridge", "responses", country,
+          transport, version, fromMillis, toMillis, val);
+    }
+  }
+
+  /**
+   * Writes one "bytes" line per interval of a bridge's directory write
+   * history.  An interval that contains a UTC midnight strictly between
+   * its start and its end is split there, and its bytes are attributed
+   * to the two parts in proportion to their lengths.  An interval that
+   * merely ends at midnight is written as a single line; no zero-length
+   * line is written for the following day.
+   *
+   * Nothing is written if there is no history or if the history ended
+   * more than one week before the descriptor was published.
+   */
+  private static void parseBridgeDirreqWriteHistory(String fingerprint,
+      long publishedMillis, BandwidthHistory dirreqWriteHistory)
+      throws IOException {
+    if (dirreqWriteHistory == null ||
+        publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+        > ONE_WEEK_MILLIS) {
+      /* Cut off all observations that are one week older than
+       * the descriptor publication time, or we'll have to update
+       * weeks of aggregate values every hour. */
+      return;
+    }
+    long intervalLengthMillis =
+        dirreqWriteHistory.getIntervalLength() * 1000L;
+    for (Map.Entry<Long, Long> e :
+        dirreqWriteHistory.getBandwidthValues().entrySet()) {
+      long intervalEndMillis = e.getKey();
+      long intervalStartMillis =
+          intervalEndMillis - intervalLengthMillis;
+      double writtenBytes = (double) e.getValue();
+      /* Last UTC midnight at or before the interval end. */
+      long utcBreakMillis = (intervalEndMillis
+          / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
+      if (intervalStartMillis < utcBreakMillis &&
+          utcBreakMillis < intervalEndMillis) {
+        double fractionBeforeBreak =
+            ((double) (utcBreakMillis - intervalStartMillis))
+            / ((double) intervalLengthMillis);
+        double fractionAfterBreak =
+            ((double) (intervalEndMillis - utcBreakMillis))
+            / ((double) intervalLengthMillis);
+        writeOutputLine(fingerprint, "bridge", "bytes", "", "", "",
+            intervalStartMillis, utcBreakMillis,
+            writtenBytes * fractionBeforeBreak);
+        writeOutputLine(fingerprint, "bridge", "bytes", "", "", "",
+            utcBreakMillis, intervalEndMillis,
+            writtenBytes * fractionAfterBreak);
+      } else {
+        writeOutputLine(fingerprint, "bridge", "bytes", "", "", "",
+            intervalStartMillis, intervalEndMillis, writtenBytes);
+      }
+    }
+  }
+
+  /**
+   * Writes one "status" line with value 0.0 for every bridge that has
+   * the Running flag in the given status.  The line covers the full hour
+   * in which the status was published.  Statuses published more than
+   * half an hour after the full hour are skipped entirely.
+   */
+  private static void parseBridgeNetworkStatus(BridgeNetworkStatus status)
+      throws IOException {
+    long millisIntoHour = status.getPublishedMillis() % ONE_HOUR_MILLIS;
+    if (millisIntoHour > ONE_HOUR_MILLIS / 2) {
+      return;
+    }
+    long hourStartMillis = status.getPublishedMillis() - millisIntoHour;
+    long hourEndMillis = hourStartMillis + ONE_HOUR_MILLIS;
+    for (NetworkStatusEntry entry :
+        status.getStatusEntries().values()) {
+      String fingerprint = entry.getFingerprint().toUpperCase();
+      if (!entry.getFlags().contains("Running")) {
+        continue;
+      }
+      writeOutputLine(fingerprint, "bridge", "status", "", "", "",
+          hourStartMillis, hourEndMillis, 0.0);
+    }
+  }
+
+  /* Output files that have been opened in this run, by file name. */
+  private static Map<String, BufferedWriter> openOutputFiles =
+      new HashMap<String, BufferedWriter>();
+
+  /**
+   * Appends one tab-separated line to the output file that belongs to
+   * the start time of the given interval.  The columns are fingerprint,
+   * node, metric, country, transport, version, interval start, interval
+   * end, and the value with one decimal place.  Nothing is written if
+   * the interval starts after it ends.
+   */
+  private static void writeOutputLine(String fingerprint, String node,
+      String metric, String country, String transport, String version,
+      long fromMillis, long toMillis, double val) throws IOException {
+    if (fromMillis > toMillis) {
+      return;
+    }
+    String fromDateTime = formatDateTimeMillis(fromMillis);
+    String toDateTime = formatDateTimeMillis(toMillis);
+    BufferedWriter bw = getOutputFile(fromDateTime);
+    /* Format with a fixed locale, so that the decimal separator is
+     * always a dot.  With the default locale, the value could come out
+     * as "1,5" on some systems, which the database cannot import. */
+    bw.write(String.format(java.util.Locale.US,
+        "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n",
+        fingerprint, node, metric, country, transport, version,
+        fromDateTime, toDateTime, val));
+  }
+
+  /* Lazily created formatter that is shared by all calls to
+   * formatDateTimeMillis(). */
+  private static SimpleDateFormat dateTimeFormat = null;
+
+  /**
+   * Formats the given time in milliseconds since the epoch as
+   * "yyyy-MM-dd HH:mm:ss" in UTC.
+   */
+  private static String formatDateTimeMillis(long millis) {
+    SimpleDateFormat format = dateTimeFormat;
+    if (format == null) {
+      format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      format.setLenient(false);
+      format.setTimeZone(TimeZone.getTimeZone("UTC"));
+      dateTimeFormat = format;
+    }
+    return format.format(millis);
+  }
+
+  /**
+   * Returns the writer for the output file that lines starting at the
+   * given date and time go to, opening the file first if it was not
+   * opened before.  In bulk import mode there is one file per date
+   * (the first ten characters of fromDateTime); otherwise everything
+   * goes to out/userstats.sql.
+   */
+  private static BufferedWriter getOutputFile(String fromDateTime)
+      throws IOException {
+    String fileName;
+    if (isBulkImport) {
+      fileName = "out/userstats-" + fromDateTime.substring(0, 10)
+          + ".sql";
+    } else {
+      fileName = "out/userstats.sql";
+    }
+    BufferedWriter writer = openOutputFiles.get(fileName);
+    if (writer != null) {
+      return writer;
+    }
+    writer = openOutputFile(fileName);
+    openOutputFiles.put(fileName, writer);
+    return writer;
+  }
+
+  /**
+   * Creates the given output file and writes the SQL preamble to it:
+   * begin a transaction, lock the imported table without waiting, and
+   * start a COPY into that table that reads the following lines.
+   */
+  private static BufferedWriter openOutputFile(String outputFileName)
+      throws IOException {
+    String[] preamble = new String[] {
+        "BEGIN;\n",
+        "LOCK TABLE imported NOWAIT;\n",
+        "COPY imported (fingerprint, node, metric, country, "
+        + "transport, version, stats_start, stats_end, val) FROM "
+        + "stdin;\n" };
+    BufferedWriter writer = new BufferedWriter(new FileWriter(
+        outputFileName));
+    for (String statement : preamble) {
+      writer.write(statement);
+    }
+    return writer;
+  }
+
+  /**
+   * Finishes all output files that were opened in this run: terminates
+   * the COPY data, appends the statements that merge and aggregate the
+   * imported data, empty the imported table, and commit the
+   * transaction, and closes the file.
+   */
+  private static void closeOutputFiles() throws IOException {
+    String[] epilogue = new String[] {
+        "\\.\n",
+        "SELECT merge();\n",
+        "SELECT aggregate();\n",
+        "TRUNCATE imported;\n",
+        "COMMIT;\n" };
+    for (BufferedWriter writer : openOutputFiles.values()) {
+      for (String statement : epilogue) {
+        writer.write(statement);
+      }
+      writer.close();
+    }
+  }
+}
+



More information about the tor-commits mailing list