commit e8794217eeec2d22e1d0bb1fb87e5a8f1950e80f
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Thu Dec 13 14:45:08 2012 +0100
Split off code that imports conn-bi-direct stats.
---
src/org/torproject/ernie/cron/Main.java | 12 +
.../ernie/cron/PerformanceStatsImporter.java | 271 ++++++++++++++++++++
.../cron/RelayDescriptorDatabaseImporter.java | 102 +-------
3 files changed, 286 insertions(+), 99 deletions(-)
diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java
index a3f38e6..fb4a450 100644
--- a/src/org/torproject/ernie/cron/Main.java
+++ b/src/org/torproject/ernie/cron/Main.java
@@ -62,6 +62,18 @@ public class Main {
bsfh.importRelayDescriptors();
}
rddi.closeConnection();
+
+ // Import conn-bi-direct statistics.
+ PerformanceStatsImporter psi = new PerformanceStatsImporter(
+ config.getWriteRelayDescriptorDatabase() ?
+ config.getRelayDescriptorDatabaseJDBC() : null,
+ config.getWriteRelayDescriptorsRawFiles() ?
+ config.getRelayDescriptorRawFilesDirectory() : null,
+ new File(config.getDirectoryArchivesDirectory()),
+ statsDirectory,
+ config.getKeepDirectoryArchiveImportHistory());
+ psi.importRelayDescriptors();
+ psi.closeConnection();
}
// Import sanitized bridges and write updated stats files to disk
diff --git a/src/org/torproject/ernie/cron/PerformanceStatsImporter.java b/src/org/torproject/ernie/cron/PerformanceStatsImporter.java
new file mode 100644
index 0000000..32d5911
--- /dev/null
+++ b/src/org/torproject/ernie/cron/PerformanceStatsImporter.java
@@ -0,0 +1,271 @@
+/* Copyright 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.TimeZone;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+
+public class PerformanceStatsImporter {
+
+ /**
+ * How many records to commit with each database transaction.
+ */
+ private final long autoCommitCount = 500;
+
+ /**
+ * Keep track of the number of records committed before each transaction
+ */
+ private int rbsCount = 0;
+
+ /**
+ * Relay descriptor database connection.
+ */
+ private Connection conn;
+
+ /**
+ * Prepared statement to check whether a given conn-bi-direct stats
+ * string has been imported into the database before.
+ */
+ private PreparedStatement psBs;
+
+ /**
+ * Prepared statement to insert a conn-bi-direct stats string into the
+ * database.
+ */
+ private PreparedStatement psB;
+
+ /**
+ * Logger for this class.
+ */
+ private Logger logger;
+
+ /**
+ * Directory for writing raw import files.
+ */
+ private String rawFilesDirectory;
+
+ /**
+ * Raw import file containing conn-bi-direct stats strings.
+ */
+ private BufferedWriter connBiDirectOut;
+
+ /**
+ * Date format to parse timestamps.
+ */
+ private SimpleDateFormat dateTimeFormat;
+
+ private boolean importIntoDatabase;
+ private boolean writeRawImportFiles;
+
+ private File archivesDirectory;
+ private File statsDirectory;
+ private boolean keepImportHistory;
+
+ /**
+ * Initialize database importer by connecting to the database and
+ * preparing statements.
+ */
+ public PerformanceStatsImporter(String connectionURL,
+ String rawFilesDirectory, File archivesDirectory,
+ File statsDirectory, boolean keepImportHistory) {
+
+ if (archivesDirectory == null ||
+ statsDirectory == null) {
+ throw new IllegalArgumentException();
+ }
+ this.archivesDirectory = archivesDirectory;
+ this.statsDirectory = statsDirectory;
+ this.keepImportHistory = keepImportHistory;
+
+ /* Initialize logger. */
+ this.logger = Logger.getLogger(
+ RelayDescriptorDatabaseImporter.class.getName());
+
+ if (connectionURL != null) {
+ try {
+ /* Connect to database. */
+ this.conn = DriverManager.getConnection(connectionURL);
+
+ /* Turn autocommit off */
+ this.conn.setAutoCommit(false);
+
+ /* Prepare statements. */
+ this.psBs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM connbidirect WHERE source = ? AND statsend = ?");
+ this.psB = conn.prepareStatement("INSERT INTO connbidirect "
+ + "(source, statsend, seconds, belownum, readnum, writenum, "
+ + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)");
+ this.importIntoDatabase = true;
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not connect to database or "
+ + "prepare statements.", e);
+ }
+ }
+
+ /* Remember where we want to write raw import files. */
+ if (rawFilesDirectory != null) {
+ this.rawFilesDirectory = rawFilesDirectory;
+ this.writeRawImportFiles = true;
+ }
+
+ /* Initialize date format, so that we can format timestamps. */
+ this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+
+ /**
+ * Insert a conn-bi-direct stats string into the database.
+ */
+ private void addConnBiDirect(String source, long statsEndMillis,
+ long seconds, long below, long read, long write, long both) {
+ String statsEnd = this.dateTimeFormat.format(statsEndMillis);
+ if (this.importIntoDatabase) {
+ try {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ Timestamp statsEndTimestamp = new Timestamp(statsEndMillis);
+ this.psBs.setString(1, source);
+ this.psBs.setTimestamp(2, statsEndTimestamp, cal);
+ ResultSet rs = psBs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) == 0) {
+ this.psB.clearParameters();
+ this.psB.setString(1, source);
+ this.psB.setTimestamp(2, statsEndTimestamp, cal);
+ this.psB.setLong(3, seconds);
+ this.psB.setLong(4, below);
+ this.psB.setLong(5, read);
+ this.psB.setLong(6, write);
+ this.psB.setLong(7, both);
+ this.psB.executeUpdate();
+ rbsCount++;
+ if (rbsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ }
+ }
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
+ + "stats string. We won't make any further SQL requests in "
+ + "this execution.", e);
+ this.importIntoDatabase = false;
+ }
+ }
+ if (this.writeRawImportFiles) {
+ try {
+ if (this.connBiDirectOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.connBiDirectOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/connbidirect.sql"));
+ this.connBiDirectOut.write(" COPY connbidirect (source, "
+ + "statsend, seconds, belownum, readnum, writenum, "
+ + "bothnum) FROM stdin;\n");
+ }
+ this.connBiDirectOut.write(source + "\t" + statsEnd + "\t"
+ + seconds + "\t" + below + "\t" + read + "\t" + write + "\t"
+ + both + "\n");
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
+ + "stats string to raw database import file. We won't make "
+ + "any further attempts to write raw import files in this "
+ + "execution.", e);
+ this.writeRawImportFiles = false;
+ }
+ }
+ }
+
+ void importRelayDescriptors() {
+ if (archivesDirectory.exists()) {
+ logger.fine("Importing files in directory " + archivesDirectory
+ + "/...");
+ DescriptorReader reader =
+ DescriptorSourceFactory.createDescriptorReader();
+ reader.addDirectory(archivesDirectory);
+ if (keepImportHistory) {
+ reader.setExcludeFiles(new File(statsDirectory,
+ "connbidirect-relay-descriptor-history"));
+ }
+ Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors();
+ while (descriptorFiles.hasNext()) {
+ DescriptorFile descriptorFile = descriptorFiles.next();
+ if (descriptorFile.getDescriptors() != null) {
+ for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+ if (descriptor instanceof ExtraInfoDescriptor) {
+ this.addExtraInfoDescriptor(
+ (ExtraInfoDescriptor) descriptor);
+ }
+ }
+ }
+ }
+ }
+
+ logger.info("Finished importing relay descriptors.");
+ }
+
+ private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
+ if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
+ this.addConnBiDirect(descriptor.getFingerprint(),
+ descriptor.getConnBiDirectStatsEndMillis(),
+ descriptor.getConnBiDirectStatsIntervalLength(),
+ descriptor.getConnBiDirectBelow(),
+ descriptor.getConnBiDirectRead(),
+ descriptor.getConnBiDirectWrite(),
+ descriptor.getConnBiDirectBoth());
+ }
+ }
+
+ /**
+ * Close the relay descriptor database connection.
+ */
+ void closeConnection() {
+
+ /* Log stats about imported descriptors. */
+ this.logger.info(String.format("Finished importing relay "
+ + "descriptors: %d conn-bi-direct stats lines", rbsCount));
+
+ /* Commit any stragglers before closing. */
+ if (this.conn != null) {
+ try {
+ this.conn.commit();
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not commit final records "
+ + "to database", e);
+ }
+ try {
+ this.conn.close();
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not close database "
+ + "connection.", e);
+ }
+ }
+
+ /* Close raw import files. */
+ try {
+ if (this.connBiDirectOut != null) {
+ this.connBiDirectOut.write("\\.\n");
+ this.connBiDirectOut.close();
+ }
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not close one or more raw "
+ + "database import files.", e);
+ }
+ }
+}
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
index c9d9bc4..05c663f 100644
--- a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
@@ -60,7 +60,6 @@ public final class RelayDescriptorDatabaseImporter {
private int rrsCount = 0;
private int rcsCount = 0;
private int rvsCount = 0;
- private int rbsCount = 0;
private int rqsCount = 0;
/**
@@ -94,12 +93,6 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psCs;
/**
- * Prepared statement to check whether a given conn-bi-direct stats
- * string has been imported into the database before.
- */
- private PreparedStatement psBs;
-
- /**
* Prepared statement to check whether a given dirreq stats string has
* been imported into the database before.
*/
@@ -141,12 +134,6 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psC;
/**
- * Prepared statement to insert a conn-bi-direct stats string into the
- * database.
- */
- private PreparedStatement psB;
-
- /**
* Prepared statement to insert a given dirreq stats string into the
* database.
*/
@@ -183,11 +170,6 @@ public final class RelayDescriptorDatabaseImporter {
private BufferedWriter consensusOut;
/**
- * Raw import file containing conn-bi-direct stats strings.
- */
- private BufferedWriter connBiDirectOut;
-
- /**
* Raw import file containing dirreq stats.
*/
private BufferedWriter dirReqOut;
@@ -260,8 +242,6 @@ public final class RelayDescriptorDatabaseImporter {
+ "FROM descriptor WHERE descriptor = ?");
this.psCs = conn.prepareStatement("SELECT COUNT(*) "
+ "FROM consensus WHERE validafter = ?");
- this.psBs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM connbidirect WHERE source = ? AND statsend = ?");
this.psQs = conn.prepareStatement("SELECT COUNT(*) "
+ "FROM dirreq_stats WHERE source = ? AND statsend = ?");
this.psR = conn.prepareStatement("INSERT INTO statusentry "
@@ -282,9 +262,6 @@ public final class RelayDescriptorDatabaseImporter {
+ "?)}");
this.psC = conn.prepareStatement("INSERT INTO consensus "
+ "(validafter, rawdesc) VALUES (?, ?)");
- this.psB = conn.prepareStatement("INSERT INTO connbidirect "
- + "(source, statsend, seconds, belownum, readnum, writenum, "
- + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)");
this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats "
+ "(source, statsend, seconds, country, requests) VALUES "
+ "(?, ?, ?, ?, ?)");
@@ -882,66 +859,6 @@ public final class RelayDescriptorDatabaseImporter {
}
/**
- * Insert a conn-bi-direct stats string into the database.
- */
- public void addConnBiDirect(String source, long statsEndMillis,
- long seconds, long below, long read, long write, long both) {
- String statsEnd = this.dateTimeFormat.format(statsEndMillis);
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(statsEndMillis);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp statsEndTimestamp = new Timestamp(statsEndMillis);
- this.psBs.setString(1, source);
- this.psBs.setTimestamp(2, statsEndTimestamp, cal);
- ResultSet rs = psBs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psB.clearParameters();
- this.psB.setString(1, source);
- this.psB.setTimestamp(2, statsEndTimestamp, cal);
- this.psB.setLong(3, seconds);
- this.psB.setLong(4, below);
- this.psB.setLong(5, read);
- this.psB.setLong(6, write);
- this.psB.setLong(7, both);
- this.psB.executeUpdate();
- rbsCount++;
- if (rbsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
- + "stats string. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.connBiDirectOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.connBiDirectOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/connbidirect.sql"));
- this.connBiDirectOut.write(" COPY connbidirect (source, "
- + "statsend, seconds, belownum, readnum, writenum, "
- + "bothnum) FROM stdin;\n");
- }
- this.connBiDirectOut.write(source + "\t" + statsEnd + "\t"
- + seconds + "\t" + below + "\t" + read + "\t" + write + "\t"
- + both + "\n");
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
- + "stats string to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
* Adds observations on the number of directory requests by country as
* seen on a directory at a given date to the database.
*/
@@ -1081,15 +998,6 @@ public final class RelayDescriptorDatabaseImporter {
descriptor.getDirreqStatsEndMillis(),
descriptor.getDirreqStatsIntervalLength(), obs);
}
- if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) {
- this.addConnBiDirect(descriptor.getFingerprint(),
- descriptor.getConnBiDirectStatsEndMillis(),
- descriptor.getConnBiDirectStatsIntervalLength(),
- descriptor.getConnBiDirectBelow(),
- descriptor.getConnBiDirectRead(),
- descriptor.getConnBiDirectWrite(),
- descriptor.getConnBiDirectBoth());
- }
List<String> bandwidthHistoryLines = new ArrayList<String>();
if (descriptor.getWriteHistory() != null) {
bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine());
@@ -1121,9 +1029,9 @@ public final class RelayDescriptorDatabaseImporter {
this.logger.info(String.format("Finished importing relay "
+ "descriptors: %d consensuses, %d network status entries, %d "
+ "votes, %d server descriptors, %d extra-info descriptors, %d "
- + "bandwidth history elements, %d dirreq stats elements, and %d "
- + "conn-bi-direct stats lines", rcsCount, rrsCount, rvsCount,
- rdsCount, resCount, rhsCount, rqsCount, rbsCount));
+ + "bandwidth history elements, and %d dirreq stats elements",
+ rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount,
+ rqsCount));
/* Insert scheduled updates a second time, just in case the refresh
* run has started since inserting them the first time in which case
@@ -1179,10 +1087,6 @@ public final class RelayDescriptorDatabaseImporter {
this.consensusOut.write("\\.\n");
this.consensusOut.close();
}
- if (this.connBiDirectOut != null) {
- this.connBiDirectOut.write("\\.\n");
- this.connBiDirectOut.close();
- }
} catch (IOException e) {
this.logger.log(Level.WARNING, "Could not close one or more raw "
+ "database import files.", e);