[or-cvs] [metrics-db/master] Move materialized view update logic from the database to Java.

karsten at torproject.org karsten at torproject.org
Tue Jan 18 10:15:26 UTC 2011


commit 57fdd87876d72d4ba895bbd4ce8e14ee170ee2c7
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Jan 18 11:04:03 2011 +0100

    Move materialized view update logic from the database to Java.
    
    So far, we used row-level triggers to memorize which dates had new or
    updated data and needed to be included in the next materialized view
    refresh run.  The row-level triggers wrote these dates to a table which
    was read by refresh functions.  After a successful refresh run, the table
    was emptied.  There were two problems with this approach:
    
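    First, the refresh run is executed in a single long-running transaction.
    If someone adds new data to the database while a refresh run is active,
    the triggers record the changed dates in the table, but the table is
    emptied at the end of that run, so those dates are lost and won't be
    included in the next refresh run either.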
    
    Second, row-level triggers have a negative impact on performance.  It
    turns out that removing them reduces import time by roughly 18 %.
    
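    The new approach uses Java to memorize which dates need to be updated.
    The importer writes these dates to a new scheduled_updates table, and
    refresh_all() copies them to the updates table when a run starts and
    afterwards deletes only the copied rows from scheduled_updates.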
---
 db/tordir.sql                                      |  145 ++++----------------
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |   56 ++++++++
 2 files changed, 81 insertions(+), 120 deletions(-)

diff --git a/db/tordir.sql b/db/tordir.sql
index a69ddba..689a4b4 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -228,127 +228,31 @@ CREATE TABLE relay_statuses_per_day (
     CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date)
 );
 
--- TABLE updates
--- A helper table which is used to keep track of what tables and where
--- need to be updated upon refreshes.
+-- Dates to be included in the next refresh run.
+CREATE TABLE scheduled_updates (
+    id SERIAL,
+    date DATE NOT NULL
+);
+
+-- Dates in the current refresh run.  When starting a refresh run, we copy
+-- the rows from scheduled_updates here in order to delete just those
+-- lines after the refresh run.  Otherwise we might forget scheduled dates
+-- that have been added during a refresh run.  If this happens we're going
+-- to update these dates in the next refresh run.
 CREATE TABLE updates (
-    "date" date NOT NULL,
-    CONSTRAINT updates_pkey PRIMARY KEY(date)
+    id INTEGER,
+    date DATE
 );
 
 CREATE LANGUAGE plpgsql;
 
--- FUNCTION update_status
--- This keeps the updates table up to date for the time graphs.
-CREATE OR REPLACE FUNCTION update_status() RETURNS TRIGGER AS $$
-    BEGIN
-    IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
-        IF (SELECT COUNT(*) FROM updates
-            WHERE DATE=DATE(new.validafter)) = 0 THEN
-            INSERT INTO updates
-            VALUES (DATE(NEW.validafter));
-        END IF;
-    END IF;
-    IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
-        IF (SELECT COUNT(*) FROM updates
-            WHERE DATE=DATE(old.validafter)) = 0 THEN
-            INSERT INTO updates
-            VALUES (DATE(OLD.validafter));
-        END IF;
-    END IF;
-    RETURN NULL; -- result is ignored since this is an AFTER trigger
-END;
-$$ LANGUAGE plpgsql;
-
--- TRIGGER update_status
--- This calls the function update_status() each time a row is inserted,
--- updated, or deleted from the statusentry table.
-CREATE TRIGGER update_status
-AFTER INSERT OR UPDATE OR DELETE
-ON statusentry
-    FOR EACH ROW EXECUTE PROCEDURE update_status();
-
--- FUNCTION update_desc
--- This keeps the updates table up to date for the time graphs.
-CREATE OR REPLACE FUNCTION update_desc() RETURNS TRIGGER AS $$
-    BEGIN
-    IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
-      BEGIN
-        IF (SELECT COUNT(*) FROM updates
-            WHERE DATE=DATE(new.published)) = 0 THEN
-            INSERT INTO updates
-            VALUES (DATE(NEW.published));
-        END IF;
-        IF (SELECT COUNT(*) FROM updates
-            WHERE DATE=DATE(new.published)+1) = 0 THEN
-            INSERT INTO updates
-            VALUES (DATE(NEW.published)+1);
-        END IF;
-      END;
-    END IF;
-    IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
-      BEGIN
-        IF (SELECT COUNT(*) FROM updates
-            WHERE DATE=DATE(old.published)) = 0 THEN
-            INSERT INTO updates
-            VALUES (DATE(OLD.published));
-        END IF;
-        IF (SELECT COUNT(*) FROM updates
-            WHERE DATE=DATE(old.published)+1) = 0 THEN
-            INSERT INTO updates
-            VALUES (DATE(OLD.published)+1);
-        END IF;
-      END;
-    END IF;
-    RETURN NULL; -- result is ignored since this is an AFTER trigger
-END;
-$$ LANGUAGE plpgsql;
-
--- TRIGGER update_desc
--- This calls the function update_desc() each time a row is inserted,
--- updated, or deleted from the descriptors table.
-CREATE TRIGGER update_desc
-AFTER INSERT OR UPDATE OR DELETE
-ON descriptor
-    FOR EACH ROW EXECUTE PROCEDURE update_desc();
-
--- FUNCTION update_bwhist
--- This keeps the updates table up to date for the time graphs.
-CREATE OR REPLACE FUNCTION update_bwhist() RETURNS TRIGGER AS $$
-    BEGIN
-    IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
-      IF (SELECT COUNT(*) FROM updates
-          WHERE DATE = DATE(NEW.intervalend)) = 0 THEN
-          INSERT INTO updates
-          VALUES (DATE(NEW.intervalend));
-      END IF;
-    END IF;
-    IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
-      IF (SELECT COUNT(*) FROM updates
-          WHERE DATE = DATE(OLD.intervalend)) = 0 THEN
-          INSERT INTO updates
-          VALUES (DATE(OLD.intervalend));
-      END IF;
-    END IF;
-    RETURN NULL; -- result is ignored since this is an AFTER trigger
-END;
-$$ LANGUAGE plpgsql;
-
--- TRIGGER update_desc
--- This calls the function update_desc() each time a row is inserted,
--- updated, or deleted from the descriptors table.
-CREATE TRIGGER update_bwhist
-AFTER INSERT OR UPDATE OR DELETE
-ON bwhist
-    FOR EACH ROW EXECUTE PROCEDURE update_bwhist();
-
 -- FUNCTION refresh_relay_statuses_per_day()
 -- Updates helper table which is used to refresh the aggregate tables.
 CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
 RETURNS INTEGER AS $$
     BEGIN
     DELETE FROM relay_statuses_per_day
-    WHERE date IN (SELECT * FROM updates);
+    WHERE date IN (SELECT date FROM updates);
     INSERT INTO relay_statuses_per_day (date, count)
     SELECT DATE(validafter) AS date, COUNT(*) AS count
     FROM (SELECT DISTINCT validafter
@@ -374,7 +278,7 @@ CREATE OR REPLACE FUNCTION refresh_network_size() RETURNS INTEGER AS $$
     BEGIN
 
     DELETE FROM network_size
-    WHERE date IN (SELECT * FROM updates);
+    WHERE date IN (SELECT date FROM updates);
 
         INSERT INTO network_size
         (date, avg_running, avg_exit, avg_guard, avg_fast, avg_stable)
@@ -407,7 +311,7 @@ CREATE OR REPLACE FUNCTION refresh_network_size_hour() RETURNS INTEGER AS $$
     BEGIN
 
     DELETE FROM network_size_hour
-    WHERE DATE(validafter) IN (SELECT * FROM updates);
+    WHERE DATE(validafter) IN (SELECT date FROM updates);
 
     INSERT INTO network_size_hour
     (validafter, avg_running, avg_exit, avg_guard, avg_fast, avg_stable)
@@ -432,7 +336,7 @@ CREATE OR REPLACE FUNCTION refresh_relay_platforms() RETURNS INTEGER AS $$
     BEGIN
 
     DELETE FROM relay_platforms
-    WHERE date IN (SELECT * FROM updates);
+    WHERE date IN (SELECT date FROM updates);
 
     INSERT INTO relay_platforms
     (date, avg_linux, avg_darwin, avg_bsd, avg_windows, avg_other)
@@ -475,7 +379,7 @@ CREATE OR REPLACE FUNCTION refresh_relay_versions() RETURNS INTEGER AS $$
     BEGIN
 
     DELETE FROM relay_versions
-    WHERE date IN (SELECT * FROM updates);
+    WHERE date IN (SELECT date FROM updates);
 
     INSERT INTO relay_versions
     (date, version, relays)
@@ -508,7 +412,7 @@ CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
     BEGIN
 
     DELETE FROM total_bandwidth
-    WHERE date IN (SELECT * FROM updates);
+    WHERE date IN (SELECT date FROM updates);
 
     INSERT INTO total_bandwidth
     (bwavg, bwburst, bwobserved, bwadvertised, date)
@@ -544,7 +448,7 @@ $$ LANGUAGE plpgsql;
 -- FUNCTION refresh_total_bwhist()
 CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$
   BEGIN
-  DELETE FROM total_bwhist WHERE date IN (SELECT * FROM updates);
+  DELETE FROM total_bwhist WHERE date IN (SELECT date FROM updates);
   INSERT INTO total_bwhist (date, read, written, dirread, dirwritten)
   SELECT date,
          SUM(read) AS read,
@@ -591,7 +495,7 @@ CREATE OR REPLACE FUNCTION refresh_user_stats() RETURNS INTEGER AS $$
   BEGIN
   -- Start by deleting user statistics of the dates we're about to
   -- regenerate.
-  DELETE FROM user_stats WHERE date IN (SELECT * FROM updates);
+  DELETE FROM user_stats WHERE date IN (SELECT date FROM updates);
   -- Now insert new user statistics.
   INSERT INTO user_stats (date, country, r, dw, dr, drw, drr, bw, br, bwd,
       brd, bwr, brr, bwdr, brdr, bwp, brp, bwn, brn)
@@ -806,10 +710,11 @@ CREATE TABLE gettor_stats (
     CONSTRAINT gettor_stats_pkey PRIMARY KEY("date", bundle)
 );
 
--- FUNCTION refresh_all()
--- This function refreshes all statistics in the database.
+-- Refresh all statistics in the database.
 CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
   BEGIN
+    DELETE FROM updates;
+    INSERT INTO updates SELECT * FROM scheduled_updates;
     PERFORM refresh_relay_statuses_per_day();
     PERFORM refresh_network_size();
     PERFORM refresh_network_size_hour();
@@ -818,7 +723,7 @@ CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
     PERFORM refresh_total_bandwidth();
     PERFORM refresh_total_bwhist();
     PERFORM refresh_user_stats();
-    DELETE FROM updates;
+    DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates);
   RETURN 1;
   END;
 $$ LANGUAGE plpgsql;
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index b023227..e423ef9 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -93,6 +93,18 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psQs;
 
   /**
+   * Set of dates that have been inserted into the database for being
+   * included in the next refresh run.
+   */
+  private Set<Long> scheduledUpdates;
+
+  /**
+   * Prepared statement to insert a date into the database that shall be
+   * included in the next refresh run.
+   */
+  private PreparedStatement psU;
+
+  /**
    * Prepared statement to insert a network status consensus entry into
    * the database.
    */
@@ -280,6 +292,9 @@ public final class RelayDescriptorDatabaseImporter {
         this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats "
             + "(source, statsend, seconds, country, requests) VALUES "
             + "(?, ?, ?, ?, ?)");
+        this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
+            + "(date) VALUES (?)");
+        this.scheduledUpdates = new HashSet<Long>();
       } catch (SQLException e) {
         this.logger.log(Level.WARNING, "Could not connect to database or "
             + "prepare statements.", e);
@@ -298,6 +313,24 @@ public final class RelayDescriptorDatabaseImporter {
     this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
 
+  private void addDateToScheduledUpdates(long timestamp)
+      throws SQLException {
+    long dateMillis = 0L;
+    try {
+      dateMillis = this.dateTimeFormat.parse(
+          this.dateTimeFormat.format(timestamp).substring(0, 10)
+          + " 00:00:00").getTime();
+    } catch (ParseException e) {
+      this.logger.log(Level.WARNING, "Internal parsing error.", e);
+      return;
+    }
+    if (!this.scheduledUpdates.contains(dateMillis)) {
+      this.psU.setDate(1, new java.sql.Date(dateMillis));
+      this.psU.execute();
+      this.scheduledUpdates.add(dateMillis);
+    }
+  }
+
   /**
    * Insert network status consensus entry into database.
    */
@@ -308,6 +341,7 @@ public final class RelayDescriptorDatabaseImporter {
       String ports, byte[] rawDescriptor) {
     try {
       if (this.psSs != null && this.psRs != null && this.psR != null) {
+        this.addDateToScheduledUpdates(validAfter);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp validAfterTimestamp = new Timestamp(validAfter);
         if (lastCheckedStatusEntries != validAfter) {
@@ -430,6 +464,9 @@ public final class RelayDescriptorDatabaseImporter {
       String extraInfoDigest, byte[] rawDescriptor) {
     try {
       if (this.psDs != null && this.psD != null) {
+        this.addDateToScheduledUpdates(published);
+        this.addDateToScheduledUpdates(
+            published + 24L * 60L * 60L * 1000L);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         this.psDs.setString(1, descriptor);
         ResultSet rs = psDs.executeQuery();
@@ -652,6 +689,7 @@ public final class RelayDescriptorDatabaseImporter {
   public void addConsensus(long validAfter, byte[] rawDescriptor) {
     try {
       if (this.psCs != null && this.psC != null) {
+        this.addDateToScheduledUpdates(validAfter);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp validAfterTimestamp = new Timestamp(validAfter);
         this.psCs.setTimestamp(1, validAfterTimestamp, cal);
@@ -752,6 +790,7 @@ public final class RelayDescriptorDatabaseImporter {
     }
     if (this.psBs != null && this.psB != null) {
       try {
+        this.addDateToScheduledUpdates(statsEndTime);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
         this.psBs.setString(1, source);
@@ -814,6 +853,7 @@ public final class RelayDescriptorDatabaseImporter {
     }
     if (this.psQs != null && this.psQ != null) {
       try {
+        this.addDateToScheduledUpdates(statsEndTime);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp statsEndTimestamp = new Timestamp(statsEndTime);
         this.psQs.setString(1, source);
@@ -874,6 +914,22 @@ public final class RelayDescriptorDatabaseImporter {
         + "conn-bi-direct stats lines", rcsCount, rrsCount, rvsCount,
         rdsCount, resCount, rhsCount, rqsCount, rbsCount));
 
+    /* Insert scheduled updates a second time, just in case the refresh
+     * run has started since inserting them the first time in which case
+     * it will miss the data inserted afterwards.  We cannot, however,
+     * insert them only now, because if a Java execution fails at a random
+     * point, we might have added data, but not the corresponding dates to
+     * update statistics. */
+    try {
+      for (long dateMillis : this.scheduledUpdates) {
+        this.psU.setDate(1, new java.sql.Date(dateMillis));
+        this.psU.execute();
+      }
+    } catch (SQLException e) {
+      this.logger.log(Level.WARNING, "Could not add scheduled dates for "
+          + "the next refresh run.", e);
+    }
+
     /* Commit any stragglers before closing. */
     if (this.conn != null) {
       try {



More information about the tor-commits mailing list