commit f5ef5fb0d8f46e28dc7e8536a11d95d43ee61c08
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Sat Dec 8 11:06:17 2018 +0100
Access userstats database from Java only.
Previously, we used Java to write .sql files, imported them using
psql, and afterwards made queries via psql. Now we're using Java to
interact with the database directly. This is another step towards
making the daily updater Java-only.
---
build.xml | 22 ---
.../torproject/metrics/stats/clients/Database.java | 156 ++++++++++++++++
.../org/torproject/metrics/stats/clients/Main.java | 206 +++++++--------------
.../torproject/metrics/stats/clients/Writer.java | 42 +++++
4 files changed, 263 insertions(+), 163 deletions(-)
diff --git a/build.xml b/build.xml
index 250417e..6736e19 100644
--- a/build.xml
+++ b/build.xml
@@ -367,28 +367,6 @@
<mkdir dir="${statsdir}" />
<antcall target="run-java" />
- <apply executable="psql" failonerror="true" >
- <arg value="--dbname=userstats"/>
- <arg value="-f"/>
- <fileset dir="${localmoddir}/out"
- includes="*.sql" />
- </apply>
-
- <exec executable="psql"
- dir="${localmoddir}"
- failonerror="true" >
- <arg value="-c COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;" />
- <arg value="--dbname=userstats"/>
- <arg value="--output=userstats.csv" />
- </exec>
-
- <exec executable="psql"
- dir="${localmoddir}"
- failonerror="true" >
- <arg value="-c COPY (SELECT * FROM combined) TO STDOUT WITH CSV HEADER;" />
- <arg value="--dbname=userstats"/>
- <arg value="--output=userstats-combined.csv" />
- </exec>
<antcall target="run-R" >
<param name="module.Rscript" value="userstats-detector.R" />
diff --git a/src/main/java/org/torproject/metrics/stats/clients/Database.java b/src/main/java/org/torproject/metrics/stats/clients/Database.java
new file mode 100644
index 0000000..7e783dc
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/stats/clients/Database.java
@@ -0,0 +1,156 @@
+/* Copyright 2017--2018 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.metrics.stats.clients;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Locale;
+import java.util.TimeZone;
+
+/** Database wrapper to connect to the database, insert data, run the stored
+ * procedure for aggregating data, and query aggregated data as output. */
+class Database implements AutoCloseable {
+
+ /** Database connection string. */
+ private String jdbcString;
+
+ /** Connection object for all interactions with the database. */
+ private Connection connection;
+
+ /** Prepared statement for inserting a platform string into the imported
+ * table. */
+ private PreparedStatement psImportedInsert;
+
+ /** Create a new Database instance and prepare for inserting or querying
+ * data. */
+ Database(String jdbcString) throws SQLException {
+ this.jdbcString = jdbcString;
+ this.connect();
+ this.prepareStatements();
+ }
+
+ private void connect() throws SQLException {
+ this.connection = DriverManager.getConnection(this.jdbcString);
+ this.connection.setAutoCommit(false);
+ }
+
+ private void prepareStatements() throws SQLException {
+ this.psImportedInsert = this.connection.prepareStatement(
+ "INSERT INTO imported (fingerprint, node, metric, country, transport, "
+ + "version, stats_start, stats_end, val) "
+ + "VALUES (?, CAST(? AS node), CAST(? AS metric), ?, ?, ?, ?, ?, ?)");
+ }
+
+ /** Insert into the imported table. */
+ void insertIntoImported(String fingerprint, String node, String metric,
+ String country, String transport, String version, long fromMillis,
+ long toMillis, double val) throws SQLException {
+ if (fromMillis > toMillis) {
+ return;
+ }
+ psImportedInsert.clearParameters();
+ psImportedInsert.setString(1, fingerprint);
+ psImportedInsert.setString(2, node);
+ psImportedInsert.setString(3, metric);
+ psImportedInsert.setString(4, country);
+ psImportedInsert.setString(5, transport);
+ psImportedInsert.setString(6, version);
+ psImportedInsert.setTimestamp(7,
+ Timestamp.from(Instant.ofEpochMilli(fromMillis)));
+ psImportedInsert.setTimestamp(8,
+ Timestamp.from(Instant.ofEpochMilli(toMillis)));
+ psImportedInsert.setDouble(9, Math.round(val * 10.0) / 10.0);
+ psImportedInsert.execute();
+ }
+
+ /** Process the newly imported data by calling the various stored procedures
+ * and then truncating the imported table. */
+ void processImported() throws SQLException {
+ this.connection.createStatement().execute("SELECT merge()");
+ this.connection.createStatement().execute("SELECT aggregate()");
+ this.connection.createStatement().execute("SELECT combine()");
+ this.connection.createStatement().execute("TRUNCATE imported");
+ }
+
+ /** Commit all changes made in this execution. */
+ void commit() throws SQLException {
+ this.connection.commit();
+ }
+
+ /** Query the estimated view. */
+ List<String[]> queryEstimated() throws SQLException {
+ List<String[]> statistics = new ArrayList<>();
+ String columns = "date, node, country, transport, version, frac, users";
+ statistics.add(columns.split(", "));
+ Statement st = this.connection.createStatement();
+ Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"),
+ Locale.US);
+ String queryString = "SELECT " + columns + " FROM estimated";
+ try (ResultSet rs = st.executeQuery(queryString)) {
+ while (rs.next()) {
+ String[] outputLine = new String[7];
+ outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString();
+ outputLine[1] = rs.getString("node");
+ outputLine[2] = rs.getString("country");
+ outputLine[3] = rs.getString("transport");
+ outputLine[4] = rs.getString("version");
+ outputLine[5] = getIntFromResultSet(rs, "frac");
+ outputLine[6] = getIntFromResultSet(rs, "users");
+ statistics.add(outputLine);
+ }
+ }
+ return statistics;
+ }
+
+ /** Query the combined view. */
+ List<String[]> queryCombined() throws SQLException {
+ List<String[]> statistics = new ArrayList<>();
+ String columns = "date, node, country, transport, version, frac, low, high";
+ statistics.add(columns.split(", "));
+ Statement st = this.connection.createStatement();
+ Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"),
+ Locale.US);
+ String queryString = "SELECT " + columns + " FROM combined";
+ try (ResultSet rs = st.executeQuery(queryString)) {
+ while (rs.next()) {
+ String[] outputLine = new String[8];
+ outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString();
+ outputLine[1] = rs.getString("node");
+ outputLine[2] = rs.getString("country");
+ outputLine[3] = rs.getString("transport");
+ outputLine[4] = rs.getString("version");
+ outputLine[5] = getIntFromResultSet(rs, "frac");
+ outputLine[6] = getIntFromResultSet(rs, "low");
+ outputLine[7] = getIntFromResultSet(rs, "high");
+ statistics.add(outputLine);
+ }
+ }
+ return statistics;
+ }
+
+ /** Retrieve the <code>int</code> value of the designated column in the
+ * current row of the given <code>ResultSet</code> object and format it as a
+ * <code>String</code> object, or return <code>null</code> if the retrieved
+ * value was <code>NULL</code>. */
+ private static String getIntFromResultSet(ResultSet rs, String columnLabel)
+ throws SQLException {
+ int result = rs.getInt(columnLabel);
+ return rs.wasNull() ? null : String.valueOf(result);
+ }
+
+ /** Release database connection. */
+ public void close() throws SQLException {
+ this.connection.close();
+ }
+}
+
diff --git a/src/main/java/org/torproject/metrics/stats/clients/Main.java b/src/main/java/org/torproject/metrics/stats/clients/Main.java
index 3ccfe96..48d8d8d 100644
--- a/src/main/java/org/torproject/metrics/stats/clients/Main.java
+++ b/src/main/java/org/torproject/metrics/stats/clients/Main.java
@@ -15,46 +15,50 @@ import org.torproject.descriptor.RelayNetworkStatusConsensus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
+import java.nio.file.Paths;
+import java.sql.SQLException;
import java.util.Map;
import java.util.SortedMap;
-import java.util.TimeZone;
import java.util.TreeMap;
public class Main {
private static Logger log = LoggerFactory.getLogger(Main.class);
+ private static final String jdbcString
+ = System.getProperty("clients.database", "jdbc:postgresql:userstats");
+
+ private static Database database;
+
/** Executes this data-processing module. */
public static void main(String[] args) throws Exception {
- parseArgs(args);
+
+ log.info("Starting clients module.");
+
+ log.info("Connecting to database.");
+ database = new Database(jdbcString);
+
+ log.info("Reading relay descriptors and importing relevant parts into the "
+ + "database.");
parseRelayDescriptors();
+
+ log.info("Reading bridge descriptors and importing relevant parts into the "
+ + "database.");
parseBridgeDescriptors();
- closeOutputFiles();
- }
- private static boolean writeToSingleFile = true;
- private static boolean byStatsDateNotByDescHour = false;
-
- private static void parseArgs(String[] args) {
- if (args.length == 0) {
- writeToSingleFile = true;
- } else if (args.length == 1 && args[0].equals("--stats-date")) {
- writeToSingleFile = false;
- byStatsDateNotByDescHour = true;
- } else if (args.length == 1 && args[0].equals("--desc-hour")) {
- writeToSingleFile = false;
- byStatsDateNotByDescHour = false;
- } else {
- log.warn("Usage: java {} [ --stats-date | --desc-hour ]",
- Main.class.getName());
- System.exit(1);
- }
+ log.info("Processing newly imported data.");
+ database.processImported();
+ database.commit();
+
+ log.info("Querying aggregated statistics from the database.");
+ new Writer().write(Paths.get("stats", "userstats.csv"),
+ database.queryEstimated());
+ new Writer().write(Paths.get("stats", "userstats-combined.csv"),
+ database.queryCombined());
+
+ log.info("Disconnecting from database.");
+ database.close();
}
private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L;
@@ -80,11 +84,12 @@ public class Main {
(RelayNetworkStatusConsensus) descriptor);
}
}
+ database.commit();
descriptorReader.saveHistoryFile(historyFile);
}
private static void parseRelayExtraInfoDescriptor(
- ExtraInfoDescriptor descriptor) throws IOException {
+ ExtraInfoDescriptor descriptor) throws SQLException {
long publishedMillis = descriptor.getPublishedMillis();
String fingerprint = descriptor.getFingerprint()
.toUpperCase();
@@ -103,7 +108,7 @@ public class Main {
private static void parseRelayDirreqV3Reqs(String fingerprint,
long publishedMillis, long dirreqStatsEndMillis,
long dirreqStatsIntervalLengthMillis,
- SortedMap<String, Integer> requests) throws IOException {
+ SortedMap<String, Integer> requests) throws SQLException {
if (requests == null
|| publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS
|| dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
@@ -130,19 +135,17 @@ public class Main {
String country = e.getKey();
double reqs = ((double) e.getValue()) - 4.0;
sum += reqs;
- writeOutputLine(fingerprint, "relay", "responses", country,
- "", "", fromMillis, toMillis, reqs * intervalFraction,
- publishedMillis);
+ database.insertIntoImported(fingerprint, "relay", "responses", country,
+ "", "", fromMillis, toMillis, reqs * intervalFraction);
}
- writeOutputLine(fingerprint, "relay", "responses", "", "",
- "", fromMillis, toMillis, sum * intervalFraction,
- publishedMillis);
+ database.insertIntoImported(fingerprint, "relay", "responses", "", "",
+ "", fromMillis, toMillis, sum * intervalFraction);
}
}
private static void parseRelayDirreqWriteHistory(String fingerprint,
long publishedMillis, BandwidthHistory dirreqWriteHistory)
- throws IOException {
+ throws SQLException {
if (dirreqWriteHistory == null
|| publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
> ONE_WEEK_MILLIS) {
@@ -177,14 +180,14 @@ public class Main {
} else if (i == 1) {
break;
}
- writeOutputLine(fingerprint, "relay", "bytes", "", "", "",
- fromMillis, toMillis, writtenBytes, publishedMillis);
+ database.insertIntoImported(fingerprint, "relay", "bytes", "", "", "",
+ fromMillis, toMillis, writtenBytes);
}
}
}
private static void parseRelayNetworkStatusConsensus(
- RelayNetworkStatusConsensus consensus) throws IOException {
+ RelayNetworkStatusConsensus consensus) throws SQLException {
long fromMillis = consensus.getValidAfterMillis();
long toMillis = consensus.getFreshUntilMillis();
for (NetworkStatusEntry statusEntry
@@ -192,8 +195,8 @@ public class Main {
String fingerprint = statusEntry.getFingerprint()
.toUpperCase();
if (statusEntry.getFlags().contains("Running")) {
- writeOutputLine(fingerprint, "relay", "status", "", "", "",
- fromMillis, toMillis, 0.0, fromMillis);
+ database.insertIntoImported(fingerprint, "relay", "status", "", "", "",
+ fromMillis, toMillis, 0.0);
}
}
}
@@ -213,11 +216,12 @@ public class Main {
parseBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
}
}
+ database.commit();
descriptorReader.saveHistoryFile(historyFile);
}
private static void parseBridgeExtraInfoDescriptor(
- ExtraInfoDescriptor descriptor) throws IOException {
+ ExtraInfoDescriptor descriptor) throws SQLException {
String fingerprint = descriptor.getFingerprint().toUpperCase();
long publishedMillis = descriptor.getPublishedMillis();
long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
@@ -240,7 +244,7 @@ public class Main {
SortedMap<String, Integer> responses,
SortedMap<String, Integer> bridgeIps,
SortedMap<String, Integer> bridgeIpTransports,
- SortedMap<String, Integer> bridgeIpVersions) throws IOException {
+ SortedMap<String, Integer> bridgeIpVersions) throws SQLException {
if (responses == null
|| publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS
|| dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
@@ -264,18 +268,15 @@ public class Main {
}
double intervalFraction = ((double) (toMillis - fromMillis))
/ ((double) dirreqStatsIntervalLengthMillis);
- writeOutputLine(fingerprint, "bridge", "responses", "", "",
- "", fromMillis, toMillis, resp * intervalFraction,
- publishedMillis);
+ database.insertIntoImported(fingerprint, "bridge", "responses", "", "",
+ "", fromMillis, toMillis, resp * intervalFraction);
parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
- dirreqStatsIntervalLengthMillis, "country", bridgeIps,
- publishedMillis);
+ dirreqStatsIntervalLengthMillis, "country", bridgeIps);
parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
dirreqStatsIntervalLengthMillis, "transport",
- bridgeIpTransports, publishedMillis);
+ bridgeIpTransports);
parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
- dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions,
- publishedMillis);
+ dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions);
}
}
}
@@ -283,8 +284,8 @@ public class Main {
private static void parseBridgeRespByCategory(String fingerprint,
long fromMillis, long toMillis, double resp,
long dirreqStatsIntervalLengthMillis, String category,
- SortedMap<String, Integer> frequencies, long publishedMillis)
- throws IOException {
+ SortedMap<String, Integer> frequencies)
+ throws SQLException {
double total = 0.0;
SortedMap<String, Double> frequenciesCopy = new TreeMap<>();
if (frequencies != null) {
@@ -322,16 +323,16 @@ public class Main {
double val = resp * intervalFraction * e.getValue() / total;
switch (category) {
case "country":
- writeOutputLine(fingerprint, "bridge", "responses", e.getKey(),
- "", "", fromMillis, toMillis, val, publishedMillis);
+ database.insertIntoImported(fingerprint, "bridge", "responses",
+ e.getKey(), "", "", fromMillis, toMillis, val);
break;
case "transport":
- writeOutputLine(fingerprint, "bridge", "responses", "",
- e.getKey(), "", fromMillis, toMillis, val, publishedMillis);
+ database.insertIntoImported(fingerprint, "bridge", "responses", "",
+ e.getKey(), "", fromMillis, toMillis, val);
break;
case "version":
- writeOutputLine(fingerprint, "bridge", "responses", "", "",
- e.getKey(), fromMillis, toMillis, val, publishedMillis);
+ database.insertIntoImported(fingerprint, "bridge", "responses", "",
+ "", e.getKey(), fromMillis, toMillis, val);
break;
default:
/* Ignore any other categories. */
@@ -341,7 +342,7 @@ public class Main {
private static void parseBridgeDirreqWriteHistory(String fingerprint,
long publishedMillis, BandwidthHistory dirreqWriteHistory)
- throws IOException {
+ throws SQLException {
if (dirreqWriteHistory == null
|| publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
> ONE_WEEK_MILLIS) {
@@ -376,14 +377,14 @@ public class Main {
} else if (i == 1) {
break;
}
- writeOutputLine(fingerprint, "bridge", "bytes", "",
- "", "", fromMillis, toMillis, writtenBytes, publishedMillis);
+ database.insertIntoImported(fingerprint, "bridge", "bytes", "",
+ "", "", fromMillis, toMillis, writtenBytes);
}
}
}
private static void parseBridgeNetworkStatus(BridgeNetworkStatus status)
- throws IOException {
+ throws SQLException {
long publishedMillis = status.getPublishedMillis();
long fromMillis = (publishedMillis / ONE_HOUR_MILLIS)
* ONE_HOUR_MILLIS;
@@ -393,87 +394,10 @@ public class Main {
String fingerprint = statusEntry.getFingerprint()
.toUpperCase();
if (statusEntry.getFlags().contains("Running")) {
- writeOutputLine(fingerprint, "bridge", "status", "", "", "",
- fromMillis, toMillis, 0.0, publishedMillis);
+ database.insertIntoImported(fingerprint, "bridge", "status", "", "", "",
+ fromMillis, toMillis, 0.0);
}
}
}
-
- private static Map<String, BufferedWriter> openOutputFiles = new HashMap<>();
-
- private static void writeOutputLine(String fingerprint, String node,
- String metric, String country, String transport, String version,
- long fromMillis, long toMillis, double val, long publishedMillis)
- throws IOException {
- if (fromMillis > toMillis) {
- return;
- }
- String fromDateTime = formatDateTimeMillis(fromMillis);
- String toDateTime = formatDateTimeMillis(toMillis);
- BufferedWriter bw = getOutputFile(fromDateTime, publishedMillis);
- bw.write(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n",
- fingerprint, node, metric, country, transport, version,
- fromDateTime, toDateTime, val));
- }
-
- private static SimpleDateFormat dateTimeFormat = null;
-
- private static String formatDateTimeMillis(long millis) {
- if (dateTimeFormat == null) {
- dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- dateTimeFormat.setLenient(false);
- dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
- return dateTimeFormat.format(millis);
- }
-
- private static BufferedWriter getOutputFile(String fromDateTime,
- long publishedMillis) throws IOException {
- String outputFileName;
- if (writeToSingleFile) {
- outputFileName = "out/userstats.sql";
- } else if (byStatsDateNotByDescHour) {
- outputFileName = "out/userstats-" + fromDateTime.substring(0, 10)
- + ".sql";
- } else {
- String publishedHourDateTime = formatDateTimeMillis(
- (publishedMillis / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS);
- outputFileName = "out/userstats-"
- + publishedHourDateTime.substring(0, 10) + "-"
- + publishedHourDateTime.substring(11, 13) + ".sql";
- }
- BufferedWriter bw = openOutputFiles.get(outputFileName);
- if (bw == null) {
- bw = openOutputFile(outputFileName);
- openOutputFiles.put(outputFileName, bw);
- }
- return bw;
- }
-
- private static BufferedWriter openOutputFile(String outputFileName)
- throws IOException {
- File outputFile = new File(outputFileName);
- outputFile.getParentFile().mkdirs();
- BufferedWriter bw = new BufferedWriter(new FileWriter(
- outputFileName));
- bw.write("BEGIN;\n");
- bw.write("LOCK TABLE imported NOWAIT;\n");
- bw.write("COPY imported (fingerprint, node, metric, country, "
- + "transport, version, stats_start, stats_end, val) FROM "
- + "stdin;\n");
- return bw;
- }
-
- private static void closeOutputFiles() throws IOException {
- for (BufferedWriter bw : openOutputFiles.values()) {
- bw.write("\\.\n");
- bw.write("SELECT merge();\n");
- bw.write("SELECT aggregate();\n");
- bw.write("SELECT combine();\n");
- bw.write("TRUNCATE imported;\n");
- bw.write("COMMIT;\n");
- bw.close();
- }
- }
}
diff --git a/src/main/java/org/torproject/metrics/stats/clients/Writer.java b/src/main/java/org/torproject/metrics/stats/clients/Writer.java
new file mode 100644
index 0000000..ed10bf1
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/stats/clients/Writer.java
@@ -0,0 +1,42 @@
+/* Copyright 2017--2018 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.metrics.stats.clients;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Writer that formats output lines as comma-separated values and writes them
+ * to a file; any header line must be supplied as the first output line. */
+class Writer {
+
+  /** Write output lines to the given file, leaving null parts empty. */
+  void write(Path filePath, Iterable<String[]> outputLines)
+      throws IOException {
+    File parentFile = filePath.toFile().getParentFile();
+    if (null != parentFile && !parentFile.exists() && !parentFile.mkdirs()) {
+      throw new IOException("Unable to create parent directory of output "
+          + "file. Not writing this file.");
+    }
+    List<String> formattedOutputLines = new ArrayList<>();
+    for (String[] outputLine : outputLines) {
+      StringBuilder formattedOutputLine = new StringBuilder();
+      for (int i = 0; i < outputLine.length; i++) {
+        if (i > 0) {
+          formattedOutputLine.append(',');
+        }
+        if (null != outputLine[i]) {
+          formattedOutputLine.append(outputLine[i]);
+        }
+      }
+      formattedOutputLines.add(formattedOutputLine.toString());
+    }
+    Files.write(filePath, formattedOutputLines, StandardCharsets.UTF_8);
+  }
+}
+