[or-cvs] [metrics-db/master] Improve database schema and speed up importing.

karsten at torproject.org karsten at torproject.org
Mon Sep 13 19:04:39 UTC 2010


Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Fri, 10 Sep 2010 11:16:11 +0200
Subject: Improve database schema and speed up importing.
Commit: fe9d66badeee6a1ccc4b267d80ec5eb9da12fd3b

Improve database schema by adding more relay descriptor parts to the
database.

Speed up importing by exporting to files that can be read by psql's
\copy command.
---
 build.xml                                          |    2 +-
 config                                             |    8 +
 db/tordir.sql                                      |   24 ++
 src/org/torproject/ernie/db/Configuration.java     |   16 +-
 src/org/torproject/ernie/db/Main.java              |    8 +-
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |  371 +++++++++++++++-----
 .../torproject/ernie/db/RelayDescriptorParser.java |   88 ++++-
 7 files changed, 399 insertions(+), 118 deletions(-)

diff --git a/build.xml b/build.xml
index 49376e0..bddac1d 100644
--- a/build.xml
+++ b/build.xml
@@ -36,7 +36,7 @@
            destdir="${classes}"
            excludes="org/torproject/ernie/web/"
            debug="true" debuglevel="lines,source"
-           classpath="lib/commons-codec-1.4.jar;lib/commons-compress-1.0.jar"/>
+           classpath="lib/commons-codec-1.4.jar;lib/commons-compress-1.0.jar;lib/postgresql-8.4-701.jdbc4.jar"/>
   </target>
   <target name="run" depends="compile">
     <java classpath="${classes};lib/commons-codec-1.4.jar:lib/commons-compress-1.0.jar;lib/postgresql-8.4-701.jdbc4.jar"
diff --git a/config b/config
index 695e268..64ce8ff 100644
--- a/config
+++ b/config
@@ -86,6 +86,14 @@
 ## JDBC string for relay descriptor database
 #RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=ernie&password=password
 #
+## Write relay descriptors to raw text files for importing them into a
+## database using PostgreSQL's \copy command
+#WriteRelayDescriptorsRawFiles 0
+#
+## Relative path to directory to write raw text files; note that existing
+## files will be overwritten!
+#RelayDescriptorRawFilesDirectory pg-import/
+#
 ## Write statistics about the current consensus and votes to the
 ## website
 #WriteConsensusHealth 0
diff --git a/db/tordir.sql b/db/tordir.sql
index a421730..af4f64e 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -5,6 +5,7 @@
 -- Contains all of the descriptors published by routers.
 CREATE TABLE descriptor (
     descriptor CHARACTER(40) NOT NULL,
+    nickname CHARACTER VARYING(19) NOT NULL,
     address CHARACTER VARYING(15) NOT NULL,
     orport INTEGER NOT NULL,
     dirport INTEGER NOT NULL,
@@ -14,15 +15,34 @@ CREATE TABLE descriptor (
     platform CHARACTER VARYING(256),
     published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
     uptime BIGINT,
+    extrainfo CHARACTER(40),
+    rawdesc BYTEA NOT NULL,
     CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor)
 );
 
