[or-cvs] [metrics-db/master] Use SQL arrays and precomputed values for bandwidth histories.

karsten at torproject.org karsten at torproject.org
Thu Jan 20 07:17:37 UTC 2011


commit 55953abf69226dfff15e37c6e4b674cddfbaec67
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Sat Jan 15 12:38:14 2011 +0100

    Use SQL arrays and precomputed values for bandwidth histories.
---
 db/tordir.sql                                      |  152 +++++----
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |  381 ++++++++++++++------
 .../torproject/ernie/db/RelayDescriptorParser.java |   35 +--
 3 files changed, 347 insertions(+), 221 deletions(-)

diff --git a/db/tordir.sql b/db/tordir.sql
index 7af0093..08f048f 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -32,25 +32,25 @@ CREATE TABLE extrainfo (
     CONSTRAINT extrainfo_pkey PRIMARY KEY (extrainfo)
 );
 
--- TABLE bandwidth
--- Contains bandwidth histories contained in extra-info descriptors.
--- Every row represents a 15-minute interval and can have read, written,
--- dirread, and dirwritten set or not. We're making sure that there's only
--- one interval for each extrainfo. However, it's possible that an
--- interval is contained in another extra-info descriptor of the same
--- relay. These duplicates need to be filtered when aggregating bandwidth
--- histories.
+-- Bandwidth histories from extra-info descriptors, one row per relay and
+-- date. The importer stores the interval ending i*15 minutes after 00:00
+-- at array index i; the *_sum columns cache array_sum() of each array.
 CREATE TABLE bwhist (
     fingerprint CHARACTER(40) NOT NULL,
-    extrainfo CHARACTER(40) NOT NULL,
-    intervalend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
-    read BIGINT,
-    written BIGINT,
-    dirread BIGINT,
-    dirwritten BIGINT,
-    CONSTRAINT bwhist_pkey PRIMARY KEY (extrainfo, intervalend)
+    date DATE NOT NULL,
+    read BIGINT[],          -- values from read-history lines
+    read_sum BIGINT,        -- array_sum(read), set by insert_bwhist()
+    written BIGINT[],       -- values from write-history lines
+    written_sum BIGINT,     -- array_sum(written)
+    dirread BIGINT[],       -- values from dirreq-read-history lines
+    dirread_sum BIGINT,     -- array_sum(dirread)
+    dirwritten BIGINT[],    -- values from dirreq-write-history lines
+    dirwritten_sum BIGINT,  -- array_sum(dirwritten)
+    CONSTRAINT bwhist_pkey PRIMARY KEY (fingerprint, date)
 );
 
+CREATE INDEX bwhist_date ON bwhist (date);  -- for date-range filters
+
 -- TABLE statusentry
 -- Contains all of the consensus entries published by the directories.
 -- Each statusentry references a valid descriptor.
@@ -188,8 +188,6 @@ CREATE TABLE total_bwhist (
     date DATE NOT NULL,
     read BIGINT,
     written BIGINT,
-    dirread BIGINT,
-    dirwritten BIGINT,
     CONSTRAINT total_bwhist_pkey PRIMARY KEY(date)
 );
 
@@ -264,6 +262,59 @@ RETURNS INTEGER AS $$
     END;
 $$ LANGUAGE plpgsql;
 
+CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$
+  SELECT SUM($1[i])::bigint  -- sum of non-NULL elements; NULL if none
+  FROM generate_series(array_lower($1, 1), array_upper($1, 1)) AS idx(i);
+$$ LANGUAGE SQL IMMUTABLE STRICT;
+
+-- Adds a relay's bandwidth-history arrays for one date, merging them into
+-- an existing row via array-slice assignment, then refreshes the sums.
+CREATE OR REPLACE FUNCTION insert_bwhist(
+    insert_fingerprint CHARACTER(40), insert_date DATE,
+    insert_read BIGINT[], insert_written BIGINT[],
+    insert_dirread BIGINT[], insert_dirwritten BIGINT[])
+    RETURNS INTEGER AS $$
+  BEGIN
+  -- NOTE: check-then-insert; concurrent callers could race on this key.
+  IF NOT EXISTS (SELECT 1 FROM bwhist
+      WHERE fingerprint = insert_fingerprint AND date = insert_date) THEN
+    INSERT INTO bwhist (fingerprint, date, read, written, dirread,
+        dirwritten)
+    VALUES (insert_fingerprint, insert_date, insert_read, insert_written,
+        insert_dirread, insert_dirwritten);
+  ELSE
+    UPDATE bwhist
+    SET read[array_lower(insert_read, 1):
+          array_upper(insert_read, 1)] = insert_read,
+        written[array_lower(insert_written, 1):
+          array_upper(insert_written, 1)] = insert_written,
+        dirread[array_lower(insert_dirread, 1):
+          array_upper(insert_dirread, 1)] = insert_dirread,
+        dirwritten[array_lower(insert_dirwritten, 1):
+          array_upper(insert_dirwritten, 1)] = insert_dirwritten
+    WHERE fingerprint = insert_fingerprint AND date = insert_date;
+    -- Updating twice is an ugly workaround for PostgreSQL bug 5840
+    UPDATE bwhist
+    SET read[array_lower(insert_read, 1):
+          array_upper(insert_read, 1)] = insert_read,
+        written[array_lower(insert_written, 1):
+          array_upper(insert_written, 1)] = insert_written,
+        dirread[array_lower(insert_dirread, 1):
+          array_upper(insert_dirread, 1)] = insert_dirread,
+        dirwritten[array_lower(insert_dirwritten, 1):
+          array_upper(insert_dirwritten, 1)] = insert_dirwritten
+    WHERE fingerprint = insert_fingerprint AND date = insert_date;
+  END IF;
+  UPDATE bwhist
+  SET read_sum = array_sum(read),
+      written_sum = array_sum(written),
+      dirread_sum = array_sum(dirread),
+      dirwritten_sum = array_sum(dirwritten)
+  WHERE fingerprint = insert_fingerprint AND date = insert_date;
+  RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
 -- refresh_* functions
 -- The following functions keep their corresponding aggregate tables
 -- up-to-date. They should be called every time ERNIE is run, or when new
@@ -443,43 +494,15 @@ CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
     END;
 $$ LANGUAGE plpgsql;
 
--- FUNCTION refresh_total_bwhist()
 CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$
   BEGIN
   DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates);
-  INSERT INTO total_bwhist (date, read, written, dirread, dirwritten)
-  SELECT date,
-         SUM(read) AS read,
-         SUM(written) AS written,
-         SUM(dirread) * (SUM(written) + SUM(read)) / (1
-           + SUM(CASE WHEN dirwritten IS NULL THEN NULL ELSE written END)
-           + SUM(CASE WHEN dirread IS NULL THEN NULL ELSE read END))
-           AS dirread,
-         SUM(dirwritten) * (SUM(written) + SUM(read)) / (1
-           + SUM(CASE WHEN dirwritten IS NULL THEN NULL ELSE written END)
-           + SUM(CASE WHEN dirread IS NULL THEN NULL ELSE read END))
-           AS dirwritten
-  FROM (
-    SELECT fingerprint,
-           DATE(intervalend) AS date,
-           SUM(read) AS read,
-           SUM(written) AS written,
-           SUM(dirread) AS dirread,
-           SUM(dirwritten) AS dirwritten
-    FROM (
-      SELECT DISTINCT fingerprint,
-                      intervalend,
-                      read,
-                      written,
-                      dirread,
-                      dirwritten
-      FROM bwhist
-      WHERE DATE(intervalend) >= (SELECT MIN(date) FROM updates)
-      AND DATE(intervalend) <= (SELECT MAX(date) FROM updates)
-      AND DATE(intervalend) IN (SELECT date FROM updates)
-    ) byinterval
-    GROUP BY fingerprint, DATE(intervalend)
-  ) byrelay
+  INSERT INTO total_bwhist (date, read, written)
+  SELECT date, SUM(read_sum) AS read, SUM(written_sum) AS written
+  FROM bwhist
+  WHERE date >= (SELECT MIN(date) FROM updates)
+  AND date <= (SELECT MAX(date) FROM updates)
+  AND date IN (SELECT date FROM updates)
   GROUP BY date;
   RETURN 1;
   END;
