[tor-commits] [collector/release] Remember processed files between module runs.

karsten at torproject.org karsten at torproject.org
Wed Jan 15 22:25:08 UTC 2020


commit 741401a0daffda52fd1de81b29b276ed9e939ba5
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Jan 7 13:30:28 2020 +0100

    Remember processed files between module runs.
    
    The three recently added modules that archive Snowflake statistics,
    bridge pool assignments, and BridgeDB metrics have in common that they
    process all input files regardless of whether they have already
    processed them before.
    
    The problem is that the input files processed by these modules are
    either never removed (Snowflake statistics) or only removed manually
    by the operator (bridge pool assignments and BridgeDB metrics).
    
    The effect is that non-recent BridgeDB metrics and bridge pool
    assignments are being placed in the indexed/recent/ directory in the
    next execution after they are deleted for being older than 72 hours.
    The same would happen with Snowflake statistics after the operator
    removes them from the out/ directory.
    
    The fix is to use a state file containing the names of previously
    processed files and to only process files not found in it. This is
    the same approach as the one taken for bridge descriptor tarballs.
---
 CHANGELOG.md                                       |  3 ++
 .../bridgedb/BridgedbMetricsProcessor.java         | 18 +++++++++
 .../BridgePoolAssignmentsProcessor.java            | 19 +++++++++
 .../metrics/collector/cron/CollecTorMain.java      | 47 ++++++++++++++++++++++
 .../snowflake/SnowflakeStatsDownloader.java        | 13 ++++++
 5 files changed, 100 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index fe7937c..fb295d6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,9 @@
    - Give up on periodically checking the configuration file for
      updates and reloading it in case of changes.
    - Avoid reprocessing webstats files.
+   - Remember processed files between module runs for archived
+     Snowflake statistics, bridge pool assignments, and BridgeDB
+     metrics.
 
  * Minor changes
    - Remove dependency on metrics-lib's internal package.
diff --git a/src/main/java/org/torproject/metrics/collector/bridgedb/BridgedbMetricsProcessor.java b/src/main/java/org/torproject/metrics/collector/bridgedb/BridgedbMetricsProcessor.java
index c6b939b..0073ee3 100644
--- a/src/main/java/org/torproject/metrics/collector/bridgedb/BridgedbMetricsProcessor.java
+++ b/src/main/java/org/torproject/metrics/collector/bridgedb/BridgedbMetricsProcessor.java
@@ -24,7 +24,9 @@ import java.nio.file.Paths;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
+import java.util.SortedSet;
 import java.util.Stack;
