[tor-commits] [collector/master] Refactor ArchiveReader.

karsten at torproject.org karsten at torproject.org
Mon Aug 27 12:34:23 UTC 2018


commit 1caca7c1f4786ef31207b42ed8298998c989487b
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Mon Aug 20 21:08:19 2018 +0200

    Refactor ArchiveReader.
---
 .../collector/relaydescs/ArchiveReader.java        | 100 +++++++++++++--------
 .../collector/relaydescs/ArchiveWriter.java        |   3 +-
 2 files changed, 66 insertions(+), 37 deletions(-)

diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java
index 74700f7..7c59054 100644
--- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java
+++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java
@@ -46,30 +46,52 @@ public class ArchiveReader {
   private Map<String, Set<String>> microdescriptorValidAfterTimes =
       new HashMap<>();
 
-  /** Reads all descriptors from the given directory, possibly using a
-   * parse history file, and passes them to the given descriptor
-   * parser. */
-  public ArchiveReader(RelayDescriptorParser rdp, File archivesDirectory,
-      File statsDirectory, boolean keepImportHistory) {
+  private RelayDescriptorParser rdp;
+
+  private File archivesDirectory;
+
+  private boolean keepImportHistory;
 
+  private int parsedFiles = 0;
+
+  private int ignoredFiles = 0;
+
+  private SortedSet<String> archivesImportHistory = new TreeSet<>();
+
+  private File archivesImportHistoryFile;
+
+  /** Initializes an archive reader but without reading any descriptors yet. */
+  ArchiveReader(RelayDescriptorParser rdp, File archivesDirectory,
+      File statsDirectory, boolean keepImportHistory) {
     if (rdp == null || archivesDirectory == null
         || statsDirectory == null) {
       throw new IllegalArgumentException();
     }
-
-    rdp.setArchiveReader(this);
-    int parsedFiles = 0;
-    int ignoredFiles = 0;
-    SortedSet<String> archivesImportHistory = new TreeSet<>();
-    File archivesImportHistoryFile = new File(statsDirectory,
+    this.rdp = rdp;
+    this.rdp.setArchiveReader(this);
+    this.archivesDirectory = archivesDirectory;
+    this.keepImportHistory = keepImportHistory;
+    this.archivesImportHistoryFile = new File(statsDirectory,
         "archives-import-history");
-    if (keepImportHistory && archivesImportHistoryFile.exists()) {
+  }
+
+  /** Reads all descriptors from the given directory, possibly using a
+   * parse history file, and passes them to the given descriptor
+   * parser. */
+  public void readDescriptors() {
+    this.readHistoryFile();
+    this.readDescriptorFiles();
+    this.writeHistoryFile();
+  }
+
+  private void readHistoryFile() {
+    if (this.keepImportHistory && this.archivesImportHistoryFile.exists()) {
       try {
         BufferedReader br = new BufferedReader(new FileReader(
-            archivesImportHistoryFile));
+            this.archivesImportHistoryFile));
         String line;
         while ((line = br.readLine()) != null) {
-          archivesImportHistory.add(line);
+          this.archivesImportHistory.add(line);
         }
         br.close();
       } catch (IOException e) {
@@ -77,11 +99,14 @@ public class ArchiveReader {
             + "history file. Skipping.", e);
       }
     }
-    if (archivesDirectory.exists()) {
-      logger.debug("Importing files in directory " + archivesDirectory
+  }
+
+  private void readDescriptorFiles() {
+    if (this.archivesDirectory.exists()) {
+      logger.debug("Importing files in directory " + this.archivesDirectory
           + "/...");
       Stack<File> filesInInputDir = new Stack<>();
-      filesInInputDir.add(archivesDirectory);
+      filesInInputDir.add(this.archivesDirectory);
       List<File> problems = new ArrayList<>();
       Set<File> filesToRetry = new HashSet<>();
       while (!filesInInputDir.isEmpty()) {
@@ -91,9 +116,9 @@ public class ArchiveReader {
         } else {
           try {
             BufferedInputStream bis;
-            if (keepImportHistory
-                && archivesImportHistory.contains(pop.getName())) {
-              ignoredFiles++;
+            if (this.keepImportHistory
+                && this.archivesImportHistory.contains(pop.getName())) {
+              this.ignoredFiles++;
               continue;
             } else if (pop.getName().endsWith(".tar.bz2")) {
               logger.warn("Cannot parse compressed tarball "
@@ -116,15 +141,15 @@ public class ArchiveReader {
             }
             bis.close();
             byte[] allData = baos.toByteArray();
-            boolean stored = rdp.parse(allData);
+            boolean stored = this.rdp.parse(allData);
             if (!stored) {
               filesToRetry.add(pop);
               continue;
             }
-            if (keepImportHistory) {
-              archivesImportHistory.add(pop.getName());
+            if (this.keepImportHistory) {
+              this.archivesImportHistory.add(pop.getName());
             }
-            parsedFiles++;
+            this.parsedFiles++;
           } catch (IOException e) {
             problems.add(pop);
             if (problems.size() > 3) {
@@ -219,10 +244,10 @@ public class ArchiveReader {
               }
             }
           }
-          if (keepImportHistory) {
-            archivesImportHistory.add(pop.getName());
+          if (this.keepImportHistory) {
+            this.archivesImportHistory.add(pop.getName());
           }
-          parsedFiles++;
+          this.parsedFiles++;
         } catch (IOException e) {
           problems.add(pop);
           if (problems.size() > 3) {
@@ -232,10 +257,10 @@ public class ArchiveReader {
       }
       if (problems.isEmpty()) {
         logger.debug("Finished importing files in directory "
-            + archivesDirectory + "/.");
+            + this.archivesDirectory + "/.");
       } else {
         StringBuilder sb = new StringBuilder("Failed importing files in "
-            + "directory " + archivesDirectory + "/:");
+            + "directory " + this.archivesDirectory + "/:");
         int printed = 0;
         for (File f : problems) {
           sb.append("\n  ").append(f.getAbsolutePath());
@@ -246,12 +271,15 @@ public class ArchiveReader {
         }
       }
     }
-    if (keepImportHistory) {
+  }
+
+  private void writeHistoryFile() {
+    if (this.keepImportHistory) {
       try {
-        archivesImportHistoryFile.getParentFile().mkdirs();
+        this.archivesImportHistoryFile.getParentFile().mkdirs();
         BufferedWriter bw = new BufferedWriter(new FileWriter(
-            archivesImportHistoryFile));
-        for (String line : archivesImportHistory) {
+            this.archivesImportHistoryFile));
+        for (String line : this.archivesImportHistory) {
           bw.write(line + "\n");
         }
         bw.close();
@@ -261,15 +289,15 @@ public class ArchiveReader {
       }
     }
     logger.info("Finished importing relay descriptors from local "
-        + "directory:\nParsed " + parsedFiles + ", ignored "
-        + ignoredFiles + " files.");
+        + "directory:\nParsed " + this.parsedFiles + ", ignored "
+        + this.ignoredFiles + " files.");
   }
 
   /** Stores the valid-after time and microdescriptor digests of a given
    * microdesc consensus, so that microdescriptors (which don't contain a
    * publication time) can later be sorted into the correct month
    * folders. */
-  public void haveParsedMicrodescConsensus(String validAfterTime,
+  void haveParsedMicrodescConsensus(String validAfterTime,
       SortedSet<String> microdescriptorDigests) {
     for (String microdescriptor : microdescriptorDigests) {
       if (!this.microdescriptorValidAfterTimes.containsKey(
diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java
index ac3f5e3..8679439 100644
--- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java
+++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java
@@ -169,7 +169,8 @@ public class ArchiveWriter extends CollecTorMain {
       new ArchiveReader(rdp,
           config.getPath(Key.RelayLocalOrigins).toFile(),
           statsDirectory,
-          config.getBool(Key.KeepDirectoryArchiveImportHistory));
+          config.getBool(Key.KeepDirectoryArchiveImportHistory))
+          .readDescriptors();
       this.intermediateStats("importing relay descriptors from local "
           + "directory");
     }





More information about the tor-commits mailing list