@@ -520,17 +543,17 @@ CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$
            THEN NULL ELSE written END) AS bw,
          SUM(CASE WHEN authority IS NOT NULL
            THEN NULL ELSE read END) AS br,
-         SUM(CASE WHEN dirwritten IS NULL OR authority IS NOT NULL
+         SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL
            THEN NULL ELSE written END) AS bwd,
-         SUM(CASE WHEN dirwritten IS NULL OR authority IS NOT NULL
+         SUM(CASE WHEN dirwritten = 0 OR authority IS NOT NULL
            THEN NULL ELSE read END) AS brd,
          SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
            THEN NULL ELSE written END) AS bwr,
          SUM(CASE WHEN requests IS NULL OR authority IS NOT NULL
            THEN NULL ELSE read END) AS brr,
-         SUM(CASE WHEN dirwritten IS NULL OR requests IS NULL
+         SUM(CASE WHEN dirwritten = 0 OR requests IS NULL
            OR authority IS NOT NULL THEN NULL ELSE written END) AS bwdr,
-         SUM(CASE WHEN dirwritten IS NULL OR requests IS NULL
+         SUM(CASE WHEN dirwritten = 0 OR requests IS NULL
            OR authority IS NOT NULL THEN NULL ELSE read END) AS brdr,
          SUM(CASE WHEN opendirport IS NULL OR authority IS NOT NULL
            THEN NULL ELSE written END) AS bwp,
@@ -589,19 +612,12 @@ CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$
   LEFT JOIN (
     -- In the next step, we expand the result by bandwidth histories of
     -- all relays.
-    SELECT fingerprint,
-           DATE(intervalend) AS date,
-           SUM(read) AS read, SUM(written) AS written,
-           SUM(dirread) AS dirread, SUM(dirwritten) AS dirwritten
-    FROM (
-      SELECT DISTINCT fingerprint, intervalend,
-        read, written, dirread, dirwritten
-      FROM bwhist
-      WHERE DATE(intervalend) >= (SELECT MIN(date) FROM updates)
-      AND DATE(intervalend) <= (SELECT MAX(date) FROM updates)
-      AND DATE(intervalend) IN (SELECT date FROM updates)
-    ) distinct_bwhist
-    GROUP BY 1, 2
+    SELECT fingerprint, date, read_sum AS read, written_sum AS written,
+           dirread_sum AS dirread, dirwritten_sum AS dirwritten
+    FROM bwhist
+    WHERE date >= (SELECT MIN(date) FROM updates)
+    AND date <= (SELECT MAX(date) FROM updates)
+    AND date IN (SELECT date FROM updates)
   ) bwhist_by_relay
   ON dirreq_stats_by_country.date = bwhist_by_relay.date
   LEFT JOIN (
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index e423ef9..6fce2db 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -57,12 +57,6 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psEs;
 
   /**
-   * Prepared statement to check whether the bandwidth history of an
-   * extra-info descriptor has been imported into the database before.
-   */
-  private PreparedStatement psHs;
-
-  /**
    * Prepared statement to check whether a given server descriptor has
    * been imported into the database before.
    */
@@ -122,10 +116,10 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psE;
 
   /**
-   * Prepared statement to insert the bandwidth history of an extra-info
+   * Callable statement to insert the bandwidth history of an extra-info
    * descriptor into the database.
    */
-  private PreparedStatement psH;
+  private CallableStatement csH;
 
   /**
    * Prepared statement to insert a network status consensus into the
@@ -252,8 +246,6 @@ public final class RelayDescriptorDatabaseImporter {
             + "FROM descriptor WHERE descriptor = ?");
         this.psEs = conn.prepareStatement("SELECT COUNT(*) "
             + "FROM extrainfo WHERE extrainfo = ?");
-        this.psHs = conn.prepareStatement("SELECT COUNT(*) "
-            + "FROM bwhist WHERE extrainfo = ?");
         this.psCs = conn.prepareStatement("SELECT COUNT(*) "
             + "FROM consensus WHERE validafter = ?");
         this.psVs = conn.prepareStatement("SELECT COUNT(*) "
@@ -279,9 +271,8 @@ public final class RelayDescriptorDatabaseImporter {
         this.psE = conn.prepareStatement("INSERT INTO extrainfo "
             + "(extrainfo, nickname, fingerprint, published, rawdesc) "
             + "VALUES (?, ?, ?, ?, ?)");
-        this.psH = conn.prepareStatement("INSERT INTO bwhist "
-            + "(fingerprint, extrainfo, intervalend, read, written, "
-            + "dirread, dirwritten) VALUES (?, ?, ?, ?, ?, ?, ?)");
+        this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
+            + "?)}");
         this.psC = conn.prepareStatement("INSERT INTO consensus "
             + "(validafter, rawdesc) VALUES (?, ?)");
         this.psV = conn.prepareStatement("INSERT INTO vote "
@@ -315,6 +306,9 @@ public final class RelayDescriptorDatabaseImporter {
 
   private void addDateToScheduledUpdates(long timestamp)
       throws SQLException {
+    if (this.psU == null) {
+      return;
+    }
     long dateMillis = 0L;
     try {
       dateMillis = this.dateTimeFormat.parse(
@@ -536,7 +530,7 @@ public final class RelayDescriptorDatabaseImporter {
    */
   public void addExtraInfoDescriptor(String extraInfoDigest,
       String nickname, String fingerprint, long published,
-      byte[] rawDescriptor, SortedMap<String, String> bandwidthHistory) {
+      byte[] rawDescriptor, List<String> bandwidthHistoryLines) {
     try {
       Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
       if (this.psEs != null && this.psE != null) {
@@ -557,109 +551,11 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
-      if ((this.psHs != null && this.psH != null) ||
-          this.rawFilesDirectory != null) {
-        boolean addToDatabase = false;
-        if (psHs != null && this.psH != null) {
-          this.psHs.setString(1, extraInfoDigest);
-          ResultSet rs = this.psHs.executeQuery();
-          rs.next();
-          if (rs.getInt(1) == 0) {
-            addToDatabase = true;
-          }
-        }
-        if (addToDatabase || this.rawFilesDirectory != null) {
-          String lastIntervalEnd = null;
-          List<String> bandwidthHistoryValues = new ArrayList<String>();
-          bandwidthHistoryValues.addAll(bandwidthHistory.values());
-          bandwidthHistoryValues.add("EOL");
-          String readBytes = null, writtenBytes = null,
-              dirReadBytes = null, dirWrittenBytes = null;
-          for (String bandwidthHistoryValue : bandwidthHistoryValues) {
-            String[] entryParts = bandwidthHistoryValue.split(",");
-            String intervalEnd = entryParts[0];
-            if ((intervalEnd.equals("EOL") ||
-                !intervalEnd.equals(lastIntervalEnd)) &&
-                lastIntervalEnd != null) {
-              if (addToDatabase) {
-                this.psH.clearParameters();
-                this.psH.setString(1, fingerprint);
-                this.psH.setString(2, extraInfoDigest);
-                try {
-                  this.psH.setTimestamp(3, new Timestamp(Long.parseLong(
-                      lastIntervalEnd)), cal);
-                  if (readBytes != null) {
-                    this.psH.setLong(4, Long.parseLong(readBytes));
-                  } else {
-                    this.psH.setNull(4, Types.BIGINT);
-                  }
-                  if (writtenBytes != null) {
-                    this.psH.setLong(5, Long.parseLong(writtenBytes));
-                  } else {
-                    this.psH.setNull(5, Types.BIGINT);
-                  }
-                  if (dirReadBytes != null) {
-                    this.psH.setLong(6, Long.parseLong(dirReadBytes));
-                  } else {
-                    this.psH.setNull(6, Types.BIGINT);
-                  }
-                  if (dirWrittenBytes != null) {
-                    this.psH.setLong(7, Long.parseLong(dirWrittenBytes));
-                  } else {
-                    this.psH.setNull(7, Types.BIGINT);
-                  }
-                } catch (NumberFormatException e) {
-                  break;
-                }
-                this.psH.executeUpdate();
-              }
-              if (this.rawFilesDirectory != null) {
-                if (this.bwhistOut == null) {
-                  new File(rawFilesDirectory).mkdirs();
-                  this.bwhistOut = new BufferedWriter(new FileWriter(
-                      rawFilesDirectory + "/bwhist.sql"));
-                  this.bwhistOut.write(" COPY bwhist (fingerprint, "
-                      + "extrainfo, intervalend, read, written, dirread, "
-                      + "dirwritten) FROM stdin;\n");
-                }
-                String extraInfo = extraInfoDigest.toLowerCase();
-                String intervalEndString = this.dateTimeFormat.format(
-                    Long.parseLong(lastIntervalEnd));
-                this.bwhistOut.write(fingerprint.toLowerCase() + "\t"
-                    + extraInfo + "\t" + intervalEndString + "\t"
-                    + (readBytes != null ? readBytes : "\\N") + "\t"
-                    + (writtenBytes != null ? writtenBytes : "\\N")
-                    + "\t" + (dirReadBytes != null ? dirReadBytes
-                    : "\\N") + "\t" + (dirWrittenBytes != null
-                    ? dirWrittenBytes : "\\N") + "\n");
-              }
-              readBytes = writtenBytes = dirReadBytes = dirWrittenBytes =
-                  null;
-            }
-            if (intervalEnd.equals("EOL")) {
-              break;
-            }
-            lastIntervalEnd = intervalEnd;
-            String type = entryParts[1];
-            String bytes = entryParts[2];
-            if (type.equals("read-history")) {
-              readBytes = bytes;
-            } else if (type.equals("write-history")) {
-              writtenBytes = bytes;
-            } else if (type.equals("dirreq-read-history")) {
-              dirReadBytes = bytes;
-            } else if (type.equals("dirreq-write-history")) {
-              dirWrittenBytes = bytes;
-            }
-          }
-          if (addToDatabase) {
-            rhsCount++;
-            if (rhsCount % autoCommitCount == 0)  {
-              this.conn.commit();
-            }
-          }
-        }
-      }
+    } catch (SQLException e) {
+      this.logger.log(Level.WARNING, "Could not add extra-info "
+          + "descriptor.", e);
+    }
+    try {
       if (this.rawFilesDirectory != null) {
         if (this.extrainfoOut == null) {
           new File(rawFilesDirectory).mkdirs();
@@ -674,12 +570,255 @@ public final class RelayDescriptorDatabaseImporter {
         this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\", "\\\\\\\\") + "\n");
       }
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add extra-info "
-          + "descriptor.", e);
     } catch (IOException e) {
       this.logger.log(Level.WARNING, "Could not write extra-info "
           + "descriptor to raw database import file.", e);
+    } catch (SQLException e) {
+      this.logger.log(Level.WARNING, "Could not write extra-info "
+          + "descriptor to raw database import file.", e);
+    }
+    if (!bandwidthHistoryLines.isEmpty()) {
+      this.addBandwidthHistory(fingerprint.toLowerCase(), published,
+          bandwidthHistoryLines);
+    }
+  }
+
+  /**
+   * Minimal java.sql.Array for passing int8[] values: toString() yields a
+   * PostgreSQL array literal with explicit bounds such as "[4:6]={1,2,3}",
+   * or "[-1:-1]={0}" for a null array. Every method other than toString()
+   * and getBaseTypeName() throws UnsupportedOperationException.
+   */
+  private static class BigIntArray implements java.sql.Array {
+
+    /** Pre-rendered array literal returned by toString(). */
+    private final String stringValue;
+
+    public BigIntArray(long[] array, int offset) {
+      this.stringValue = (array == null) ? "[-1:-1]={0}"
+          : toLiteral(array, offset);
+    }
+
+    /** Renders values as "[offset:offset+n-1]={v0,v1,...}". */
+    private static String toLiteral(long[] values, int offset) {
+      StringBuilder literal = new StringBuilder();
+      literal.append('[').append(offset).append(':')
+          .append(offset + values.length - 1).append("]={");
+      String separator = "";
+      for (long value : values) {
+        literal.append(separator).append(value);
+        separator = ",";
+      }
+      return literal.append('}').toString();
+    }
+
+    public String toString() {
+      return this.stringValue;
+    }
+
+    public String getBaseTypeName() {
+      return "int8";
+    }
+
+    /* Shared factory for the unimplemented java.sql.Array methods. */
+    private static UnsupportedOperationException unsupported() {
+      return new UnsupportedOperationException();
+    }
+
+    public void free() { throw unsupported(); }
+    public Object getArray() { throw unsupported(); }
+    public Object getArray(long index, int count) { throw unsupported(); }
+    public Object getArray(long index, int count,
+        Map<String, Class<?>> map) { throw unsupported(); }
+    public Object getArray(
+        Map<String, Class<?>> map) { throw unsupported(); }
+
+    public int getBaseType() { throw unsupported(); }
+
+    public ResultSet getResultSet() { throw unsupported(); }
+    public ResultSet getResultSet(long index,
+        int count) { throw unsupported(); }
+    public ResultSet getResultSet(long index, int count,
+        Map<String, Class<?>> map) { throw unsupported(); }
+    public ResultSet getResultSet(
+        Map<String, Class<?>> map) { throw unsupported(); }
+  }
+
+  /**
+   * Splits bandwidth-history lines at date boundaries and stores each
+   * date's 15-minute values via insert_bwhist() and/or bwhist.sql.
+   */
+  public void addBandwidthHistory(String fingerprint, long published,
+      List<String> bandwidthHistoryStrings) {
+    /* Split history lines by date and rewrite them so that the date
+     * comes first. */
+    SortedSet<String> historyLinesByDate = new TreeSet<String>();
+    for (String bandwidthHistoryString : bandwidthHistoryStrings) {
+      String[] parts = bandwidthHistoryString.split(" ");
+      if (parts.length != 6) {
+        this.logger.finer("Bandwidth history line does not have expected "
+            + "number of elements. Ignoring this line.");
+        continue;
+      }
+      long intervalLength = 0L;
+      try {
+        intervalLength = Long.parseLong(parts[3].substring(1));
+      } catch (NumberFormatException e) {
+        this.logger.fine("Bandwidth history line does not have valid "
+            + "interval length '" + parts[3] + " " + parts[4] + "'. "
+            + "Ignoring this line.");
+        continue;
+      }
+      if (intervalLength != 900L) {
+        this.logger.fine("Bandwidth history line does not consist of "
+            + "15-minute intervals. Ignoring this line.");
+        continue;
+      }
+      String type = parts[0];
+      String intervalEndTime = parts[1] + " " + parts[2];
+      long intervalEnd, dateStart;
+      try {
+        intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime();
+        dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00").
+            getTime();
+      } catch (ParseException e) {
+        this.logger.fine("Parse exception while parsing timestamp in "
+            + "bandwidth history line. Ignoring this line.");
+        continue;
+      }
+      if (Math.abs(published - intervalEnd) >
+          7L * 24L * 60L * 60L * 1000L) {
+        this.logger.fine("Extra-info descriptor publication time "
+            + dateTimeFormat.format(published) + " and last interval "
+            + "time " + intervalEndTime + " in " + type + " line differ "
+            + "by more than 7 days! Not adding this line!");
+        continue;
+      }
+      long currentIntervalEnd = intervalEnd;
+      StringBuilder sb = new StringBuilder();
+      String[] values = parts[5].split(",");
+      SortedSet<String> newHistoryLines = new TreeSet<String>();
+      try {
+        for (int i = values.length - 1; i >= -1; i--) {
+          if (i == -1 || currentIntervalEnd < dateStart) {
+            sb.insert(0, intervalEndTime + " " + type + " ("
+                + intervalLength + " s) ");
+            sb.setLength(sb.length() - 1);
+            String historyLine = sb.toString();
+            newHistoryLines.add(historyLine);
+            sb = new StringBuilder();
+            dateStart -= 24L * 60L * 60L * 1000L;
+            intervalEndTime = dateTimeFormat.format(currentIntervalEnd);
+          }
+          if (i == -1) {
+            break;
+          }
+          Long.parseLong(values[i]);
+          sb.insert(0, values[i] + ",");
+          currentIntervalEnd -= intervalLength * 1000L;
+        }
+      } catch (NumberFormatException e) {
+        this.logger.fine("Number format exception while parsing "
+            + "bandwidth history line. Ignoring this line.");
+        continue;
+      }
+      historyLinesByDate.addAll(newHistoryLines);
+    }
+
+    /* Add split history lines to database. */
+    String lastDate = null;
+    historyLinesByDate.add("EOL");
+    long[] readArray = null, writtenArray = null, dirreadArray = null,
+        dirwrittenArray = null;
+    int readOffset = 0, writtenOffset = 0, dirreadOffset = 0,
+        dirwrittenOffset = 0;
+    for (String historyLine : historyLinesByDate) {
+      String[] parts = historyLine.split(" ");
+      String currentDate = parts[0];
+      if (lastDate != null && (historyLine.equals("EOL") ||
+          !currentDate.equals(lastDate))) {
+        BigIntArray readIntArray = new BigIntArray(readArray, readOffset),
+            writtenIntArray = new BigIntArray(writtenArray, writtenOffset),
+            dirreadIntArray = new BigIntArray(dirreadArray, dirreadOffset),
+            dirwrittenIntArray = new BigIntArray(dirwrittenArray,
+                dirwrittenOffset);
+        if (this.csH != null) {
+          try {
+            long dateMillis = dateTimeFormat.parse(lastDate
+                + " 00:00:00").getTime();
+            this.addDateToScheduledUpdates(dateMillis);
+            this.csH.setString(1, fingerprint);
+            // Build from lastDate: setDate() renders in the JVM time zone.
+            this.csH.setDate(2, java.sql.Date.valueOf(lastDate));
+            this.csH.setArray(3, readIntArray);
+            this.csH.setArray(4, writtenIntArray);
+            this.csH.setArray(5, dirreadIntArray);
+            this.csH.setArray(6, dirwrittenIntArray);
+            this.csH.addBatch();
+            rhsCount++;
+            if (rhsCount % autoCommitCount == 0)  {
+              this.csH.executeBatch();
+            }
+          } catch (SQLException e) {
+            this.logger.log(Level.WARNING, "Could not insert bandwidth "
+                + "history line into database.", e);
+          } catch (ParseException e) {
+            this.logger.log(Level.WARNING, "Could not insert bandwidth "
+                + "history line into database.", e);
+          }
+        }
+        if (this.rawFilesDirectory != null) {
+          try {
+            if (this.bwhistOut == null) {
+              new File(rawFilesDirectory).mkdirs();
+              this.bwhistOut = new BufferedWriter(new FileWriter(
+                  rawFilesDirectory + "/bwhist.sql"));
+            }
+            this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint
+                + "','" + lastDate + "','" + readIntArray.toString()
+                + "','" + writtenIntArray.toString() + "','"
+                + dirreadIntArray.toString() + "','"
+                + dirwrittenIntArray.toString() + "');\n");
+          } catch (IOException e) {
+            this.logger.log(Level.WARNING, "Could not write bandwidth "
+                + "history to raw database import file.", e);
+          }
+        }
+        readArray = writtenArray = dirreadArray = dirwrittenArray = null;
+      }
+      if (historyLine.equals("EOL")) {
+        break;
+      }
+      long lastIntervalTime;
+      try {
+        lastIntervalTime = dateTimeFormat.parse(parts[0] + " "
+            + parts[1]).getTime() - dateTimeFormat.parse(parts[0]
+            + " 00:00:00").getTime();
+      } catch (ParseException e) {
+        continue;
+      }
+      String[] stringValues = parts[5].split(",");
+      long[] longValues = new long[stringValues.length];
+      for (int i = 0; i < longValues.length; i++) {
+        longValues[i] = Long.parseLong(stringValues[i]);
+      }
+      int offset = (int) (lastIntervalTime / (15L * 60L * 1000L))
+          - longValues.length + 1;
+      String type = parts[2];
+      if (type.equals("read-history")) {
+        readArray = longValues;
+        readOffset = offset;
+      } else if (type.equals("write-history")) {
+        writtenArray = longValues;
+        writtenOffset = offset;
+      } else if (type.equals("dirreq-read-history")) {
+        dirreadArray = longValues;
+        dirreadOffset = offset;
+      } else if (type.equals("dirreq-write-history")) {
+        dirwrittenArray = longValues;
+        dirwrittenOffset = offset;
+      }
+      lastDate = currentDate;
     }
   }
 
@@ -933,6 +1072,8 @@ public final class RelayDescriptorDatabaseImporter {
     /* Commit any stragglers before closing. */
     if (this.conn != null) {
       try {
+        this.csH.executeBatch();
+
         this.conn.commit();
       } catch (SQLException e)  {
         this.logger.log(Level.WARNING, "Could not commit final records to "
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 14bfc20..2c31473 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -308,8 +308,7 @@ public class RelayDescriptorParser {
         String dir = line.split(" ")[2];
         String statsEnd = null;
         long seconds = -1L;
-        SortedMap<String, String> bandwidthHistory =
-            new TreeMap<String, String>();
+        List<String> bandwidthHistory = new ArrayList<String>();
         boolean skip = false;
         while ((line = br.readLine()) != null) {
           if (line.startsWith("published ")) {
@@ -319,37 +318,7 @@ public class RelayDescriptorParser {
               line.startsWith("write-history ") ||
               line.startsWith("dirreq-read-history ") ||
               line.startsWith("dirreq-write-history ")) {
-            String[] parts = line.split(" ");
-            if (parts.length == 6) {
-              String type = parts[0];
-              String intervalEndTime = parts[1] + " " + parts[2];
-              long intervalEnd = dateTimeFormat.parse(intervalEndTime).
-                  getTime();
-              if (Math.abs(published - intervalEnd) >
-                  7L * 24L * 60L * 60L * 1000L) {
-                this.logger.fine("Extra-info descriptor publication time "
-                    + publishedTime + " and last interval time "
-                    + intervalEndTime + " in " + type + " line differ by "
-                    + "more than 7 days! Not adding this line!");
-                continue;
-              }
-              try {
-                long intervalLength = Long.parseLong(parts[3].
-                    substring(1));
-                String[] values = parts[5].split(",");
-                for (int i = values.length - 1; i >= 0; i--) {
-                  Long.parseLong(values[i]);
-                  bandwidthHistory.put(intervalEnd + "," + type,
-                      intervalEnd + "," + type + "," + values[i]);
-                  intervalEnd -= intervalLength * 1000L;
-                }
-              } catch (NumberFormatException e) {
-                this.logger.log(Level.WARNING, "Could not parse "
-                    + line.split(" ")[0] + " line '" + line + "' in "
-                    + "descriptor. Skipping.", e);
-                break;
-              }
-            }
+            bandwidthHistory.add(line);
           } else if (line.startsWith("dirreq-stats-end ")) {
             String[] parts = line.split(" ");
             if (parts.length < 5) {



More information about the tor-commits mailing list