+-- TABLE extrainfo
+-- Contains all of the extra-info descriptors published by the routers.
+CREATE TABLE extrainfo (
+    extrainfo CHARACTER(40) NOT NULL,
+    nickname CHARACTER VARYING(19) NOT NULL,
+    fingerprint CHARACTER(40) NOT NULL,
+    published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+    rawdesc BYTEA NOT NULL,
+    CONSTRAINT extrainfo_pkey PRIMARY KEY (extrainfo)
+);
+
 -- TABLE statusentry
 -- Contains all of the consensuses published by the directories. Each
 -- statusentry references a valid descriptor.
 CREATE TABLE statusentry (
     validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+    nickname CHARACTER VARYING(19) NOT NULL,
+    fingerprint CHARACTER(40) NOT NULL,
     descriptor CHARACTER(40) NOT NULL,
+    published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+    address CHARACTER VARYING(15) NOT NULL,
+    orport INTEGER NOT NULL,
+    dirport INTEGER NOT NULL,
     isauthority BOOLEAN DEFAULT FALSE NOT NULL,
     isbadexit BOOLEAN DEFAULT FALSE NOT NULL,
     isbaddirectory BOOLEAN DEFAULT FALSE NOT NULL,
@@ -37,6 +57,10 @@ CREATE TABLE statusentry (
     isvalid BOOLEAN DEFAULT FALSE NOT NULL,
     isv2dir BOOLEAN DEFAULT FALSE NOT NULL,
     isv3dir BOOLEAN DEFAULT FALSE NOT NULL,
+    version CHARACTER VARYING(50),
+    bandwidth BIGINT,
+    ports TEXT,
+    rawdesc BYTEA NOT NULL,
     CONSTRAINT statusentry_pkey PRIMARY KEY (validafter, descriptor)
 );
 
diff --git a/src/org/torproject/ernie/db/Configuration.java b/src/org/torproject/ernie/db/Configuration.java
index f7910e7..5fb5e0e 100644
--- a/src/org/torproject/ernie/db/Configuration.java
+++ b/src/org/torproject/ernie/db/Configuration.java
@@ -39,6 +39,8 @@ public class Configuration {
   private boolean writeRelayDescriptorDatabase = false;
   private String relayDescriptorDatabaseJdbc =
       "jdbc:postgresql://localhost/tordir?user=ernie&password=password";
+  private boolean writeRelayDescriptorsRawFiles = false;
+  private String relayDescriptorRawFilesDirectory = "pg-import/";
   private boolean writeSanitizedBridges = false;
   private String sanitizedBridgesWriteDirectory = "sanitized-bridges/";
   private boolean importSanitizedBridges = false;
@@ -141,6 +143,11 @@ public class Configuration {
               line.split(" ")[1]) != 0;
         } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
           this.relayDescriptorDatabaseJdbc = line.split(" ")[1];
+        } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) {
+          this.writeRelayDescriptorsRawFiles = Integer.parseInt(
+              line.split(" ")[1]) != 0;
+        } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) {
+          this.relayDescriptorRawFilesDirectory = line.split(" ")[1];
         } else if (line.startsWith("WriteSanitizedBridges")) {
           this.writeSanitizedBridges = Integer.parseInt(
               line.split(" ")[1]) != 0;
@@ -249,7 +256,8 @@ public class Configuration {
     if ((this.importCachedRelayDescriptors ||
         this.importDirectoryArchives || this.downloadRelayDescriptors) &&
         !(this.writeDirectoryArchives ||
-        this.writeRelayDescriptorDatabase || this.writeConsensusStats ||
+        this.writeRelayDescriptorDatabase ||
+        this.writeRelayDescriptorsRawFiles || this.writeConsensusStats ||
         this.writeDirreqStats || this.writeBridgeStats ||
         this.writeServerDescriptorStats || this.writeConsensusHealth)) {
       logger.warning("We are configured to import/download relay "
@@ -338,6 +346,12 @@ public class Configuration {
   public String getRelayDescriptorDatabaseJDBC() {
     return this.relayDescriptorDatabaseJdbc;
   }
+  public boolean getWriteRelayDescriptorsRawFiles() {
+    return this.writeRelayDescriptorsRawFiles;
+  }
+  public String getRelayDescriptorRawFilesDirectory() {
+    return this.relayDescriptorRawFilesDirectory;
+  }
   public boolean getWriteSanitizedBridges() {
     return this.writeSanitizedBridges;
   }
diff --git a/src/org/torproject/ernie/db/Main.java b/src/org/torproject/ernie/db/Main.java
index 5cf8d91..f463684 100644
--- a/src/org/torproject/ernie/db/Main.java
+++ b/src/org/torproject/ernie/db/Main.java
@@ -57,9 +57,13 @@ public class Main {
 
     // Prepare writing relay descriptors to database
     RelayDescriptorDatabaseImporter rddi =
-        config.getWriteRelayDescriptorDatabase() ?
+        config.getWriteRelayDescriptorDatabase() ||
+        config.getWriteRelayDescriptorsRawFiles() ?
         new RelayDescriptorDatabaseImporter(
-        config.getRelayDescriptorDatabaseJDBC()) : null;
+        config.getWriteRelayDescriptorDatabase() ?
+        config.getRelayDescriptorDatabaseJDBC() : null,
+        config.getWriteRelayDescriptorsRawFiles() ?
+        config.getRelayDescriptorRawFilesDirectory() : null) : null;
 
     // Prepare relay descriptor parser (only if we are writing stats or
     // directory archives to disk)
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index 71d25b8..af65743 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -2,9 +2,12 @@
  * See LICENSE for licensing information */
 package org.torproject.ernie.db;
 
+import java.io.*;
 import java.sql.*;
+import java.text.*;
 import java.util.*;
 import java.util.logging.*;
+import org.postgresql.util.*;
 
 /**
  * Parse directory data.
@@ -21,6 +24,7 @@ public final class RelayDescriptorDatabaseImporter {
    * Keep track of the number of records committed before each transaction
    */
   private int rdsCount = 0;
+  private int resCount = 0;
   private int rrsCount = 0;
 
   /**
@@ -35,6 +39,12 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psRs;
 
   /**
+   * Prepared statement to check whether a given extra-info descriptor has
+   * been imported into the database before.
+   */
+  private PreparedStatement psEs;
+
+  /**
    * Prepared statement to check whether a given server descriptor has
    * been imported into the database before.
    */
@@ -52,135 +62,282 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psD;
 
   /**
+   * Prepared statement to insert an extra-info descriptor into the
+   * database.
+   */
+  private PreparedStatement psE;
+
+  /**
    * Logger for this class.
    */
   private Logger logger;
 
+  private BufferedWriter statusentryOut;
+  private BufferedWriter descriptorOut;
+  private BufferedWriter extrainfoOut;
+
   /**
    * Initialize database importer by connecting to the database and
    * preparing statements.
    */
-  public RelayDescriptorDatabaseImporter(String connectionURL) {
+  public RelayDescriptorDatabaseImporter(String connectionURL,
+      String rawFilesDirectory) {
 
     /* Initialize logger. */
     this.logger = Logger.getLogger(
         RelayDescriptorDatabaseImporter.class.getName());
 
-    try {
-      /* Connect to database. */
-      this.conn = DriverManager.getConnection(connectionURL);
-
-      /* Turn autocommit off */
-      this.conn.setAutoCommit(false);
-
-      /* Prepare statements. */
-      this.psRs = conn.prepareStatement("SELECT COUNT(*) "
-          + "FROM statusentry WHERE validafter = ? AND descriptor = ?");
-      this.psDs = conn.prepareStatement("SELECT COUNT(*) "
-          + "FROM descriptor WHERE descriptor = ?");
-      this.psR = conn.prepareStatement("INSERT INTO statusentry "
-          + "(validafter, descriptor, isauthority, isbadexit, "
-          + "isbaddirectory, isexit, isfast, isguard, ishsdir, isnamed, "
-          + "isstable, isrunning, isunnamed, isvalid, isv2dir, isv3dir) "
-          + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
-      this.psD = conn.prepareStatement("INSERT INTO descriptor "
-          + "(descriptor, address, orport, dirport, bandwidthavg, "
-          + "bandwidthburst, bandwidthobserved, platform, published, "
-          + "uptime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+    if (connectionURL != null) {
+      try {
+        /* Connect to database. */
+        this.conn = DriverManager.getConnection(connectionURL);
 
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not connect to database or "
-          + "prepare statements.", e);
+        /* Turn autocommit off */
+        this.conn.setAutoCommit(false);
+
+        /* Prepare statements. */
+        this.psRs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM statusentry WHERE validafter = ? AND descriptor = ?");
+        this.psDs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM descriptor WHERE descriptor = ?");
+        this.psEs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM extrainfo WHERE extrainfo = ?");
+        this.psR = conn.prepareStatement("INSERT INTO statusentry "
+            + "(validafter, nickname, fingerprint, descriptor, "
+            + "published, address, orport, dirport, isauthority, "
+            + "isbadexit, isbaddirectory, isexit, isfast, isguard, "
+            + "ishsdir, isnamed, isstable, isrunning, isunnamed, "
+            + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
+            + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+            + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+        this.psD = conn.prepareStatement("INSERT INTO descriptor "
+            + "(descriptor, nickname, address, orport, dirport, "
+            + "bandwidthavg, bandwidthburst, bandwidthobserved, "
+            + "platform, published, uptime, extrainfo, rawdesc) "
+            + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+        this.psE = conn.prepareStatement("INSERT INTO extrainfo "
+            + "(extrainfo, nickname, fingerprint, published, rawdesc) "
+            + "VALUES (?, ?, ?, ?, ?)");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not connect to database or "
+            + "prepare statements.", e);
+      }
+    }
+
+    if (rawFilesDirectory != null) {
+      try {
+        new File(rawFilesDirectory).mkdirs();
+        this.statusentryOut = new BufferedWriter(new FileWriter(
+            rawFilesDirectory + "/statusentry.sql"));
+        this.descriptorOut = new BufferedWriter(new FileWriter(
+            rawFilesDirectory + "/descriptor.sql"));
+        this.extrainfoOut = new BufferedWriter(new FileWriter(
+            rawFilesDirectory + "/extrainfo.sql"));
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not open raw database "
+            + "import files.", e);
+      }
     }
   }
 
   /**
    * Insert network status consensus entry into database.
    */
-  public void addStatusEntry(long validAfter, String descriptor,
-      SortedSet<String> flags) {
-    if (this.psRs == null || this.psR == null) {
-      return;
-    }
+  public void addStatusEntry(long validAfter, String nickname,
+      String fingerprint, String descriptor, long published,
+      String address, long orPort, long dirPort,
+      SortedSet<String> flags, String version, long bandwidth,
+      String ports, byte[] rawDescriptor) {
     try {
-      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-      Timestamp validAfterTimestamp = new Timestamp(validAfter);
-      this.psRs.setTimestamp(1, validAfterTimestamp, cal);
-      this.psRs.setString(2, descriptor);
-      ResultSet rs = psRs.executeQuery();
-      rs.next();
-      if (rs.getInt(1) > 0) {
-        return;
+      if (this.psRs != null && this.psR != null) {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        this.psRs.setTimestamp(1, validAfterTimestamp, cal);
+        this.psRs.setString(2, descriptor);
+        ResultSet rs = psRs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) > 0) {
+          return;
+        }
+        this.psR.clearParameters();
+        this.psR.setTimestamp(1, validAfterTimestamp, cal);
+        this.psR.setString(2, nickname);
+        this.psR.setString(3, fingerprint);
+        this.psR.setString(4, descriptor);
+        this.psR.setTimestamp(5, new Timestamp(published), cal);
+        this.psR.setString(6, address);
+        this.psR.setLong(7, orPort);
+        this.psR.setLong(8, dirPort);
+        this.psR.setBoolean(9, flags.contains("Authority"));
+        this.psR.setBoolean(10, flags.contains("BadExit"));
+        this.psR.setBoolean(11, flags.contains("BadDirectory"));
+        this.psR.setBoolean(12, flags.contains("Exit"));
+        this.psR.setBoolean(13, flags.contains("Fast"));
+        this.psR.setBoolean(14, flags.contains("Guard"));
+        this.psR.setBoolean(15, flags.contains("HSDir"));
+        this.psR.setBoolean(16, flags.contains("Named"));
+        this.psR.setBoolean(17, flags.contains("Stable"));
+        this.psR.setBoolean(18, flags.contains("Running"));
+        this.psR.setBoolean(19, flags.contains("Unnamed"));
+        this.psR.setBoolean(20, flags.contains("Valid"));
+        this.psR.setBoolean(21, flags.contains("V2Dir"));
+        this.psR.setBoolean(22, flags.contains("V3Dir"));
+        this.psR.setString(23, version);
+        this.psR.setLong(24, bandwidth);
+        this.psR.setString(25, ports);
+        this.psR.setBytes(26, rawDescriptor);
+        this.psR.executeUpdate();
+        rrsCount++;
+        if (rrsCount % autoCommitCount == 0)  {
+          this.conn.commit();
+          rrsCount = 0;
+        }
       }
-      this.psR.clearParameters();
-      this.psR.setTimestamp(1, validAfterTimestamp, cal);
-      this.psR.setString(2, descriptor);
-      this.psR.setBoolean(3, flags.contains("Authority"));
-      this.psR.setBoolean(4, flags.contains("BadExit"));
-      this.psR.setBoolean(5, flags.contains("BadDirectory"));
-      this.psR.setBoolean(6, flags.contains("Exit"));
-      this.psR.setBoolean(7, flags.contains("Fast"));
-      this.psR.setBoolean(8, flags.contains("Guard"));
-      this.psR.setBoolean(9, flags.contains("HSDir"));
-      this.psR.setBoolean(10, flags.contains("Named"));
-      this.psR.setBoolean(11, flags.contains("Stable"));
-      this.psR.setBoolean(12, flags.contains("Running"));
-      this.psR.setBoolean(13, flags.contains("Unnamed"));
-      this.psR.setBoolean(14, flags.contains("Valid"));
-      this.psR.setBoolean(15, flags.contains("V2Dir"));
-      this.psR.setBoolean(16, flags.contains("V3Dir"));
-      this.psR.executeUpdate();
-      rrsCount++;
-      if (rrsCount % autoCommitCount == 0)  {
-        this.conn.commit();
-        rrsCount = 0;
+      if (this.statusentryOut != null) {
+        SimpleDateFormat dateTimeFormat =
+             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        this.statusentryOut.write(
+            dateTimeFormat.format(validAfter) + "\t" + nickname
+            + "\t" + fingerprint.toLowerCase() + "\t"
+            + descriptor.toLowerCase() + "\t"
+            + dateTimeFormat.format(published) + "\t" + address + "\t"
+            + orPort + "\t" + dirPort + "\t"
+            + (flags.contains("Authority") ? "t" : "f") + "\t"
+            + (flags.contains("BadExit") ? "t" : "f") + "\t"
+            + (flags.contains("BadDirectory") ? "t" : "f") + "\t"
+            + (flags.contains("Exit") ? "t" : "f") + "\t"
+            + (flags.contains("Fast") ? "t" : "f") + "\t"
+            + (flags.contains("Guard") ? "t" : "f") + "\t"
+            + (flags.contains("HSDir") ? "t" : "f") + "\t"
+            + (flags.contains("Named") ? "t" : "f") + "\t"
+            + (flags.contains("Stable") ? "t" : "f") + "\t"
+            + (flags.contains("Running") ? "t" : "f") + "\t"
+            + (flags.contains("Unnamed") ? "t" : "f") + "\t"
+            + (flags.contains("Valid") ? "t" : "f") + "\t"
+            + (flags.contains("V2Dir") ? "t" : "f") + "\t"
+            + (flags.contains("V3Dir") ? "t" : "f") + "\t"
+            + version + "\t" + bandwidth + "\t" + ports + "\t");
+        this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
       }
-
     } catch (SQLException e) {
       this.logger.log(Level.WARNING, "Could not add network status "
           + "consensus entry.", e);
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Could not write network status "
+          + "consensus entry to raw database import file.", e);
     }
   }
 
   /**
    * Insert server descriptor into database.
    */
-  public void addServerDescriptor(String descriptor, String address,
-      int orPort, int dirPort, long bandwidthAvg, long bandwidthBurst,
-      long bandwidthObserved, String platform, long published,
-      long uptime) {
-    if (this.psDs == null || this.psD == null) {
-      return;
-    }
+  public void addServerDescriptor(String descriptor, String nickname,
+      String address, int orPort, int dirPort, long bandwidthAvg,
+      long bandwidthBurst, long bandwidthObserved, String platform,
+      long published, long uptime, String extraInfoDigest,
+      byte[] rawDescriptor) {
     try {
-      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-      this.psDs.setString(1, descriptor);
-      ResultSet rs = psDs.executeQuery();
-      rs.next();
-      if (rs.getInt(1) > 0) {
-        return;
+      if (this.psDs != null && this.psD != null) {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        this.psDs.setString(1, descriptor);
+        ResultSet rs = psDs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) > 0) {
+          return;
+        }
+        this.psD.clearParameters();
+        this.psD.setString(1, descriptor);
+        this.psD.setString(2, nickname);
+        this.psD.setString(3, address);
+        this.psD.setInt(4, orPort);
+        this.psD.setInt(5, dirPort);
+        this.psD.setLong(6, bandwidthAvg);
+        this.psD.setLong(7, bandwidthBurst);
+        this.psD.setLong(8, bandwidthObserved);
+        this.psD.setString(9, new String(platform.getBytes(),
+            "US-ASCII"));
+        this.psD.setTimestamp(10, new Timestamp(published), cal);
+        this.psD.setLong(11, uptime);
+        this.psD.setString(12, extraInfoDigest);
+        this.psD.setBytes(13, rawDescriptor);
+        this.psD.executeUpdate();
+        rdsCount++;
+        if (rdsCount % autoCommitCount == 0)  {
+          this.conn.commit();
+          rdsCount = 0;
+        }
       }
-      this.psD.clearParameters();
-      this.psD.setString(1, descriptor);
-      this.psD.setString(2, address);
-      this.psD.setInt(3, orPort);
-      this.psD.setInt(4, dirPort);
-      this.psD.setLong(5, bandwidthAvg);
-      this.psD.setLong(6, bandwidthBurst);
-      this.psD.setLong(7, bandwidthObserved);
-      this.psD.setString(8, platform);
-      this.psD.setTimestamp(9, new Timestamp(published), cal);
-      this.psD.setLong(10, uptime);
-      this.psD.executeUpdate();
-      rdsCount++;
-      if (rdsCount % autoCommitCount == 0)  {
-        this.conn.commit();
-        rdsCount = 0;
+      if (this.descriptorOut != null) {
+        SimpleDateFormat dateTimeFormat =
+             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        this.descriptorOut.write(descriptor.toLowerCase() + "\t"
+            + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
+            + "\t" + bandwidthAvg + "\t" + bandwidthBurst + "\t"
+            + bandwidthObserved + "\t" + new String(platform.getBytes(),
+            "US-ASCII") + "\t" + dateTimeFormat.format(published) + "\t"
+            + uptime + "\t" + extraInfoDigest + "\t");
+        this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
       }
-
+    } catch (UnsupportedEncodingException e) {
+      this.logger.log(Level.WARNING, "Could not add server descriptor.",
+          e);
     } catch (SQLException e) {
       this.logger.log(Level.WARNING, "Could not add server descriptor.",
           e);
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Could not write server descriptor "
+          + "to raw database import file.", e);
+    }
+  }
+
+  /**
+   * Insert extra-info descriptor into database.
+   */
+  public void addExtraInfoDescriptor(String extraInfoDigest,
+      String nickname, String fingerprint, long published,
+      byte[] rawDescriptor) {
+    try {
+      if (this.psEs != null && this.psE != null) {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        this.psEs.setString(1, extraInfoDigest);
+        ResultSet rs = psEs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) > 0) {
+          return;
+        }
+        this.psE.clearParameters();
+        this.psE.setString(1, extraInfoDigest);
+        this.psE.setString(2, nickname);
+        this.psE.setString(3, fingerprint);
+        this.psE.setTimestamp(4, new Timestamp(published), cal);
+        this.psE.setBytes(5, rawDescriptor);
+        this.psE.executeUpdate();
+        resCount++;
+        if (resCount % autoCommitCount == 0)  {
+          this.conn.commit();
+          resCount = 0;
+        }
+      }
+      if (this.extrainfoOut != null) {
+        SimpleDateFormat dateTimeFormat =
+             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
+            + nickname + "\t" + fingerprint.toLowerCase() + "\t"
+            + dateTimeFormat.format(published) + "\t");
+        this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+      }
+    } catch (SQLException e) {
+      this.logger.log(Level.WARNING, "Could not add extra-info "
+          + "descriptor.", e);
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Could not write extra-info "
+          + "descriptor to raw database import file.", e);
     }
   }
 
@@ -191,9 +348,9 @@ public final class RelayDescriptorDatabaseImporter {
     /* commit any stragglers before closing */
     try {
       this.conn.commit();
-    }
-    catch (SQLException e)  {
-      this.logger.log(Level.WARNING, "Could not commit final records to database", e);
+    } catch (SQLException e)  {
+      this.logger.log(Level.WARNING, "Could not commit final records to "
+          + "database", e);
     }
     try {
       this.conn.close();
@@ -201,5 +358,31 @@ public final class RelayDescriptorDatabaseImporter {
       this.logger.log(Level.WARNING, "Could not close database "
           + "connection.", e);
     }
+    /* Close raw import files. */
+    if (this.statusentryOut != null) {
+      try {
+        this.statusentryOut.close();
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not close raw database "
+            + "import file.", e);
+      }
+    }
+    if (this.descriptorOut != null) {
+      try {
+        this.descriptorOut.close();
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not close raw database "
+            + "import file.", e);
+      }
+    }
+    if (this.extrainfoOut != null) {
+      try {
+        this.extrainfoOut.close();
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not close raw database "
+            + "import file.", e);
+      }
+    }
   }
 }