+import java.util.TreeSet;
 
 public class BridgedbMetricsProcessor extends CollecTorMain {
 
@@ -40,6 +42,11 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
   private File inputDirectory;
 
   /**
+   * File containing file names of previously parsed BridgeDB metrics files.
+   */
+  private Path parsedBridgedbMetricsFile;
+
+  /**
    * Directory for writing BridgeDB statistics files to be archived in tarballs.
    */
   private String outputPathName;
@@ -88,11 +95,19 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
   protected void startProcessing() throws ConfigurationException {
     logger.info("Starting BridgeDB statistics module of CollecTor.");
     this.initializeConfiguration();
+    SortedSet<Path> previouslyProcessedFiles = this.readProcessedFiles(
+        this.parsedBridgedbMetricsFile);
+    SortedSet<Path> processedFiles = new TreeSet<>();
     logger.info("Reading BridgeDB statistics files in {}.",
         this.inputDirectory);
     for (Descriptor descriptor
         : DescriptorSourceFactory.createDescriptorReader()
         .readDescriptors(this.inputDirectory)) {
+      processedFiles.add(descriptor.getDescriptorFile().toPath());
+      if (previouslyProcessedFiles.contains(
+          descriptor.getDescriptorFile().toPath())) {
+        continue;
+      }
       if (descriptor instanceof BridgedbMetrics) {
         BridgedbMetrics bridgedbMetrics = (BridgedbMetrics) descriptor;
         BridgedbMetricsPersistence persistence
@@ -114,6 +129,7 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
     }
     logger.info("Cleaning up directory {} containing recent files.",
         this.recentPathName);
+    this.writeProcessedFiles(this.parsedBridgedbMetricsFile, processedFiles);
     this.cleanUpRsyncDirectory();
     logger.info("Finished processing BridgeDB statistics file(s).");
   }
@@ -123,6 +139,8 @@ public class BridgedbMetricsProcessor extends CollecTorMain {
    * storing them in instance attributes.
    */
   private void initializeConfiguration() throws ConfigurationException {
+    this.parsedBridgedbMetricsFile = this.config.getPath(Key.StatsPath)
+        .resolve("processed-bridgedb-metrics");
     this.outputPathName = config.getPath(Key.OutputPath).toString();
     this.recentPathName = config.getPath(Key.RecentPath).toString();
     this.inputDirectory =
diff --git a/src/main/java/org/torproject/metrics/collector/bridgepools/BridgePoolAssignmentsProcessor.java b/src/main/java/org/torproject/metrics/collector/bridgepools/BridgePoolAssignmentsProcessor.java
index 6034e1d..ffae262 100644
--- a/src/main/java/org/torproject/metrics/collector/bridgepools/BridgePoolAssignmentsProcessor.java
+++ b/src/main/java/org/torproject/metrics/collector/bridgepools/BridgePoolAssignmentsProcessor.java
@@ -25,6 +25,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.time.DateTimeException;
 import java.time.Instant;
@@ -36,8 +37,10 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.Stack;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
 public class BridgePoolAssignmentsProcessor extends CollecTorMain {
 
@@ -54,6 +57,11 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
   private File assignmentsDirectory;
 
   /**
+   * File containing file names of previously parsed assignments files.
+   */
+  private Path parsedBridgePoolAssignmentsFile;
+
+  /**
    * Directory containing sanitized bridge pool assignments for tarballs.
    */
   private String outputPathName;
@@ -117,9 +125,16 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
   protected void startProcessing() throws ConfigurationException {
     logger.info("Starting bridge-pool-assignments module of CollecTor.");
     this.initializeConfiguration();
+    SortedSet<Path> previouslyProcessedFiles = this.readProcessedFiles(
+        this.parsedBridgePoolAssignmentsFile);
+    SortedSet<Path> processedFiles = new TreeSet<>();
     List<File> assignmentFiles = this.listAssignmentFiles();
     LocalDateTime latestPublished = null;
     for (File assignmentFile : assignmentFiles) {
+      processedFiles.add(assignmentFile.toPath());
+      if (previouslyProcessedFiles.contains(assignmentFile.toPath())) {
+        continue;
+      }
       logger.info("Processing bridge pool assignment file '{}'...",
           assignmentFile.getAbsolutePath());
       SortedMap<LocalDateTime, SortedMap<String, String>>
@@ -161,6 +176,8 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
           + "published at {}, which is more than 5:30 hours in the past.",
           latestPublished);
     }
+    this.writeProcessedFiles(this.parsedBridgePoolAssignmentsFile,
+        processedFiles);
     this.cleanUpRsyncDirectory();
     logger.info("Finished processing bridge pool assignment file(s).");
   }
@@ -170,6 +187,8 @@ public class BridgePoolAssignmentsProcessor extends CollecTorMain {
    * storing them in instance attributes.
    */
   private void initializeConfiguration() throws ConfigurationException {
+    this.parsedBridgePoolAssignmentsFile = this.config.getPath(Key.StatsPath)
+        .resolve("parsed-bridge-pool-assignments");
     this.outputPathName = Paths.get(config.getPath(Key.OutputPath).toString(),
         "bridge-pool-assignments").toString();
     this.recentPathName = Paths.get(config.getPath(Key.RecentPath).toString(),
diff --git a/src/main/java/org/torproject/metrics/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/metrics/collector/cron/CollecTorMain.java
index 49174ca..979c3ca 100644
--- a/src/main/java/org/torproject/metrics/collector/cron/CollecTorMain.java
+++ b/src/main/java/org/torproject/metrics/collector/cron/CollecTorMain.java
@@ -16,9 +16,14 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 
 public abstract class CollecTorMain extends SyncManager
@@ -128,5 +133,47 @@ public abstract class CollecTorMain extends SyncManager
           + ioe.getMessage(), ioe);
     }
   }
+
+  /**
+   * Read file names of processed files from the given state file.
+   *
+   * @param stateFile State file to read file names from.
+   * @return File names of processed files.
+   */
+  public SortedSet<Path> readProcessedFiles(Path stateFile) {
+    SortedSet<Path> processedFiles = new TreeSet<>();
+    if (Files.exists(stateFile)) {
+      try {
+        for (String line : Files.readAllLines(stateFile)) {
+          processedFiles.add(Paths.get(line));
+        }
+      } catch (IOException e) {
+        logger.warn("I/O error while reading processed files.", e);
+      }
+    }
+    return processedFiles;
+  }
+
+  /**
+   * Write file names of processed files to the state file.
+   *
+   * @param stateFile State file to write file names to.
+   * @param processedFiles File names of processed files.
+   */
+  public void writeProcessedFiles(Path stateFile,
+      SortedSet<Path> processedFiles) {
+    List<String> lines = new ArrayList<>();
+    for (Path processedFile : processedFiles) {
+      lines.add(processedFile.toString());
+    }
+    try {
+      if (!Files.exists(stateFile)) {
+        Files.createDirectories(stateFile.getParent());
+      }
+      Files.write(stateFile, lines);
+    } catch (IOException e) {
+      logger.warn("I/O error while writing processed files.", e);
+    }
+  }
 }
 
diff --git a/src/main/java/org/torproject/metrics/collector/snowflake/SnowflakeStatsDownloader.java b/src/main/java/org/torproject/metrics/collector/snowflake/SnowflakeStatsDownloader.java
index f40c311..cbca74a 100644
--- a/src/main/java/org/torproject/metrics/collector/snowflake/SnowflakeStatsDownloader.java
+++ b/src/main/java/org/torproject/metrics/collector/snowflake/SnowflakeStatsDownloader.java
@@ -23,6 +23,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.SortedSet;
@@ -71,6 +73,11 @@ public class SnowflakeStatsDownloader extends CollecTorMain {
     }
     logger.debug("Finished downloading {}.", url);
 
+    Path parsedSnowflakeStatsFile = this.config.getPath(Key.StatsPath)
+        .resolve("processed-snowflake-stats");
+    SortedSet<Path> previouslyProcessedFiles = this.readProcessedFiles(
+        parsedSnowflakeStatsFile);
+    SortedSet<Path> processedFiles = new TreeSet<>();
     DescriptorParser descriptorParser =
         DescriptorSourceFactory.createDescriptorParser();
     SortedSet<LocalDateTime> snowflakeStatsEnds = new TreeSet<>();
@@ -85,6 +92,11 @@ public class SnowflakeStatsDownloader extends CollecTorMain {
             = new SnowflakeStatsPersistence(snowflakeStats);
         File tarballFile = new File(outputPathName + "/"
             + persistence.getStoragePath());
+        Path relativeFileName = Paths.get(tarballFile.getName());
+        processedFiles.add(relativeFileName);
+        if (previouslyProcessedFiles.contains(relativeFileName)) {
+          continue;
+        }
         if (tarballFile.exists()) {
           continue;
         }
@@ -106,6 +118,7 @@ public class SnowflakeStatsDownloader extends CollecTorMain {
           snowflakeStatsEnds.last());
     }
 
+    this.writeProcessedFiles(parsedSnowflakeStatsFile, processedFiles);
     this.cleanUpRsyncDirectory();
   }
 





More information about the tor-commits mailing list