[tor-commits] [metrics-web/master] Import relay descriptors as part of metrics-web.

karsten at torproject.org karsten at torproject.org
Wed Mar 2 15:45:28 UTC 2011


commit dac73329a421cb3b13978c56be88ad36abd1c159
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Mar 1 16:40:55 2011 +0100

    Import relay descriptors as part of metrics-web.
---
 build.xml                                          |    1 +
 config.template                                    |   14 +
 src/org/torproject/ernie/cron/Configuration.java   |   27 +
 src/org/torproject/ernie/cron/Main.java            |   19 +-
 .../cron/RelayDescriptorDatabaseImporter.java      | 1213 ++++++++++++++++++++
 .../ernie/cron/RelayDescriptorParser.java          |  241 ++++-
 6 files changed, 1509 insertions(+), 6 deletions(-)

diff --git a/build.xml b/build.xml
index 86ce310..446572b 100644
--- a/build.xml
+++ b/build.xml
@@ -13,6 +13,7 @@
   <path id="classpath">
     <pathelement path="${classes}"/>
     <pathelement location="lib/commons-codec-1.4.jar"/>
+    <pathelement location="lib/postgresql-8.4-702.jdbc3.jar"/>
   </path>
 
   <target name="init">
diff --git a/config.template b/config.template
index 479d78c..1d0701c 100644
--- a/config.template
+++ b/config.template
@@ -10,6 +10,20 @@
 ## again, but it can be confusing to users who don't know about it.
 #KeepDirectoryArchiveImportHistory 0
 #
+## Write relay descriptors to the database
+#WriteRelayDescriptorDatabase 0
+#
+## JDBC string for relay descriptor database
+#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password
+#
+## Write relay descriptors to raw text files for importing them into the
+## database using PostgreSQL's \copy command
+#WriteRelayDescriptorsRawFiles 0
+#
+## Relative path to directory to write raw text files; note that existing
+## files will be overwritten!
+#RelayDescriptorRawFilesDirectory pg-import/
+#
 ## Write statistics about the current consensus and votes to the
 ## website
 #WriteConsensusHealth 0
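
The raw files written to RelayDescriptorRawFilesDirectory each start with
a COPY ... FROM stdin; line and end with a \. terminator (see the importer
code further down). One plausible way to load them, assuming the tordir
database and metrics user from the JDBC example above, is to feed them to
psql:

    psql -U metrics -d tordir -f pg-import/statusentry.sql
    psql -U metrics -d tordir -f pg-import/descriptor.sql

The bwhist.sql file contains SELECT insert_bwhist(...) calls rather than a
COPY block, but it loads the same way.
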
diff --git a/src/org/torproject/ernie/cron/Configuration.java b/src/org/torproject/ernie/cron/Configuration.java
index 6b76dc7..66ad778 100644
--- a/src/org/torproject/ernie/cron/Configuration.java
+++ b/src/org/torproject/ernie/cron/Configuration.java
@@ -16,6 +16,11 @@ public class Configuration {
   private boolean importDirectoryArchives = false;
   private String directoryArchivesDirectory = "archives/";
   private boolean keepDirectoryArchiveImportHistory = false;
+  private boolean writeRelayDescriptorDatabase = false;
+  private String relayDescriptorDatabaseJdbc =
+      "jdbc:postgresql://localhost/tordir?user=metrics&password=password";
+  private boolean writeRelayDescriptorsRawFiles = false;
+  private String relayDescriptorRawFilesDirectory = "pg-import/";
   private boolean writeConsensusHealth = false;
   public Configuration() {
 
@@ -42,6 +47,16 @@ public class Configuration {
         } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) {
           this.keepDirectoryArchiveImportHistory = Integer.parseInt(
               line.split(" ")[1]) != 0;
+        } else if (line.startsWith("WriteRelayDescriptorDatabase")) {
+          this.writeRelayDescriptorDatabase = Integer.parseInt(
+              line.split(" ")[1]) != 0;
+        } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
+          this.relayDescriptorDatabaseJdbc = line.split(" ")[1];
+        } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) {
+          this.writeRelayDescriptorsRawFiles = Integer.parseInt(
+              line.split(" ")[1]) != 0;
+        } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) {
+          this.relayDescriptorRawFilesDirectory = line.split(" ")[1];
         } else if (line.startsWith("WriteConsensusHealth")) {
           this.writeConsensusHealth = Integer.parseInt(
               line.split(" ")[1]) != 0;
@@ -79,6 +94,18 @@ public class Configuration {
   public boolean getKeepDirectoryArchiveImportHistory() {
     return this.keepDirectoryArchiveImportHistory;
   }
+  public boolean getWriteRelayDescriptorDatabase() {
+    return this.writeRelayDescriptorDatabase;
+  }
+  public String getRelayDescriptorDatabaseJDBC() {
+    return this.relayDescriptorDatabaseJdbc;
+  }
+  public boolean getWriteRelayDescriptorsRawFiles() {
+    return this.writeRelayDescriptorsRawFiles;
+  }
+  public String getRelayDescriptorRawFilesDirectory() {
+    return this.relayDescriptorRawFilesDirectory;
+  }
   public boolean getWriteConsensusHealth() {
     return this.writeConsensusHealth;
   }
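
Note that Configuration.java splits each config line on a single space and
parses boolean options with Integer.parseInt, so options are enabled with
any non-zero value and the JDBC string must not contain spaces. A config
file that enables the database import might, for example, contain:

    WriteRelayDescriptorDatabase 1
    RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password
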
diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java
index b95a133..1a125bc 100644
--- a/src/org/torproject/ernie/cron/Main.java
+++ b/src/org/torproject/ernie/cron/Main.java
@@ -37,10 +37,20 @@ public class Main {
     ConsensusHealthChecker chc = config.getWriteConsensusHealth() ?
         new ConsensusHealthChecker() : null;
 
+    // Prepare writing relay descriptors to database
+    RelayDescriptorDatabaseImporter rddi =
+        config.getWriteRelayDescriptorDatabase() ||
+        config.getWriteRelayDescriptorsRawFiles() ?
+        new RelayDescriptorDatabaseImporter(
+        config.getWriteRelayDescriptorDatabase() ?
+        config.getRelayDescriptorDatabaseJDBC() : null,
+        config.getWriteRelayDescriptorsRawFiles() ?
+        config.getRelayDescriptorRawFilesDirectory() : null) : null;
+
     // Prepare relay descriptor parser (only if we are writing the
     // consensus-health page to disk)
-    RelayDescriptorParser rdp = config.getWriteConsensusHealth() ?
-        new RelayDescriptorParser(chc) : null;
+    RelayDescriptorParser rdp = chc != null || rddi != null ?
+        new RelayDescriptorParser(chc, rddi) : null;
 
     // Import relay descriptors
     if (rdp != null) {
@@ -52,6 +62,11 @@ public class Main {
       }
     }
 
+    // Close database connection (if active)
+    if (rddi != null)   {
+      rddi.closeConnection();
+    }
+
     // Write consensus health website
     if (chc != null) {
       chc.writeStatusWebsite();
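
As a rough sketch of how the new pieces fit together (hypothetical
standalone use, not part of Main.java), the importer can also be driven
without a consensus-health checker, for example to write raw import files
only:

    // Hypothetical sketch: write raw import files only, no database.
    RelayDescriptorDatabaseImporter rddi =
        new RelayDescriptorDatabaseImporter(null, "pg-import/");
    RelayDescriptorParser rdp = new RelayDescriptorParser(null, rddi);
    // ... hand relay descriptor contents to rdp here (not shown) ...
    rddi.closeConnection();
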
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
new file mode 100644
index 0000000..f8f1e9a
--- /dev/null
+++ b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java
@@ -0,0 +1,1213 @@
+/* Copyright 2011 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.ernie.cron;
+
+import java.io.*;
+import java.sql.*;
+import java.text.*;
+import java.util.*;
+import java.util.logging.*;
+import org.postgresql.util.*;
+
+/**
+ * Import relay descriptor contents into a database for later evaluation.
+ */
+
+public final class RelayDescriptorDatabaseImporter {
+
+  /**
+   * How many records to commit with each database transaction.
+   */
+  private final long autoCommitCount = 500;
+
+  /**
+   * Keep track of the number of records added, to commit in batches.
+   */
+  private int rdsCount = 0;
+  private int resCount = 0;
+  private int rhsCount = 0;
+  private int rrsCount = 0;
+  private int rcsCount = 0;
+  private int rvsCount = 0;
+  private int rbsCount = 0;
+  private int rqsCount = 0;
+
+  /**
+   * Relay descriptor database connection.
+   */
+  private Connection conn;
+
+  /**
+   * Prepared statement to check whether any network status consensus
+   * entries matching a given valid-after time have been imported into the
+   * database before.
+   */
+  private PreparedStatement psSs;
+
+  /**
+   * Prepared statement to check whether a given network status consensus
+   * entry has been imported into the database before.
+   */
+  private PreparedStatement psRs;
+
+  /**
+   * Prepared statement to check whether a given extra-info descriptor has
+   * been imported into the database before.
+   */
+  private PreparedStatement psEs;
+
+  /**
+   * Prepared statement to check whether a given server descriptor has
+   * been imported into the database before.
+   */
+  private PreparedStatement psDs;
+
+  /**
+   * Prepared statement to check whether a given network status consensus
+   * has been imported into the database before.
+   */
+  private PreparedStatement psCs;
+
+  /**
+   * Prepared statement to check whether a given network status vote has
+   * been imported into the database before.
+   */
+  private PreparedStatement psVs;
+
+  /**
+   * Prepared statement to check whether a given conn-bi-direct stats
+   * string has been imported into the database before.
+   */
+  private PreparedStatement psBs;
+
+  /**
+   * Prepared statement to check whether a given dirreq stats string has
+   * been imported into the database before.
+   */
+  private PreparedStatement psQs;
+
+  /**
+   * Set of dates that have been inserted into the database to be
+   * included in the next refresh run.
+   */
+  private Set<Long> scheduledUpdates;
+
+  /**
+   * Prepared statement to insert a date into the database that shall be
+   * included in the next refresh run.
+   */
+  private PreparedStatement psU;
+
+  /**
+   * Prepared statement to insert a network status consensus entry into
+   * the database.
+   */
+  private PreparedStatement psR;
+
+  /**
+   * Prepared statement to insert a server descriptor into the database.
+   */
+  private PreparedStatement psD;
+
+  /**
+   * Prepared statement to insert an extra-info descriptor into the
+   * database.
+   */
+  private PreparedStatement psE;
+
+  /**
+   * Callable statement to insert the bandwidth history of an extra-info
+   * descriptor into the database.
+   */
+  private CallableStatement csH;
+
+  /**
+   * Prepared statement to insert a network status consensus into the
+   * database.
+   */
+  private PreparedStatement psC;
+
+  /**
+   * Prepared statement to insert a network status vote into the
+   * database.
+   */
+  private PreparedStatement psV;
+
+  /**
+   * Prepared statement to insert a conn-bi-direct stats string into the
+   * database.
+   */
+  private PreparedStatement psB;
+
+  /**
+   * Prepared statement to insert a given dirreq stats string into the
+   * database.
+   */
+  private PreparedStatement psQ;
+
+  /**
+   * Logger for this class.
+   */
+  private Logger logger;
+
+  /**
+   * Directory for writing raw import files.
+   */
+  private String rawFilesDirectory;
+
+  /**
+   * Raw import file containing status entries.
+   */
+  private BufferedWriter statusentryOut;
+
+  /**
+   * Raw import file containing server descriptors.
+   */
+  private BufferedWriter descriptorOut;
+
+  /**
+   * Raw import file containing extra-info descriptors.
+   */
+  private BufferedWriter extrainfoOut;
+
+  /**
+   * Raw import file containing bandwidth histories.
+   */
+  private BufferedWriter bwhistOut;
+
+  /**
+   * Raw import file containing consensuses.
+   */
+  private BufferedWriter consensusOut;
+
+  /**
+   * Raw import file containing votes.
+   */
+  private BufferedWriter voteOut;
+
+  /**
+   * Raw import file containing conn-bi-direct stats strings.
+   */
+  private BufferedWriter connBiDirectOut;
+
+  /**
+   * Raw import file containing dirreq stats.
+   */
+  private BufferedWriter dirReqOut;
+
+  /**
+   * Date format to parse timestamps.
+   */
+  private SimpleDateFormat dateTimeFormat;
+
+  /**
+   * The last valid-after time for which we checked whether there are
+   * any network status entries in the database.
+   */
+  private long lastCheckedStatusEntries;
+
+  /**
+   * Set of fingerprints that we imported for the valid-after time in
+   * <code>lastCheckedStatusEntries</code>.
+   */
+  private Set<String> insertedStatusEntries;
+
+  /**
+   * Flag that tells us whether we need to check whether a network status
+   * entry is already contained in the database or not.
+   */
+  private boolean separateStatusEntryCheckNecessary;
+
+  private boolean importIntoDatabase;
+  private boolean writeRawImportFiles;
+
+  /**
+   * Initialize database importer by connecting to the database and
+   * preparing statements.
+   */
+  public RelayDescriptorDatabaseImporter(String connectionURL,
+      String rawFilesDirectory) {
+
+    /* Initialize logger. */
+    this.logger = Logger.getLogger(
+        RelayDescriptorDatabaseImporter.class.getName());
+
+    if (connectionURL != null) {
+      try {
+        /* Connect to database. */
+        this.conn = DriverManager.getConnection(connectionURL);
+
+        /* Turn autocommit off */
+        this.conn.setAutoCommit(false);
+
+        /* Prepare statements. */
+        this.psSs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM statusentry WHERE validafter = ?");
+        this.psRs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM statusentry WHERE validafter = ? AND descriptor = ?");
+        this.psDs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM descriptor WHERE descriptor = ?");
+        this.psEs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM extrainfo WHERE extrainfo = ?");
+        this.psCs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM consensus WHERE validafter = ?");
+        this.psVs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM vote WHERE validafter = ? AND dirsource = ?");
+        this.psBs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM connbidirect WHERE source = ? AND statsend = ?");
+        this.psQs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM dirreq_stats WHERE source = ? AND statsend = ?");
+        this.psR = conn.prepareStatement("INSERT INTO statusentry "
+            + "(validafter, nickname, fingerprint, descriptor, "
+            + "published, address, orport, dirport, isauthority, "
+            + "isbadexit, isbaddirectory, isexit, isfast, isguard, "
+            + "ishsdir, isnamed, isstable, isrunning, isunnamed, "
+            + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
+            + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+            + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+        this.psD = conn.prepareStatement("INSERT INTO descriptor "
+            + "(descriptor, nickname, address, orport, dirport, "
+            + "fingerprint, bandwidthavg, bandwidthburst, "
+            + "bandwidthobserved, platform, published, uptime, "
+            + "extrainfo, rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+            + "?, ?, ?, ?)");
+        this.psE = conn.prepareStatement("INSERT INTO extrainfo "
+            + "(extrainfo, nickname, fingerprint, published, rawdesc) "
+            + "VALUES (?, ?, ?, ?, ?)");
+        this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
+            + "?)}");
+        this.psC = conn.prepareStatement("INSERT INTO consensus "
+            + "(validafter, rawdesc) VALUES (?, ?)");
+        this.psV = conn.prepareStatement("INSERT INTO vote "
+            + "(validafter, dirsource, rawdesc) VALUES (?, ?, ?)");
+        this.psB = conn.prepareStatement("INSERT INTO connbidirect "
+            + "(source, statsend, seconds, belownum, readnum, writenum, "
+            + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)");
+        this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats "
+            + "(source, statsend, seconds, country, requests) VALUES "
+            + "(?, ?, ?, ?, ?)");
+        this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
+            + "(date) VALUES (?)");
+        this.scheduledUpdates = new HashSet<Long>();
+        this.importIntoDatabase = true;
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not connect to database or "
+            + "prepare statements.", e);
+      }
+
+      /* Initialize set of fingerprints to remember which status entries
+       * we already imported. */
+      this.insertedStatusEntries = new HashSet<String>();
+    }
+
+    /* Remember where we want to write raw import files. */
+    if (rawFilesDirectory != null) {
+      this.rawFilesDirectory = rawFilesDirectory;
+      this.writeRawImportFiles = true;
+    }
+
+    /* Initialize date format, so that we can format timestamps. */
+    this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+  }
+
+  private void addDateToScheduledUpdates(long timestamp)
+      throws SQLException {
+    if (!this.importIntoDatabase) {
+      return;
+    }
+    long dateMillis = 0L;
+    try {
+      dateMillis = this.dateTimeFormat.parse(
+          this.dateTimeFormat.format(timestamp).substring(0, 10)
+          + " 00:00:00").getTime();
+    } catch (ParseException e) {
+      this.logger.log(Level.WARNING, "Internal parsing error.", e);
+      return;
+    }
+    if (!this.scheduledUpdates.contains(dateMillis)) {
+      this.psU.setDate(1, new java.sql.Date(dateMillis));
+      this.psU.execute();
+      this.scheduledUpdates.add(dateMillis);
+    }
+  }
+
+  /**
+   * Insert network status consensus entry into database.
+   */
+  public void addStatusEntry(long validAfter, String nickname,
+      String fingerprint, String descriptor, long published,
+      String address, long orPort, long dirPort,
+      SortedSet<String> flags, String version, long bandwidth,
+      String ports, byte[] rawDescriptor) {
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(validAfter);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        if (lastCheckedStatusEntries != validAfter) {
+          this.psSs.setTimestamp(1, validAfterTimestamp, cal);
+          ResultSet rs = psSs.executeQuery();
+          rs.next();
+          if (rs.getInt(1) == 0) {
+            separateStatusEntryCheckNecessary = false;
+            insertedStatusEntries.clear();
+          } else {
+            separateStatusEntryCheckNecessary = true;
+          }
+          rs.close();
+          lastCheckedStatusEntries = validAfter;
+        }
+        boolean alreadyContained = false;
+        if (separateStatusEntryCheckNecessary ||
+            insertedStatusEntries.contains(fingerprint)) {
+          this.psRs.setTimestamp(1, validAfterTimestamp, cal);
+          this.psRs.setString(2, descriptor);
+          ResultSet rs = psRs.executeQuery();
+          rs.next();
+          if (rs.getInt(1) > 0) {
+            alreadyContained = true;
+          }
+          rs.close();
+        } else {
+          insertedStatusEntries.add(fingerprint);
+        }
+        if (!alreadyContained) {
+          this.psR.clearParameters();
+          this.psR.setTimestamp(1, validAfterTimestamp, cal);
+          this.psR.setString(2, nickname);
+          this.psR.setString(3, fingerprint);
+          this.psR.setString(4, descriptor);
+          this.psR.setTimestamp(5, new Timestamp(published), cal);
+          this.psR.setString(6, address);
+          this.psR.setLong(7, orPort);
+          this.psR.setLong(8, dirPort);
+          this.psR.setBoolean(9, flags.contains("Authority"));
+          this.psR.setBoolean(10, flags.contains("BadExit"));
+          this.psR.setBoolean(11, flags.contains("BadDirectory"));
+          this.psR.setBoolean(12, flags.contains("Exit"));
+          this.psR.setBoolean(13, flags.contains("Fast"));
+          this.psR.setBoolean(14, flags.contains("Guard"));
+          this.psR.setBoolean(15, flags.contains("HSDir"));
+          this.psR.setBoolean(16, flags.contains("Named"));
+          this.psR.setBoolean(17, flags.contains("Stable"));
+          this.psR.setBoolean(18, flags.contains("Running"));
+          this.psR.setBoolean(19, flags.contains("Unnamed"));
+          this.psR.setBoolean(20, flags.contains("Valid"));
+          this.psR.setBoolean(21, flags.contains("V2Dir"));
+          this.psR.setBoolean(22, flags.contains("V3Dir"));
+          this.psR.setString(23, version);
+          this.psR.setLong(24, bandwidth);
+          this.psR.setString(25, ports);
+          this.psR.setBytes(26, rawDescriptor);
+          this.psR.executeUpdate();
+          rrsCount++;
+          if (rrsCount % autoCommitCount == 0)  {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "consensus entry.  We won't make any further SQL requests "
+            + "in this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.statusentryOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.statusentryOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/statusentry.sql"));
+          this.statusentryOut.write(" COPY statusentry (validafter, "
+              + "nickname, fingerprint, descriptor, published, address, "
+              + "orport, dirport, isauthority, isbadExit, "
+              + "isbaddirectory, isexit, isfast, isguard, ishsdir, "
+              + "isnamed, isstable, isrunning, isunnamed, isvalid, "
+              + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) "
+              + "FROM stdin;\n");
+        }
+        this.statusentryOut.write(
+            this.dateTimeFormat.format(validAfter) + "\t" + nickname
+            + "\t" + fingerprint.toLowerCase() + "\t"
+            + descriptor.toLowerCase() + "\t"
+            + this.dateTimeFormat.format(published) + "\t" + address
+            + "\t" + orPort + "\t" + dirPort + "\t"
+            + (flags.contains("Authority") ? "t" : "f") + "\t"
+            + (flags.contains("BadExit") ? "t" : "f") + "\t"
+            + (flags.contains("BadDirectory") ? "t" : "f") + "\t"
+            + (flags.contains("Exit") ? "t" : "f") + "\t"
+            + (flags.contains("Fast") ? "t" : "f") + "\t"
+            + (flags.contains("Guard") ? "t" : "f") + "\t"
+            + (flags.contains("HSDir") ? "t" : "f") + "\t"
+            + (flags.contains("Named") ? "t" : "f") + "\t"
+            + (flags.contains("Stable") ? "t" : "f") + "\t"
+            + (flags.contains("Running") ? "t" : "f") + "\t"
+            + (flags.contains("Unnamed") ? "t" : "f") + "\t"
+            + (flags.contains("Valid") ? "t" : "f") + "\t"
+            + (flags.contains("V2Dir") ? "t" : "f") + "\t"
+            + (flags.contains("V3Dir") ? "t" : "f") + "\t"
+            + (version != null ? version : "\\N") + "\t"
+            + (bandwidth >= 0 ? bandwidth : "\\N") + "\t"
+            + (ports != null ? ports : "\\N") + "\t");
+        this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus entry to raw database import file.  We won't "
+            + "make any further attempts to write raw import files in "
+            + "this execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus entry to raw database import file.  We won't "
+            + "make any further attempts to write raw import files in "
+            + "this execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Insert server descriptor into database.
+   */
+  public void addServerDescriptor(String descriptor, String nickname,
+      String address, int orPort, int dirPort, String relayIdentifier,
+      long bandwidthAvg, long bandwidthBurst, long bandwidthObserved,
+      String platform, long published, long uptime,
+      String extraInfoDigest, byte[] rawDescriptor) {
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(published);
+        this.addDateToScheduledUpdates(
+            published + 24L * 60L * 60L * 1000L);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        this.psDs.setString(1, descriptor);
+        ResultSet rs = psDs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          this.psD.clearParameters();
+          this.psD.setString(1, descriptor);
+          this.psD.setString(2, nickname);
+          this.psD.setString(3, address);
+          this.psD.setInt(4, orPort);
+          this.psD.setInt(5, dirPort);
+          this.psD.setString(6, relayIdentifier);
+          this.psD.setLong(7, bandwidthAvg);
+          this.psD.setLong(8, bandwidthBurst);
+          this.psD.setLong(9, bandwidthObserved);
+          this.psD.setString(10, new String(platform.getBytes(),
+              "US-ASCII"));
+          this.psD.setTimestamp(11, new Timestamp(published), cal);
+          this.psD.setLong(12, uptime);
+          this.psD.setString(13, extraInfoDigest);
+          this.psD.setBytes(14, rawDescriptor);
+          this.psD.executeUpdate();
+          rdsCount++;
+          if (rdsCount % autoCommitCount == 0)  {
+            this.conn.commit();
+          }
+        }
+      } catch (UnsupportedEncodingException e) {
+        // US-ASCII is supported for sure
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add server "
+            + "descriptor.  We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.descriptorOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.descriptorOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/descriptor.sql"));
+          this.descriptorOut.write(" COPY descriptor (descriptor, "
+              + "nickname, address, orport, dirport, fingerprint, "
+              + "bandwidthavg, bandwidthburst, bandwidthobserved, "
+              + "platform, published, uptime, extrainfo, rawdesc) FROM "
+              + "stdin;\n");
+        }
+        this.descriptorOut.write(descriptor.toLowerCase() + "\t"
+            + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
+            + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
+            + bandwidthBurst + "\t" + bandwidthObserved + "\t"
+            + (platform != null && platform.length() > 0
+            ? new String(platform.getBytes(), "US-ASCII") : "\\N")
+            + "\t" + this.dateTimeFormat.format(published) + "\t"
+            + (uptime >= 0 ? uptime : "\\N") + "\t"
+            + (extraInfoDigest != null ? extraInfoDigest : "\\N")
+            + "\t");
+        this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (UnsupportedEncodingException e) {
+        // US-ASCII is supported for sure
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write server "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write server "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Insert extra-info descriptor into database.
+   */
+  public void addExtraInfoDescriptor(String extraInfoDigest,
+      String nickname, String fingerprint, long published,
+      byte[] rawDescriptor, List<String> bandwidthHistoryLines) {
+    if (this.importIntoDatabase) {
+      try {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        this.psEs.setString(1, extraInfoDigest);
+        ResultSet rs = psEs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          this.psE.clearParameters();
+          this.psE.setString(1, extraInfoDigest);
+          this.psE.setString(2, nickname);
+          this.psE.setString(3, fingerprint);
+          this.psE.setTimestamp(4, new Timestamp(published), cal);
+          this.psE.setBytes(5, rawDescriptor);
+          this.psE.executeUpdate();
+          resCount++;
+          if (resCount % autoCommitCount == 0)  {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add extra-info "
+            + "descriptor.  We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.extrainfoOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.extrainfoOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/extrainfo.sql"));
+          this.extrainfoOut.write(" COPY extrainfo (extrainfo, nickname, "
+              + "fingerprint, published, rawdesc) FROM stdin;\n");
+        }
+        this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
+            + nickname + "\t" + fingerprint.toLowerCase() + "\t"
+            + this.dateTimeFormat.format(published) + "\t");
+        this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write extra-info "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write extra-info "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+    if (!bandwidthHistoryLines.isEmpty()) {
+      this.addBandwidthHistory(fingerprint.toLowerCase(), published,
+          bandwidthHistoryLines);
+    }
+  }
+
+  private static class BigIntArray implements java.sql.Array {
+
+    private final String stringValue;
+
+    public BigIntArray(long[] array, int offset) {
+      if (array == null) {
+        this.stringValue = "[-1:-1]={0}";
+      } else {
+        StringBuilder sb = new StringBuilder("[" + offset + ":"
+            + (offset + array.length - 1) + "]={");
+        for (int i = 0; i < array.length; i++) {
+          sb.append((i > 0 ? "," : "") + array[i]);
+        }
+        sb.append('}');
+        this.stringValue = sb.toString();
+      }
+    }
+
+    public String toString() {
+      return stringValue;
+    }
+
+    public String getBaseTypeName() {
+      return "int8";
+    }
+
+    /* The other methods are never called; no need to implement them. */
+    public void free() {
+      throw new UnsupportedOperationException();
+    }
+    public Object getArray() {
+      throw new UnsupportedOperationException();
+    }
+    public Object getArray(long index, int count) {
+      throw new UnsupportedOperationException();
+    }
+    public Object getArray(long index, int count,
+        Map<String, Class<?>> map) {
+      throw new UnsupportedOperationException();
+    }
+    public Object getArray(Map<String, Class<?>> map) {
+      throw new UnsupportedOperationException();
+    }
+    public int getBaseType() {
+      throw new UnsupportedOperationException();
+    }
+    public ResultSet getResultSet() {
+      throw new UnsupportedOperationException();
+    }
+    public ResultSet getResultSet(long index, int count) {
+      throw new UnsupportedOperationException();
+    }
+    public ResultSet getResultSet(long index, int count,
+        Map<String, Class<?>> map) {
+      throw new UnsupportedOperationException();
+    }
+    public ResultSet getResultSet(Map<String, Class<?>> map) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  public void addBandwidthHistory(String fingerprint, long published,
+      List<String> bandwidthHistoryStrings) {
+
+    /* Split history lines by date and rewrite them so that the date
+     * comes first. */
+    SortedSet<String> historyLinesByDate = new TreeSet<String>();
+    for (String bandwidthHistoryString : bandwidthHistoryStrings) {
+      String[] parts = bandwidthHistoryString.split(" ");
+      if (parts.length != 6) {
+        this.logger.finer("Bandwidth history line does not have expected "
+            + "number of elements. Ignoring this line.");
+        continue;
+      }
+      long intervalLength = 0L;
+      try {
+        intervalLength = Long.parseLong(parts[3].substring(1));
+      } catch (NumberFormatException e) {
+        this.logger.fine("Bandwidth history line does not have valid "
+            + "interval length '" + parts[3] + " " + parts[4] + "'. "
+            + "Ignoring this line.");
+        continue;
+      }
+      if (intervalLength != 900L) {
+        this.logger.fine("Bandwidth history line does not consist of "
+            + "15-minute intervals. Ignoring this line.");
+        continue;
+      }
+      String type = parts[0];
+      String intervalEndTime = parts[1] + " " + parts[2];
+      long intervalEnd, dateStart;
+      try {
+        intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime();
+        dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00").
+            getTime();
+      } catch (ParseException e) {
+        this.logger.fine("Parse exception while parsing timestamp in "
+            + "bandwidth history line. Ignoring this line.");
+        continue;
+      }
+      if (Math.abs(published - intervalEnd) >
+          7L * 24L * 60L * 60L * 1000L) {
+        this.logger.fine("Extra-info descriptor publication time "
+            + dateTimeFormat.format(published) + " and last interval "
+            + "time " + intervalEndTime + " in " + type + " line differ "
+            + "by more than 7 days! Not adding this line!");
+        continue;
+      }
+      long currentIntervalEnd = intervalEnd;
+      StringBuilder sb = new StringBuilder();
+      String[] values = parts[5].split(",");
+      SortedSet<String> newHistoryLines = new TreeSet<String>();
+      try {
+        for (int i = values.length - 1; i >= -1; i--) {
+          if (i == -1 || currentIntervalEnd < dateStart) {
+            sb.insert(0, intervalEndTime + " " + type + " ("
+                + intervalLength + " s) ");
+            sb.setLength(sb.length() - 1);
+            String historyLine = sb.toString();
+            newHistoryLines.add(historyLine);
+            sb = new StringBuilder();
+            dateStart -= 24L * 60L * 60L * 1000L;
+            intervalEndTime = dateTimeFormat.format(currentIntervalEnd);
+          }
+          if (i == -1) {
+            break;
+          }
+          Long.parseLong(values[i]);
+          sb.insert(0, values[i] + ",");
+          currentIntervalEnd -= intervalLength * 1000L;
+        }
+      } catch (NumberFormatException e) {
+        this.logger.fine("Number format exception while parsing "
+            + "bandwidth history line. Ignoring this line.");
+        continue;
+      }
+      historyLinesByDate.addAll(newHistoryLines);
+    }
+
+    /* Add split history lines to database. */
+    String lastDate = null;
+    historyLinesByDate.add("EOL");
+    long[] readArray = null, writtenArray = null, dirreadArray = null,
+        dirwrittenArray = null;
+    int readOffset = 0, writtenOffset = 0, dirreadOffset = 0,
+        dirwrittenOffset = 0;
+    for (String historyLine : historyLinesByDate) {
+      String[] parts = historyLine.split(" ");
+      String currentDate = parts[0];
+      if (lastDate != null && (historyLine.equals("EOL") ||
+          !currentDate.equals(lastDate))) {
+        BigIntArray readIntArray = new BigIntArray(readArray,
+            readOffset);
+        BigIntArray writtenIntArray = new BigIntArray(writtenArray,
+            writtenOffset);
+        BigIntArray dirreadIntArray = new BigIntArray(dirreadArray,
+            dirreadOffset);
+        BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray,
+            dirwrittenOffset);
+        if (this.importIntoDatabase) {
+          try {
+            long dateMillis = dateTimeFormat.parse(lastDate
+                + " 00:00:00").getTime();
+            this.addDateToScheduledUpdates(dateMillis);
+            this.csH.setString(1, fingerprint);
+            this.csH.setDate(2, new java.sql.Date(dateMillis));
+            this.csH.setArray(3, readIntArray);
+            this.csH.setArray(4, writtenIntArray);
+            this.csH.setArray(5, dirreadIntArray);
+            this.csH.setArray(6, dirwrittenIntArray);
+            this.csH.addBatch();
+            rhsCount++;
+            if (rhsCount % autoCommitCount == 0)  {
+              this.csH.executeBatch();
+            }
+          } catch (SQLException e) {
+            this.logger.log(Level.WARNING, "Could not insert bandwidth "
+                + "history line into database.  We won't make any "
+                + "further SQL requests in this execution.", e);
+            this.importIntoDatabase = false;
+          } catch (ParseException e) {
+            this.logger.log(Level.WARNING, "Could not insert bandwidth "
+                + "history line into database.  We won't make any "
+                + "further SQL requests in this execution.", e);
+            this.importIntoDatabase = false;
+          }
+        }
+        if (this.writeRawImportFiles) {
+          try {
+            if (this.bwhistOut == null) {
+              new File(rawFilesDirectory).mkdirs();
+              this.bwhistOut = new BufferedWriter(new FileWriter(
+                  rawFilesDirectory + "/bwhist.sql"));
+            }
+            this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint
+                + "','" + lastDate + "','" + readIntArray.toString()
+                + "','" + writtenIntArray.toString() + "','"
+                + dirreadIntArray.toString() + "','"
+                + dirwrittenIntArray.toString() + "');\n");
+          } catch (IOException e) {
+            this.logger.log(Level.WARNING, "Could not write bandwidth "
+                + "history to raw database import file.  We won't make "
+                + "any further attempts to write raw import files in "
+                + "this execution.", e);
+            this.writeRawImportFiles = false;
+          }
+        }
+        readArray = writtenArray = dirreadArray = dirwrittenArray = null;
+      }
+      if (historyLine.equals("EOL")) {
+        break;
+      }
+      long lastIntervalTime;
+      try {
+        lastIntervalTime = dateTimeFormat.parse(parts[0] + " "
+            + parts[1]).getTime() - dateTimeFormat.parse(parts[0]
+            + " 00:00:00").getTime();
+      } catch (ParseException e) {
+        continue;
+      }
+      String[] stringValues = parts[5].split(",");
+      long[] longValues = new long[stringValues.length];
+      for (int i = 0; i < longValues.length; i++) {
+        longValues[i] = Long.parseLong(stringValues[i]);
+      }
+
+      int offset = (int) (lastIntervalTime / (15L * 60L * 1000L))
+          - longValues.length + 1;
+      String type = parts[2];
+      if (type.equals("read-history")) {
+        readArray = longValues;
+        readOffset = offset;
+      } else if (type.equals("write-history")) {
+        writtenArray = longValues;
+        writtenOffset = offset;
+      } else if (type.equals("dirreq-read-history")) {
+        dirreadArray = longValues;
+        dirreadOffset = offset;
+      } else if (type.equals("dirreq-write-history")) {
+        dirwrittenArray = longValues;
+        dirwrittenOffset = offset;
+      }
+      lastDate = currentDate;
+    }
+  }
+
+  /**
+   * Insert network status consensus into database.
+   */
+  public void addConsensus(long validAfter, byte[] rawDescriptor) {
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(validAfter);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        this.psCs.setTimestamp(1, validAfterTimestamp, cal);
+        ResultSet rs = psCs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          this.psC.clearParameters();
+          this.psC.setTimestamp(1, validAfterTimestamp, cal);
+          this.psC.setBytes(2, rawDescriptor);
+          this.psC.executeUpdate();
+          rcsCount++;
+          if (rcsCount % autoCommitCount == 0)  {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "consensus.  We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.consensusOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.consensusOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/consensus.sql"));
+          this.consensusOut.write(" COPY consensus (validafter, rawdesc) "
+              + "FROM stdin;\n");
+        }
+        String validAfterString = this.dateTimeFormat.format(validAfter);
+        this.consensusOut.write(validAfterString + "\t");
+        this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Insert network status vote into database.
+   */
+  public void addVote(long validAfter, String dirSource,
+      byte[] rawDescriptor) {
+    if (this.importIntoDatabase) {
+      try {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        this.psVs.setTimestamp(1, validAfterTimestamp, cal);
+        this.psVs.setString(2, dirSource);
+        ResultSet rs = psVs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          this.psV.clearParameters();
+          this.psV.setTimestamp(1, validAfterTimestamp, cal);
+          this.psV.setString(2, dirSource);
+          this.psV.setBytes(3, rawDescriptor);
+          this.psV.executeUpdate();
+          rvsCount++;
+          if (rvsCount % autoCommitCount == 0)  {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "vote.  We won't make any further SQL requests in this "
+            + "execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.voteOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.voteOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/vote.sql"));
+          this.voteOut.write(" COPY vote (validafter, dirsource, "
+              + "rawdesc) FROM stdin;\n");
+        }
+        String validAfterString = this.dateTimeFormat.format(validAfter);
+        this.voteOut.write(validAfterString + "\t" + dirSource + "\t");
+        this.voteOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "vote to raw database import file.  We won't make any "
+            + "further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "vote to raw database import file.  We won't make any "
+            + "further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Insert a conn-bi-direct stats string into the database.
+   */
+  public void addConnBiDirect(String source, String statsEnd,
+      long seconds, long below, long read, long write, long both) {
+    long statsEndTime = 0L;
+    try {
+      statsEndTime = this.dateTimeFormat.parse(statsEnd).getTime();
+    } catch (ParseException e) {
+      this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
+          + "stats string with interval ending '" + statsEnd + "'.", e);
+      return;
+    }
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(statsEndTime);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
+        this.psBs.setString(1, source);
+        this.psBs.setTimestamp(2, statsEndTimestamp, cal);
+        ResultSet rs = psBs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          this.psB.clearParameters();
+          this.psB.setString(1, source);
+          this.psB.setTimestamp(2, statsEndTimestamp, cal);
+          this.psB.setLong(3, seconds);
+          this.psB.setLong(4, below);
+          this.psB.setLong(5, read);
+          this.psB.setLong(6, write);
+          this.psB.setLong(7, both);
+          this.psB.executeUpdate();
+          rbsCount++;
+          if (rbsCount % autoCommitCount == 0)  {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
+            + "stats string. We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.connBiDirectOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.connBiDirectOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/connbidirect.sql"));
+          this.connBiDirectOut.write(" COPY connbidirect (source, "
+              + "statsend, seconds, belownum, readnum, writenum, "
+              + "bothnum) FROM stdin;\n");
+        }
+        this.connBiDirectOut.write(source + "\t" + statsEnd + "\t"
+            + seconds + "\t" + below + "\t" + read + "\t" + write + "\t"
+            + both + "\n");
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
+            + "stats string to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Insert observations of the number of directory requests by country,
+   * as seen by a directory at a given date, into the database.
+   */
+  public void addDirReqStats(String source, String statsEnd, long seconds,
+      Map<String, String> dirReqsPerCountry) {
+    long statsEndTime = 0L;
+    try {
+      statsEndTime = this.dateTimeFormat.parse(statsEnd).getTime();
+    } catch (ParseException e) {
+      this.logger.log(Level.WARNING, "Could not add dirreq stats with "
+          + "interval ending '" + statsEnd + "'.", e);
+      return;
+    }
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(statsEndTime);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
+        this.psQs.setString(1, source);
+        this.psQs.setTimestamp(2, statsEndTimestamp, cal);
+        ResultSet rs = psQs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          for (Map.Entry<String, String> e :
+              dirReqsPerCountry.entrySet()) {
+            this.psQ.clearParameters();
+            this.psQ.setString(1, source);
+            this.psQ.setTimestamp(2, statsEndTimestamp, cal);
+            this.psQ.setLong(3, seconds);
+            this.psQ.setString(4, e.getKey());
+            this.psQ.setLong(5, Long.parseLong(e.getValue()));
+            this.psQ.executeUpdate();
+            rqsCount++;
+            if (rqsCount % autoCommitCount == 0)  {
+              this.conn.commit();
+            }
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add dirreq stats.  We "
+            + "won't make any further SQL requests in this execution.",
+            e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.dirReqOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.dirReqOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/dirreq_stats.sql"));
+          this.dirReqOut.write(" COPY dirreq_stats (source, statsend, "
+              + "seconds, country, requests) FROM stdin;\n");
+        }
+        for (Map.Entry<String, String> e :
+            dirReqsPerCountry.entrySet()) {
+          this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds
+              + "\t" + e.getKey() + "\t" + e.getValue() + "\n");
+        }
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write dirreq stats to "
+            + "raw database import file.  We won't make any further "
+            + "attempts to write raw import files in this execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Close the relay descriptor database connection.
+   */
+  public void closeConnection() {
+
+    /* Log stats about imported descriptors. */
+    this.logger.info(String.format("Finished importing relay "
+        + "descriptors: %d consensuses, %d network status entries, %d "
+        + "votes, %d server descriptors, %d extra-info descriptors, %d "
+        + "bandwidth history elements, %d dirreq stats elements, and %d "
+        + "conn-bi-direct stats lines", rcsCount, rrsCount, rvsCount,
+        rdsCount, resCount, rhsCount, rqsCount, rbsCount));
+
+    /* Insert scheduled updates a second time, just in case the refresh
+     * run has started since we inserted them the first time, in which
+     * case it would miss the data inserted afterwards.  We cannot,
+     * however, insert them only now, because if a Java execution fails
+     * at a random point, we might have added data without adding the
+     * corresponding dates to update statistics. */
+    if (this.importIntoDatabase) {
+      try {
+        for (long dateMillis : this.scheduledUpdates) {
+          this.psU.setDate(1, new java.sql.Date(dateMillis));
+          this.psU.execute();
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add scheduled dates "
+            + "for the next refresh run.", e);
+      }
+    }
+
+    /* Commit any stragglers before closing. */
+    if (this.conn != null) {
+      try {
+        this.csH.executeBatch();
+
+        this.conn.commit();
+      } catch (SQLException e)  {
+        this.logger.log(Level.WARNING, "Could not commit final records to "
+            + "database", e);
+      }
+      try {
+        this.conn.close();
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not close database "
+            + "connection.", e);
+      }
+    }
+
+    /* Close raw import files. */
+    try {
+      if (this.statusentryOut != null) {
+        this.statusentryOut.write("\\.\n");
+        this.statusentryOut.close();
+      }
+      if (this.descriptorOut != null) {
+        this.descriptorOut.write("\\.\n");
+        this.descriptorOut.close();
+      }
+      if (this.extrainfoOut != null) {
+        this.extrainfoOut.write("\\.\n");
+        this.extrainfoOut.close();
+      }
+      if (this.bwhistOut != null) {
+        this.bwhistOut.write("\\.\n");
+        this.bwhistOut.close();
+      }
+      if (this.consensusOut != null) {
+        this.consensusOut.write("\\.\n");
+        this.consensusOut.close();
+      }
+      if (this.voteOut != null) {
+        this.voteOut.write("\\.\n");
+        this.voteOut.close();
+      }
+      if (this.connBiDirectOut != null) {
+        this.connBiDirectOut.write("\\.\n");
+        this.connBiDirectOut.close();
+      }
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Could not close one or more raw "
+          + "database import files.", e);
+    }
+  }
+}
+
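
The BigIntArray helper above renders the per-day bandwidth history arrays
as PostgreSQL int8 array literals with explicit bounds, where the offset
is the index of the first 15-minute interval of the day; this is the form
handed to the insert_bwhist() database function (via setArray, or as a
string in bwhist.sql). A small sketch of the strings it produces:

    // Sketch of BigIntArray's output format (values are made up):
    long[] reads = new long[] { 1024L, 2048L, 4096L };
    new BigIntArray(reads, 4).toString();  // "[4:6]={1024,2048,4096}"
    new BigIntArray(null, 0).toString();   // "[-1:-1]={0}"
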
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorParser.java b/src/org/torproject/ernie/cron/RelayDescriptorParser.java
index 24b512b..d02022a 100644
--- a/src/org/torproject/ernie/cron/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/cron/RelayDescriptorParser.java
@@ -17,6 +17,12 @@ import org.apache.commons.codec.binary.*;
  */
 public class RelayDescriptorParser {
 
+  /**
+   * Relay descriptor database importer that stores relay descriptor
+   * contents for later evaluation.
+   */
+  private RelayDescriptorDatabaseImporter rddi;
+
   private ConsensusHealthChecker chc;
 
   /**
@@ -29,8 +35,10 @@ public class RelayDescriptorParser {
   /**
    * Initializes this class.
    */
-  public RelayDescriptorParser(ConsensusHealthChecker chc) {
+  public RelayDescriptorParser(ConsensusHealthChecker chc,
+      RelayDescriptorDatabaseImporter rddi) {
     this.chc = chc;
+    this.rddi = rddi;
 
     /* Initialize logger. */
     this.logger = Logger.getLogger(RelayDescriptorParser.class.getName());
@@ -60,31 +68,256 @@ public class RelayDescriptorParser {
         // time to see when we switch from hourly to half-hourly
         // consensuses
         boolean isConsensus = true;
-        String validAfterTime = null;
-        String dirSource = null;
+        String validAfterTime = null, nickname = null,
+            relayIdentity = null, serverDesc = null, version = null,
+            ports = null;
+        String fingerprint = null, dirSource = null, address = null;
+        long validAfter = -1L, published = -1L, bandwidth = -1L,
+            orPort = 0L, dirPort = 0L;
+        SortedSet<String> relayFlags = null;
+        StringBuilder rawStatusEntry = null;
         while ((line = br.readLine()) != null) {
           if (line.equals("vote-status vote")) {
             isConsensus = false;
           } else if (line.startsWith("valid-after ")) {
             validAfterTime = line.substring("valid-after ".length());
+            validAfter = parseFormat.parse(validAfterTime).getTime();
           } else if (line.startsWith("dir-source ")) {
             dirSource = line.split(" ")[2];
-            break;
+          } else if (line.startsWith("fingerprint ")) {
+            fingerprint = line.split(" ")[1];
+          } else if (line.startsWith("r ")) {
+            if (isConsensus && relayIdentity != null &&
+                this.rddi != null) {
+              byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+              this.rddi.addStatusEntry(validAfter, nickname,
+                  relayIdentity, serverDesc, published, address, orPort,
+                  dirPort, relayFlags, version, bandwidth, ports,
+                  rawDescriptor);
+              relayFlags = null;
+              version = null;
+              bandwidth = -1L;
+              ports = null;
+            }
+            rawStatusEntry = new StringBuilder(line + "\n");
+            String[] parts = line.split(" ");
+            if (parts.length < 9) {
+              this.logger.log(Level.WARNING, "Could not parse r line '"
+                  + line + "' in descriptor. Skipping.");
+              break;
+            }
+            String publishedTime = parts[4] + " " + parts[5];
+            nickname = parts[1];
+            relayIdentity = Hex.encodeHexString(
+                Base64.decodeBase64(parts[2] + "=")).
+                toLowerCase();
+            serverDesc = Hex.encodeHexString(Base64.decodeBase64(
+                parts[3] + "=")).toLowerCase();
+            published = parseFormat.parse(publishedTime).getTime();
+            address = parts[6];
+            orPort = Long.parseLong(parts[7]);
+            dirPort = Long.parseLong(parts[8]);
+          } else if (line.startsWith("s ") || line.equals("s")) {
+            rawStatusEntry.append(line + "\n");
+            relayFlags = new TreeSet<String>();
+            if (line.length() > 2) {
+              for (String flag : line.substring(2).split(" ")) {
+                relayFlags.add(flag);
+              }
+            }
+          } else if (line.startsWith("v ")) {
+            rawStatusEntry.append(line + "\n");
+            version = line.substring(2);
+          } else if (line.startsWith("w ")) {
+            rawStatusEntry.append(line + "\n");
+            String[] parts = line.split(" ");
+            for (String part : parts) {
+              if (part.startsWith("Bandwidth=")) {
+                bandwidth = Long.parseLong(part.substring(
+                    "Bandwidth=".length()));
+              }
+            }
+          } else if (line.startsWith("p ")) {
+            rawStatusEntry.append(line + "\n");
+            ports = line.substring(2);
           }
         }
         if (isConsensus) {
           if (this.chc != null) {
             this.chc.processConsensus(validAfterTime, data);
           }
+          if (this.rddi != null) {
+            this.rddi.addConsensus(validAfter, data);
+            if (relayIdentity != null) {
+              byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+              this.rddi.addStatusEntry(validAfter, nickname,
+                  relayIdentity, serverDesc, published, address, orPort,
+                  dirPort, relayFlags, version, bandwidth, ports,
+                  rawDescriptor);
+            }
+          }
         } else {
           if (this.chc != null) {
             this.chc.processVote(validAfterTime, dirSource, data);
           }
+          if (this.rddi != null) {
+            this.rddi.addVote(validAfter, dirSource, data);
+          }
+        }
+      } else if (line.startsWith("router ")) {
+        String platformLine = null, publishedLine = null,
+            bandwidthLine = null, extraInfoDigest = null,
+            relayIdentifier = null;
+        String[] parts = line.split(" ");
+        String nickname = parts[1];
+        String address = parts[2];
+        int orPort = Integer.parseInt(parts[3]);
+        int dirPort = Integer.parseInt(parts[5]);
+        long published = -1L, uptime = -1L;
+        while ((line = br.readLine()) != null) {
+          if (line.startsWith("platform ")) {
+            platformLine = line;
+          } else if (line.startsWith("published ")) {
+            String publishedTime = line.substring("published ".length());
+            published = parseFormat.parse(publishedTime).getTime();
+          } else if (line.startsWith("opt fingerprint") ||
+              line.startsWith("fingerprint")) {
+            relayIdentifier = line.substring(line.startsWith("opt ") ?
+                "opt fingerprint".length() : "fingerprint".length()).
+                replaceAll(" ", "").toLowerCase();
+          } else if (line.startsWith("bandwidth ")) {
+            bandwidthLine = line;
+          } else if (line.startsWith("opt extra-info-digest ") ||
+              line.startsWith("extra-info-digest ")) {
+            extraInfoDigest = line.startsWith("opt ") ?
+                line.split(" ")[2].toLowerCase() :
+                line.split(" ")[1].toLowerCase();
+          } else if (line.startsWith("uptime ")) {
+            uptime = Long.parseLong(line.substring("uptime ".length()));
+          }
+        }
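+        /* The descriptor digest is the SHA-1 hash of the bytes from
+         * "router " up to and including "\nrouter-signature\n". */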
+        String ascii = new String(data, "US-ASCII");
+        String startToken = "router ";
+        String sigToken = "\nrouter-signature\n";
+        int start = ascii.indexOf(startToken);
+        int sigIndex = ascii.indexOf(sigToken);
+        int sig = sigIndex + sigToken.length();
+        String digest = null;
+        if (start >= 0 && sigIndex >= 0 && sig > start) {
+          byte[] forDigest = new byte[sig - start];
+          System.arraycopy(data, start, forDigest, 0, sig - start);
+          digest = DigestUtils.shaHex(forDigest);
+        }
+        if (this.rddi != null && digest != null) {
+          String[] bwParts = bandwidthLine.split(" ");
+          long bandwidthAvg = Long.parseLong(bwParts[1]);
+          long bandwidthBurst = Long.parseLong(bwParts[2]);
+          long bandwidthObserved = Long.parseLong(bwParts[3]);
+          String platform = platformLine == null ? null
+              : platformLine.substring("platform ".length());
+          this.rddi.addServerDescriptor(digest, nickname, address, orPort,
+              dirPort, relayIdentifier, bandwidthAvg, bandwidthBurst,
+              bandwidthObserved, platform, published, uptime,
+              extraInfoDigest, data);
+        }
+      } else if (line.startsWith("extra-info ")) {
+        String nickname = line.split(" ")[1];
+        long published = -1L;
+        String dir = line.split(" ")[2];
+        String statsEnd = null;
+        long seconds = -1L;
+        List<String> bandwidthHistory = new ArrayList<String>();
+        boolean skip = false;
+        while ((line = br.readLine()) != null) {
+          if (line.startsWith("published ")) {
+            String publishedTime = line.substring("published ".length());
+            published = parseFormat.parse(publishedTime).getTime();
+          } else if (line.startsWith("read-history ") ||
+              line.startsWith("write-history ") ||
+              line.startsWith("dirreq-read-history ") ||
+              line.startsWith("dirreq-write-history ")) {
+            bandwidthHistory.add(line);
+          } else if (line.startsWith("dirreq-stats-end ")) {
+            String[] parts = line.split(" ");
+            if (parts.length < 5) {
+              this.logger.warning("Could not parse dirreq-stats-end "
+                  + "line '" + line + "' in descriptor. Skipping.");
+              break;
+            }
+            statsEnd = parts[1] + " " + parts[2];
+            seconds = Long.parseLong(parts[3].substring(1));
+          } else if (line.startsWith("dirreq-v3-reqs ")
+              && line.length() > "dirreq-v3-reqs ".length()) {
+            if (this.rddi != null) {
+              try {
+                int allUsers = 0;
+                Map<String, String> obs = new HashMap<String, String>();
+                String[] parts = line.substring("dirreq-v3-reqs ".
+                    length()).split(",");
+                for (String p : parts) {
+                  String country = p.substring(0, 2);
+                  int users = Integer.parseInt(p.substring(3)) - 4;
+                  allUsers += users;
+                  obs.put(country, "" + users);
+                }
+                obs.put("zy", "" + allUsers);
+                this.rddi.addDirReqStats(dir, statsEnd, seconds, obs);
+              } catch (NumberFormatException e) {
+                this.logger.log(Level.WARNING, "Could not parse "
+                    + "dirreq-v3-reqs line '" + line + "' in descriptor. "
+                    + "Skipping.", e);
+                break;
+              }
+            }
+          } else if (line.startsWith("conn-bi-direct ")) {
+            if (this.rddi != null) {
+              String[] parts = line.split(" ");
+              if (parts.length == 6 &&
+                  parts[5].split(",").length == 4) {
+                try {
+                  String connBiDirectStatsEnd = parts[1] + " " + parts[2];
+                  long connBiDirectSeconds = Long.parseLong(parts[3].
+                      substring(1));
+                  String[] parts2 = parts[5].split(",");
+                  long below = Long.parseLong(parts2[0]);
+                  long read = Long.parseLong(parts2[1]);
+                  long write = Long.parseLong(parts2[2]);
+                  long both = Long.parseLong(parts2[3]);
+                  this.rddi.addConnBiDirect(dir, connBiDirectStatsEnd,
+                      connBiDirectSeconds, below, read, write, both);
+                } catch (NumberFormatException e) {
+                  this.logger.log(Level.WARNING, "Number format "
+                      + "exception while parsing conn-bi-direct stats "
+                      + "string '" + line + "'. Skipping.", e);
+                }
+              } else {
+                this.logger.warning("Skipping invalid conn-bi-direct "
+                    + "stats string '" + line + "'.");
+              }
+            }
+          }
+        }
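+        /* Compute the extra-info descriptor digest in the same way as
+         * the server descriptor digest above. */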
+        String ascii = new String(data, "US-ASCII");
+        String startToken = "extra-info ";
+        String sigToken = "\nrouter-signature\n";
+        String digest = null;
+        int start = ascii.indexOf(startToken);
+        int sigIndex = ascii.indexOf(sigToken);
+        int sig = sigIndex + sigToken.length();
+        if (start >= 0 && sigIndex >= 0 && sig > start) {
+          byte[] forDigest = new byte[sig - start];
+          System.arraycopy(data, start, forDigest, 0, sig - start);
+          digest = DigestUtils.shaHex(forDigest);
+        }
+        if (this.rddi != null && digest != null) {
+          this.rddi.addExtraInfoDescriptor(digest, nickname,
+              dir.toLowerCase(), published, data, bandwidthHistory);
         }
       }
     } catch (IOException e) {
       this.logger.log(Level.WARNING, "Could not parse descriptor. "
           + "Skipping.", e);
+    } catch (ParseException e) {
+      this.logger.log(Level.WARNING, "Could not parse descriptor. "
+          + "Skipping.", e);
     }
   }
 }