+
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 5582cfb..9dfac56 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -126,13 +126,17 @@ public class RelayDescriptorParser {
         boolean isConsensus = true;
         int exit = 0, fast = 0, guard = 0, running = 0, stable = 0;
         String validAfterTime = null, descriptorIdentity = null,
-            serverDesc = null;
+            nickname = null, relayIdentity = null, serverDesc = null,
+            version = null, ports = null;
         StringBuilder descriptorIdentities = new StringBuilder();
-        String fingerprint = null, dirSource = null;
-        long validAfter = -1L;
+        String fingerprint = null, dirSource = null, address = null;
+        long validAfter = -1L, published = -1L, bandwidth = -1L,
+            orPort = 0L, dirPort = 0L;
         SortedSet<String> dirSources = new TreeSet<String>();
         SortedSet<String> serverDescriptors = new TreeSet<String>();
         SortedSet<String> hashedRelayIdentities = new TreeSet<String>();
+        SortedSet<String> relayFlags = null;
+        StringBuilder rawStatusEntry = null;
         while ((line = br.readLine()) != null) {
           if (line.equals("vote-status vote")) {
             isConsensus = false;
@@ -146,20 +150,39 @@ public class RelayDescriptorParser {
           } else if (line.startsWith("fingerprint ")) {
             fingerprint = line.split(" ")[1];
           } else if (line.startsWith("r ")) {
-            String publishedTime = line.split(" ")[4] + " "
-                + line.split(" ")[5];
-            String relayIdentity = Hex.encodeHexString(
-                Base64.decodeBase64(line.split(" ")[2] + "=")).
+            if (relayIdentity != null && this.rddi != null) {
+              byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+              this.rddi.addStatusEntry(validAfter, nickname,
+                  relayIdentity, serverDesc, published, address, orPort,
+                  dirPort, relayFlags, version, bandwidth, ports,
+                  rawDescriptor);
+              relayFlags = null;
+              version = null;
+              bandwidth = -1L;
+              ports = null;
+            }
+            rawStatusEntry = new StringBuilder(line + "\n");
+            String[] parts = line.split(" ");
+            String publishedTime = parts[4] + " " + parts[5];
+            nickname = parts[1];
+            relayIdentity = Hex.encodeHexString(
+                Base64.decodeBase64(parts[2] + "=")).
                 toLowerCase();
             serverDesc = Hex.encodeHexString(Base64.decodeBase64(
-                line.split(" ")[3] + "=")).toLowerCase();
+                parts[3] + "=")).toLowerCase();
             serverDescriptors.add(publishedTime + "," + relayIdentity
                 + "," + serverDesc);
             hashedRelayIdentities.add(DigestUtils.shaHex(
-                Base64.decodeBase64(line.split(" ")[2] + "=")).
+                Base64.decodeBase64(parts[2] + "=")).
                 toUpperCase());
-            descriptorIdentity = line.split(" ")[3];
+            descriptorIdentity = parts[3];
+            published = parseFormat.parse(parts[4] + " " + parts[5]).
+                getTime();
+            address = parts[6];
+            orPort = Long.parseLong(parts[7]);
+            dirPort = Long.parseLong(parts[8]);
           } else if (line.startsWith("s ")) {
+            rawStatusEntry.append(line + "\n");
             if (line.contains(" Running")) {
               exit += line.contains(" Exit") ? 1 : 0;
               fast += line.contains(" Fast") ? 1 : 0;
@@ -168,17 +191,36 @@ public class RelayDescriptorParser {
               running++;
               descriptorIdentities.append("," + descriptorIdentity);
             }
-            if (this.rddi != null) {
-              SortedSet<String> flags = new TreeSet<String>();
-              if (line.length() > 2) {
-                for (String flag : line.substring(2).split(" ")) {
-                  flags.add(flag);
-                }
+            relayFlags = new TreeSet<String>();
+            if (line.length() > 2) {
+              for (String flag : line.substring(2).split(" ")) {
+                relayFlags.add(flag);
+              }
+            }
+          } else if (line.startsWith("v ")) {
+            rawStatusEntry.append(line + "\n");
+            version = line.substring(2);
+          } else if (line.startsWith("w ")) {
+            rawStatusEntry.append(line + "\n");
+            String[] parts = line.split(" ");
+            for (String part : parts) {
+              if (part.startsWith("Bandwidth=")) {
+                bandwidth = Long.parseLong(part.substring(
+                    "Bandwidth=".length()));
               }
-              this.rddi.addStatusEntry(validAfter, serverDesc, flags);
             }
+          } else if (line.startsWith("p ")) {
+            rawStatusEntry.append(line + "\n");
+            ports = line.substring(2);
           }
         }
+        if (relayIdentity != null && this.rddi != null) {
+          byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+          this.rddi.addStatusEntry(validAfter, nickname,
+              relayIdentity, serverDesc, published, address, orPort,
+              dirPort, relayFlags, version, bandwidth, ports,
+              rawDescriptor);
+        }
         if (isConsensus) {
           if (this.bsfh != null) {
             for (String hashedRelayIdentity : hashedRelayIdentities) {
@@ -233,6 +275,7 @@ public class RelayDescriptorParser {
             publishedTime = null, bandwidthLine = null,
             extraInfoDigest = null, relayIdentifier = null;
         String[] parts = line.split(" ");
+        String nickname = parts[1];
         String address = parts[2];
         int orPort = Integer.parseInt(parts[3]);
         int dirPort = Integer.parseInt(parts[4]);
@@ -290,11 +333,12 @@ public class RelayDescriptorParser {
           long bandwidthBurst = Long.parseLong(bwParts[2]);
           long bandwidthObserved = Long.parseLong(bwParts[3]);
           String platform = platformLine.substring("platform ".length());
-          this.rddi.addServerDescriptor(digest, address, orPort, dirPort,
-              bandwidthAvg, bandwidthBurst, bandwidthObserved, platform,
-              published, uptime);
+          this.rddi.addServerDescriptor(digest, nickname, address, orPort,
+              dirPort, bandwidthAvg, bandwidthBurst, bandwidthObserved,
+              platform, published, uptime, extraInfoDigest, data);
         }
       } else if (line.startsWith("extra-info ")) {
+        String nickname = line.split(" ")[1];
         String publishedTime = null, relayIdentifier = line.split(" ")[2];
         long published = -1L;
         String dir = line.split(" ")[2];
@@ -356,6 +400,10 @@ public class RelayDescriptorParser {
           this.rdd.haveParsedExtraInfoDescriptor(publishedTime,
               relayIdentifier.toLowerCase(), digest);
         }
+        if (this.rddi != null && digest != null) {
+          this.rddi.addExtraInfoDescriptor(digest, nickname,
+              dir.toLowerCase(), published, data);
+        }
       }
     } catch (IOException e) {
       this.logger.log(Level.WARNING, "Could not parse descriptor. "
-- 
1.7.1



More information about the tor-commits mailing list