[tor-commits] [collector/master] Implements #19018 'Run CollecTor without crontab'. Added scheduler logic, adapted and added tests, adapted coverage check.

karsten at torproject.org karsten at torproject.org
Mon Jul 25 19:54:28 UTC 2016


commit 997d4a4a2e6fca2bc95134537fcfebe930db8de1
Author: iwakeh <iwakeh at torproject.org>
Date:   Wed Jul 20 13:23:18 2016 +0200

    Implements #19018 'Run CollecTor without crontab'. Added scheduler logic, adapted and added tests, adapted coverage check.
---
 build.xml                                          |   8 +-
 src/main/java/org/torproject/collector/Main.java   |  54 ++++-------
 .../bridgedescs/SanitizedBridgesWriter.java        |  32 +------
 .../torproject/collector/cron/CollecTorMain.java   |  28 ++++++
 .../org/torproject/collector/cron/Scheduler.java   | 102 +++++++++++++++++++++
 .../collector/exitlists/ExitListDownloader.java    |  27 +-----
 .../collector/index/CreateIndexJson.java           |  29 ++++--
 .../org/torproject/collector/main/LockFile.java    |  66 -------------
 .../collector/relaydescs/ArchiveWriter.java        |  61 ++++--------
 .../collector/torperf/TorperfDownloader.java       |  33 ++-----
 .../java/org/torproject/collector/MainTest.java    |  84 +++++++++++++++--
 .../java/org/torproject/collector/cron/Dummy.java  |  15 +++
 .../torproject/collector/cron/SchedulerTest.java   |  54 +++++++++++
 src/test/resources/junittest.policy                |   1 +
 14 files changed, 355 insertions(+), 239 deletions(-)

diff --git a/build.xml b/build.xml
index 51471f6..ba50fbf 100644
--- a/build.xml
+++ b/build.xml
@@ -210,7 +210,7 @@
       <classpath refid="cobertura.test.classpath" />
       <formatter type="xml" />
       <batchtest toDir="${testresult}" >
-        <fileset dir="${testclasses}" />
+        <fileset dir="${testclasses}" includes="**/*Test.class" />
       </batchtest>
     </junit>
     <cobertura-report format="html" destdir="${coverageresult}" >
@@ -218,8 +218,10 @@
         <include name="**/*.java" />
       </fileset>
     </cobertura-report>
-    <cobertura-check branchrate="0" totallinerate="15" totalbranchrate="5" >
+    <cobertura-check branchrate="0" totallinerate="4" totalbranchrate="1" >
       <regex pattern="org.torproject.collector.conf.*" branchrate="100" linerate="100"/>
+      <regex pattern="org.torproject.collector.cron.*" branchrate="66" linerate="73"/>
+      <regex pattern="org.torproject.collector.Main" branchrate="66" linerate="94"/>
     </cobertura-check>
   </target>
   <target name="test" depends="compile,compile-tests">
@@ -231,7 +233,7 @@
       <classpath refid="test.classpath"/>
       <formatter type="plain" usefile="false"/>
       <batchtest>
-        <fileset dir="${testclasses}" />
+        <fileset dir="${testclasses}" includes="**/*Test.class" />
       </batchtest>
     </junit>
   </target>
diff --git a/src/main/java/org/torproject/collector/Main.java b/src/main/java/org/torproject/collector/Main.java
index 81234c3..97c7a0c 100644
--- a/src/main/java/org/torproject/collector/Main.java
+++ b/src/main/java/org/torproject/collector/Main.java
@@ -5,6 +5,9 @@ package org.torproject.collector;
 
 import org.torproject.collector.bridgedescs.SanitizedBridgesWriter;
 import org.torproject.collector.conf.Configuration;
+import org.torproject.collector.conf.Key;
+import org.torproject.collector.cron.CollecTorMain;
+import org.torproject.collector.cron.Scheduler;
 import org.torproject.collector.exitlists.ExitListDownloader;
 import org.torproject.collector.index.CreateIndexJson;
 import org.torproject.collector.relaydescs.ArchiveWriter;
@@ -37,34 +40,31 @@ public class Main {
   /** All possible main classes.
    * If a new CollecTorMain class is available, just add it to this map.
    */
-  static final Map<String, Class> collecTorMains = new HashMap<>();
+  static final Map<Key, Class<? extends CollecTorMain>> collecTorMains = new HashMap<>();
 
   static { // add a new main class here
-    collecTorMains.put("bridgedescs", SanitizedBridgesWriter.class);
-    collecTorMains.put("exitlists", ExitListDownloader.class);
-    collecTorMains.put("updateindex", CreateIndexJson.class);
-    collecTorMains.put("relaydescs", ArchiveWriter.class);
-    collecTorMains.put("torperf", TorperfDownloader.class);
+    collecTorMains.put(Key.BridgedescsActivated, SanitizedBridgesWriter.class);
+    collecTorMains.put(Key.ExitlistsActivated, ExitListDownloader.class);
+    collecTorMains.put(Key.UpdateindexActivated, CreateIndexJson.class);
+    collecTorMains.put(Key.RelaydescsActivated, ArchiveWriter.class);
+    collecTorMains.put(Key.TorperfActivated, TorperfDownloader.class);
   }
 
-  private static final String modules = collecTorMains.keySet().toString()
-      .replace("[", "").replace("]", "").replaceAll(", ", "|");
-
   private static Configuration conf = new Configuration();
 
   /**
-   * One argument is necessary.
+   * At most one argument.
    * See class description {@link Main}.
    */
   public static void main(String[] args) throws Exception {
     File confFile = null;
-    if (null == args || args.length < 1 || args.length > 2) {
-      printUsage("CollecTor needs one or two arguments.");
-      return;
-    } else if (args.length == 1) {
+    if (args == null || args.length == 0) {
       confFile = new File(CONF_FILE);
-    } else if (args.length == 2) {
-      confFile = new File(args[1]);
+    } else if (args.length == 1) {
+      confFile = new File(args[0]);
+    } else {
+      printUsage("CollecTor takes at most one argument.");
+      return;
     }
     if (!confFile.exists() || confFile.length() < 1L) {
       writeDefaultConfig(confFile);
@@ -72,12 +72,12 @@ public class Main {
     } else {
       readConfigurationFrom(confFile);
     }
-    invokeGivenMain(args[0]);
+    Scheduler.getInstance().scheduleModuleRuns(collecTorMains, conf);
   }
 
   private static void printUsage(String msg) {
     final String usage = "Usage:\njava -jar collector.jar "
-        + "<" + modules + ">  [path/to/configFile]";
+        + "[path/to/configFile]";
     System.out.println(msg + "\n" + usage);
   }
 
@@ -105,23 +105,5 @@ public class Main {
     }
   }
 
-  private static void invokeGivenMain(String mainId) {
-    Class clazz = collecTorMains.get(mainId);
-    if (null == clazz) {
-      printUsage("Unknown argument: " + mainId);
-    }
-    invokeMainOnClass(clazz);
-  }
-
-  private static void invokeMainOnClass(Class clazz) {
-    try {
-      clazz.getMethod("main", new Class[] { Configuration.class })
-          .invoke(null, (Object) conf);
-    } catch (NoSuchMethodException | IllegalAccessException
-       | InvocationTargetException e) {
-      log.error("Cannot invoke 'main' method on "
-          + clazz.getName() + ". " + e, e);
-    }
-  }
 }
 
diff --git a/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java b/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java
index 121f8ca..5c33566 100644
--- a/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java
+++ b/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java
@@ -6,7 +6,7 @@ package org.torproject.collector.bridgedescs;
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
 import org.torproject.collector.conf.Key;
-import org.torproject.collector.main.LockFile;
+import org.torproject.collector.cron.CollecTorMain;
 
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Base64;
@@ -49,36 +49,12 @@ import java.util.TreeMap;
  * by the bridge to advertise their capabilities), and extra-info
  * descriptors (published by the bridge, mainly for statistical analysis).</p>
  */
-public class SanitizedBridgesWriter extends Thread {
+public class SanitizedBridgesWriter extends CollecTorMain {
 
   private static Logger logger = LoggerFactory.getLogger(SanitizedBridgesWriter.class);
 
-  /** Executes the bridge-descriptors module using the given
-   * configuration. */
-  public static void main(Configuration config) throws ConfigurationException {
-
-    logger.info("Starting bridge-descriptors module of CollecTor.");
-
-    // Use lock file to avoid overlapping runs
-    LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), "bridge-descriptors");
-    lf.acquireLock();
-
-    // Sanitize bridge descriptors
-    new SanitizedBridgesWriter(config).run();
-
-    // Remove lock file
-    lf.releaseLock();
-
-    logger.info("Terminating bridge-descriptors module of CollecTor.");
-  }
-
-  private Configuration config;
-
-  /**
-   * Initializes this class.
-   */
   public SanitizedBridgesWriter(Configuration config) {
-    this.config = config;
+    super(config);
   }
 
   private String rsyncCatString;
@@ -106,12 +82,14 @@ public class SanitizedBridgesWriter extends Thread {
 
   @Override
   public void run() {
+    logger.info("Starting bridge-descriptors module of CollecTor.");
     try {
       startProcessing();
     } catch (ConfigurationException ce) {
       logger.error("Configuration failed: " + ce, ce);
       throw new RuntimeException(ce);
     }
+    logger.info("Terminating bridge-descriptors module of CollecTor.");
   }
 
   private void startProcessing() throws ConfigurationException {
diff --git a/src/main/java/org/torproject/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/collector/cron/CollecTorMain.java
new file mode 100644
index 0000000..7a00e68
--- /dev/null
+++ b/src/main/java/org/torproject/collector/cron/CollecTorMain.java
@@ -0,0 +1,28 @@
+/* Copyright 2016 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.collector.cron;
+
+import org.torproject.collector.conf.Configuration;
+import org.torproject.collector.conf.Key;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Calendar;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public abstract class CollecTorMain implements Runnable {
+
+  protected Configuration config;
+
+  public CollecTorMain( Configuration conf) {
+    this.config = conf;
+  }
+
+}
+
diff --git a/src/main/java/org/torproject/collector/cron/Scheduler.java b/src/main/java/org/torproject/collector/cron/Scheduler.java
new file mode 100644
index 0000000..e4f2aa3
--- /dev/null
+++ b/src/main/java/org/torproject/collector/cron/Scheduler.java
@@ -0,0 +1,102 @@
+/* Copyright 2016 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.collector.cron;
+
+import org.torproject.collector.conf.Configuration;
+import org.torproject.collector.conf.ConfigurationException;
+import org.torproject.collector.conf.Key;
+import org.torproject.collector.cron.CollecTorMain;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Scheduler that starts the modules configured in collector.properties.
+ */
+public class Scheduler {
+
+  public static final String ACTIVATED = "Activated";
+  public static final String PERIODMIN = "PeriodMinutes";
+  public static final String OFFSETMIN = "OffsetMinutes";
+
+  private final Logger log = LoggerFactory.getLogger(Scheduler.class);
+
+  private final ScheduledExecutorService scheduler =
+      Executors.newScheduledThreadPool(1);
+
+  private static Scheduler instance = new Scheduler();
+
+  private Scheduler(){}
+
+  public static Scheduler getInstance() {
+    return instance;
+  }
+
+  /**
+   * Schedule all classes given according to the parameters in the
+   * configuration.
+   */
+  public void scheduleModuleRuns(Map<Key,
+      Class<? extends CollecTorMain>> collecTorMains, Configuration conf) {
+    for ( Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry
+        : collecTorMains.entrySet() ) {
+      try {
+        if ( conf.getBool(ctmEntry.getKey()) ) {
+          String prefix = ctmEntry.getKey().name().replace(ACTIVATED, "");
+          CollecTorMain ctm = ctmEntry.getValue()
+              .getConstructor(Configuration.class).newInstance(conf);
+          scheduleExecutions(ctm,
+              conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
+              conf.getInt(Key.valueOf(prefix + PERIODMIN)));
+        }
+      } catch (ConfigurationException | IllegalAccessException
+          | InstantiationException | InvocationTargetException
+          | NoSuchMethodException | RuntimeException ex) {
+        log.error("Cannot schedule " + ctmEntry.getValue().getName()
+            + ". Reason: " + ex.getMessage(), ex);
+        shutdownScheduler();
+        throw new RuntimeException("Halted scheduling.", ex);
+      }
+    }
+  }
+
+  private void scheduleExecutions(CollecTorMain ctm, int offset, int period) {
+    this.log.info("Periodic updater started for " + ctm.getClass().getName()
+        +  "; offset=" + offset + ", period=" + period + ".");
+    int currentMinute = Calendar.getInstance().get(Calendar.MINUTE);
+    int initialDelay = (60 - currentMinute + offset) % 60;
+
+    /* Run after initialDelay delay and then every period min. */
+    this.log.info("Periodic updater will start every " + period + "th min "
+        + "at minute " + ((currentMinute + initialDelay) % 60) + ".");
+    this.scheduler.scheduleAtFixedRate(ctm, initialDelay, 60,
+        TimeUnit.MINUTES);
+  }
+
+  /**
+   * Try to shut down smoothly, i.e., wait for running tasks to terminate.
+   */
+  public void shutdownScheduler() {
+    try {
+      scheduler.shutdown();
+      scheduler.awaitTermination(20L, java.util.concurrent.TimeUnit.MINUTES);
+      log.info("Shutdown of all scheduled tasks completed successfully.");
+    } catch ( InterruptedException ie ) {
+      List<Runnable> notTerminated = scheduler.shutdownNow();
+      log.error("Regular shutdown failed for: " + notTerminated);
+      if ( !notTerminated.isEmpty() ) {
+        log.error("Forced shutdown failed for: " + notTerminated);
+      }
+    }
+  }
+}
+
diff --git a/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java b/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java
index 79fe19f..e6720cd 100644
--- a/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java
+++ b/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java
@@ -6,7 +6,7 @@ package org.torproject.collector.exitlists;
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
 import org.torproject.collector.conf.Key;
-import org.torproject.collector.main.LockFile;
+import org.torproject.collector.cron.CollecTorMain;
 import org.torproject.descriptor.Descriptor;
 import org.torproject.descriptor.DescriptorParseException;
 import org.torproject.descriptor.DescriptorParser;
@@ -32,42 +32,25 @@ import java.util.Stack;
 import java.util.TimeZone;
 import java.util.TreeSet;
 
-public class ExitListDownloader extends Thread {
+public class ExitListDownloader extends CollecTorMain {
 
   private static Logger logger = LoggerFactory.getLogger(ExitListDownloader.class);
 
-  /** Execute the exit-lists module using the given configuration. */
-  public static void main(Configuration config) throws ConfigurationException {
-    logger.info("Starting exit-lists module of CollecTor.");
-
-    // Use lock file to avoid overlapping runs
-    LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), "exit-lists");
-    lf.acquireLock();
-
-    // Download exit list and store it to disk
-    new ExitListDownloader(config).run();
-
-    // Remove lock file
-    lf.releaseLock();
-
-    logger.info("Terminating exit-lists module of CollecTor.");
-  }
-
-  private Configuration config;
-
   /** Instanciate the exit-lists module using the given configuration. */
   public ExitListDownloader(Configuration config) {
-    this.config = config;
+    super(config);
   }
 
   @Override
   public void run() {
+    logger.info("Starting exit-lists module of CollecTor.");
     try {
       startProcessing();
     } catch (ConfigurationException ce) {
       logger.error("Configuration failed: " + ce, ce);
       throw new RuntimeException(ce);
     }
+    logger.info("Terminating exit-lists module of CollecTor.");
   }
 
   private void startProcessing() throws ConfigurationException {
diff --git a/src/main/java/org/torproject/collector/index/CreateIndexJson.java b/src/main/java/org/torproject/collector/index/CreateIndexJson.java
index 7da2b5e..80f183c 100644
--- a/src/main/java/org/torproject/collector/index/CreateIndexJson.java
+++ b/src/main/java/org/torproject/collector/index/CreateIndexJson.java
@@ -6,6 +6,7 @@ package org.torproject.collector.index;
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
 import org.torproject.collector.conf.Key;
+import org.torproject.collector.cron.CollecTorMain;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -35,7 +36,7 @@ import java.util.zip.GZIPOutputStream;
  * cache index parts of directories or files that haven't changed.
  * Example: if we parse include cryptographic hashes or @type information,
  * we'll likely have to do that. */
-public class CreateIndexJson {
+public class CreateIndexJson extends CollecTorMain {
 
   private static File indexJsonFile;
 
@@ -51,14 +52,24 @@ public class CreateIndexJson {
 
   /** Creates indexes of directories containing archived and recent
    * descriptors and write index files to disk. */
-  public static void main(Configuration config)
-      throws ConfigurationException, IOException {
-    indexJsonFile =  new File(config.getPath(Key.IndexPath).toFile(), "index.json");
-    basePath = config.getProperty(Key.InstanceBaseUrl.name());
-    indexedDirectories = new File[] {
-        config.getPath(Key.ArchivePath).toFile(),
-        config.getPath(Key.RecentPath).toFile() };
-    writeIndex(indexDirectories());
+  public CreateIndexJson(Configuration conf) {
+    super(conf);
+  }
+
+  @Override
+  public void run() {
+    try {
+      indexJsonFile =  new File(config.getPath(Key.IndexPath).toFile(),
+          "index.json");
+      basePath = config.getProperty(Key.InstanceBaseUrl.name());
+      indexedDirectories = new File[] {
+          config.getPath(Key.ArchivePath).toFile(),
+          config.getPath(Key.RecentPath).toFile() };
+      writeIndex(indexDirectories());
+    } catch (Exception e) {
+      throw new RuntimeException("Cannot run index creation: " + e.getMessage(),
+          e);
+    }
   }
 
   static class DirectoryNode implements Comparable<DirectoryNode> {
diff --git a/src/main/java/org/torproject/collector/main/LockFile.java b/src/main/java/org/torproject/collector/main/LockFile.java
deleted file mode 100644
index 977b0b9..0000000
--- a/src/main/java/org/torproject/collector/main/LockFile.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/* Copyright 2010--2016 The Tor Project
- * See LICENSE for licensing information */
-
-package org.torproject.collector.main;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-
-public class LockFile {
-
-  private final File lockFile;
-  private final String moduleName;
-  private final Logger logger = LoggerFactory.getLogger(LockFile.class);
-
-  public LockFile(String moduleName) {
-    this("lock", moduleName);
-  }
-
-  public LockFile(String lockFilePath, String moduleName) {
-    this.lockFile = new File(lockFilePath, moduleName);
-    this.moduleName = moduleName;
-  }
-
-  /** Acquires the lock by checking whether a lock file already exists,
-   * and if not, by creating one with the current system time as
-   * content. */
-  public boolean acquireLock() {
-    this.logger.debug("Trying to acquire lock...");
-    try {
-      if (this.lockFile.exists()) {
-        BufferedReader br = new BufferedReader(new FileReader(
-            this.lockFile));
-        long runStarted = Long.parseLong(br.readLine());
-        br.close();
-        if (System.currentTimeMillis() - runStarted < 55L * 60L * 1000L) {
-          throw new RuntimeException("Cannot acquire lock for " + moduleName);
-        }
-      }
-      this.lockFile.getParentFile().mkdirs();
-      BufferedWriter bw = new BufferedWriter(new FileWriter(
-          this.lockFile));
-      bw.append("" + System.currentTimeMillis() + "\n");
-      bw.close();
-      this.logger.debug("Acquired lock.");
-      return true;
-    } catch (IOException e) {
-      throw new RuntimeException("Caught exception while trying to acquire "
-          + "lock for " + moduleName);
-    }
-  }
-
-  /** Releases the lock by deleting the lock file, if present. */
-  public void releaseLock() {
-    this.logger.debug("Releasing lock...");
-    this.lockFile.delete();
-    this.logger.debug("Released lock.");
-  }
-}
-
diff --git a/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java b/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java
index db05bc5..cbab5ea 100644
--- a/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java
+++ b/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java
@@ -6,7 +6,7 @@ package org.torproject.collector.relaydescs;
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
 import org.torproject.collector.conf.Key;
-import org.torproject.collector.main.LockFile;
+import org.torproject.collector.cron.CollecTorMain;
 import org.torproject.descriptor.DescriptorParseException;
 import org.torproject.descriptor.DescriptorParser;
 import org.torproject.descriptor.DescriptorSourceFactory;
@@ -38,12 +38,10 @@ import java.util.Stack;
 import java.util.TimeZone;
 import java.util.TreeMap;
 
-public class ArchiveWriter extends Thread {
+public class ArchiveWriter extends CollecTorMain {
 
   private static Logger logger = LoggerFactory.getLogger(ArchiveWriter.class);
 
-  private Configuration config;
-
   private long now = System.currentTimeMillis();
   private String outputDirectory;
   private String rsyncCatString;
@@ -99,64 +97,45 @@ public class ArchiveWriter extends Thread {
 
   private StringBuilder intermediateStats = new StringBuilder();
 
-  private static Path recentPath;
-  private static String recentPathName;
+  private Path recentPath;
+  private String recentPathName;
   private static final String RELAY_DESCRIPTORS = "relay-descriptors";
   private static final String MICRO = "micro";
   private static final String CONSENSUS_MICRODESC = "consensus-microdesc";
   private static final String MICRODESC = "microdesc";
   private static final String MICRODESCS = "microdescs";
 
-  /** Executes the relay-descriptors module using the given
-   * configuration. */
-  public static void main(Configuration config) throws ConfigurationException {
-
-    logger.info("Starting relay-descriptors module of CollecTor.");
-
-    // Use lock file to avoid overlapping runs
-    LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), RELAY_DESCRIPTORS);
-    lf.acquireLock();
-
-    recentPath = config.getPath(Key.RecentPath);
-    recentPathName = recentPath.toString();
-
-    // Import/download relay descriptors from the various sources
-    new ArchiveWriter(config).run();
-
-    new ReferenceChecker(
-        recentPath.toFile(),
-        new File(config.getPath(Key.StatsPath).toFile(), "references"),
-        new File(config.getPath(Key.StatsPath).toFile(), "references-history")).check();
-
-    // Remove lock file
-    lf.releaseLock();
-
-    logger.info("Terminating relay-descriptors module of CollecTor.");
-  }
-
   /** Initialize an archive writer with a given configuration. */
   public ArchiveWriter(Configuration config) throws ConfigurationException {
-    this.config = config;
-    storedServerDescriptorsFile =
-        new File(config.getPath(Key.StatsPath).toFile(), "stored-server-descriptors");
-    storedExtraInfoDescriptorsFile =
-        new File(config.getPath(Key.StatsPath).toFile(), "stored-extra-info-descriptors");
-    storedMicrodescriptorsFile =
-        new File(config.getPath(Key.StatsPath).toFile(), "stored-microdescriptors");
+    super(config);
   }
 
   @Override
   public void run() {
+    logger.info("Starting relay-descriptors module of CollecTor.");
     try {
+      recentPath = config.getPath(Key.RecentPath);
+      recentPathName = recentPath.toString();
+      File statsDir = config.getPath(Key.StatsPath).toFile();
+      storedServerDescriptorsFile =
+          new File(statsDir, "stored-server-descriptors");
+      storedExtraInfoDescriptorsFile =
+          new File(statsDir, "stored-extra-info-descriptors");
+      storedMicrodescriptorsFile =
+          new File(statsDir, "stored-microdescriptors");
+
       startProcessing();
+      new ReferenceChecker(recentPath.toFile(),
+          new File(statsDir, "references"),
+          new File(statsDir, "references-history")).check();
     } catch (ConfigurationException ce) {
       logger.error("Configuration failed: " + ce, ce);
       throw new RuntimeException(ce);
     }
+    logger.info("Terminating relay-descriptors module of CollecTor.");
   }
 
   private void startProcessing() throws ConfigurationException {
-
     File statsDirectory = new File("stats");
     this.outputDirectory = config.getPath(Key.DirectoryArchivesOutputDirectory).toString();
     SimpleDateFormat rsyncCatFormat = new SimpleDateFormat(
diff --git a/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java b/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java
index 7616dd8..1fa2d41 100644
--- a/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java
+++ b/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java
@@ -6,7 +6,7 @@ package org.torproject.collector.torperf;
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
 import org.torproject.collector.conf.Key;
-import org.torproject.collector.main.LockFile;
+import org.torproject.collector.cron.CollecTorMain;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,30 +32,11 @@ import java.util.TreeMap;
 /* Download possibly truncated Torperf .data and .extradata files from
  * configured sources, append them to the files we already have, and merge
  * the two files into the .tpf format. */
-public class TorperfDownloader extends Thread {
+public class TorperfDownloader extends CollecTorMain {
   private static Logger logger = LoggerFactory.getLogger(TorperfDownloader.class);
 
-  /** Executes the torperf module using the given configuration. */
-  public static void main(Configuration config) throws ConfigurationException {
-    logger.info("Starting torperf module of CollecTor.");
-
-    // Use lock file to avoid overlapping runs
-    LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), "torperf");
-    lf.acquireLock();
-
-    // Process Torperf files
-    new TorperfDownloader(config).run();
-
-    // Remove lock file
-    lf.releaseLock();
-
-    logger.info("Terminating torperf module of CollecTor.");
-  }
-
-  private Configuration config;
-
   public TorperfDownloader(Configuration config) {
-    this.config = config;
+    super(config);
   }
 
   private File torperfOutputDirectory = null;
@@ -66,12 +47,14 @@ public class TorperfDownloader extends Thread {
 
   @Override
   public void run() {
+    logger.info("Starting torperf module of CollecTor.");
     try {
       startProcessing();
     } catch (ConfigurationException ce) {
       logger.error("Configuration failed: " + ce, ce);
       throw new RuntimeException(ce);
     }
+    logger.info("Terminating torperf module of CollecTor.");
   }
 
   private void startProcessing() throws ConfigurationException {
@@ -309,9 +292,6 @@ public class TorperfDownloader extends Thread {
   private String mergeFiles(File dataFile, File extradataFile,
       String source, int fileSize, String skipUntil) throws IOException,
       ConfigurationException {
-    SortedMap<String, String> config = new TreeMap<String, String>();
-    config.put("SOURCE", source);
-    config.put("FILESIZE", String.valueOf(fileSize));
     if (!dataFile.exists() || !extradataFile.exists()) {
       this.logger.warn("File " + dataFile.getAbsolutePath() + " or "
           + extradataFile.getAbsolutePath() + " is missing.");
@@ -426,11 +406,12 @@ public class TorperfDownloader extends Thread {
       /* Write output line to .tpf file. */
       SortedMap<String, String> keysAndValues =
           new TreeMap<String, String>();
+      keysAndValues.put("SOURCE", source);
+      keysAndValues.put("FILESIZE", String.valueOf(fileSize));
       if (extradata != null) {
         keysAndValues.putAll(extradata);
       }
       keysAndValues.putAll(data);
-      keysAndValues.putAll(config);
       this.logger.debug("Writing " + dataFile.getName() + ":"
           + skippedLineCount++ + ".");
       lineD = brD.readLine();
diff --git a/src/test/java/org/torproject/collector/MainTest.java b/src/test/java/org/torproject/collector/MainTest.java
index 5991f78..b48a0a9 100644
--- a/src/test/java/org/torproject/collector/MainTest.java
+++ b/src/test/java/org/torproject/collector/MainTest.java
@@ -4,16 +4,19 @@ package org.torproject.collector;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.torproject.collector.conf.Key;
+import org.torproject.collector.cron.Scheduler;
 
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
+import java.io.IOException;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.List;
@@ -27,23 +30,54 @@ public class MainTest {
   @Rule
   public TemporaryFolder tmpf = new TemporaryFolder();
 
+  @Test(expected = IOException.class)
+  public void testInitializationConfigException() throws Exception {
+    File conf = new File(Main.CONF_FILE);
+    assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists());
+    Main.main(new String[] {"/tmp/"});
+    assertTrue(conf.exists());
+    assertTrue(conf.delete());
+  }
+
+  @Test()
+  public void testInitializationNullArgs() throws Exception {
+    File conf = new File(Main.CONF_FILE);
+    assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists());
+    Main.main(null);
+    assertTrue(conf.exists());
+    assertTrue(conf.delete());
+  }
+
+  @Test()
+  public void testInitializationEmptyArgs() throws Exception {
+    File conf = new File(Main.CONF_FILE);
+    assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists());
+    Main.main(new String[]{});
+    assertTrue(conf.exists());
+    assertTrue(conf.delete());
+  }
+
+  @Test()
+  public void testInitializationTooManyArgs() throws Exception {
+    File conf = new File(Main.CONF_FILE);
+    assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists());
+    Main.main(new String[]{"x", "y"});
+    assertFalse(conf.exists());
+  }
+
   @Test()
   public void testSmoke() throws Exception {
-    System.out.println("\n!!!!   Three ERROR log messages are expected."
-        + "\nOne each from: ExitListDownloader, "
-        + "TorperfDownloader, and CreateIndexJson.\n");
     File conf = tmpf.newFile("test.conf");
     File lockPath = tmpf.newFolder("test.lock");
     assertEquals(0L, conf.length());
-    Main.main(new String[]{"relaydescs", conf.toString()});
+    Main.main(new String[]{conf.toString()});
     assertTrue(4_000L <= conf.length());
-    changeLockFilePath(conf, lockPath);
-    for ( String key : Main.collecTorMains.keySet()) {
-      Main.main(new String[]{key, conf.toString()});
-    }
+    changeFilePathsAndSetActivation(conf, lockPath, "TorperfActivated");
+    Main.main(new String[]{conf.toString()});
+    for(int t=0; t<1_000_000; t++) { }
   }
 
-  private void changeLockFilePath(File f, File l) throws Exception {
+  private void changeFilePathsAndSetActivation(File f, File l, String a) throws Exception {
     List<String> lines = Files.readAllLines(f.toPath());
     BufferedWriter bw = Files.newBufferedWriter(f.toPath());
     File in = tmpf.newFolder();
@@ -57,6 +91,8 @@ public class MainTest {
         line = line.replace(inStr, in.toString() + inStr);
       } else if (line.contains(outStr)) {
         line = line.replace(outStr, out.toString() + outStr);
+      } else if (line.contains(a)) {
+        line = line.replace("false", "true");
       }
       bw.write(line);
       bw.newLine();
@@ -86,5 +122,35 @@ public class MainTest {
       }
     }
   }
+
+  /* Checks in both directions that the CollecTorMain classes registered in
+   * Main match the run settings in the resource named by Main.CONF_FILE. */
+  @Test()
+  public void testRunConfiguration() throws Exception {
+    Properties defaults = new Properties();
+    defaults.load(getClass().getClassLoader().getResourceAsStream(
+        Main.CONF_FILE));
+    String[] suffixes = new String[] {Scheduler.ACTIVATED,
+        Scheduler.PERIODMIN, Scheduler.OFFSETMIN};
+    for (Key moduleKey : Main.collecTorMains.keySet()) {
+      for (String suffix : suffixes) {
+        String expected = moduleKey.name().replace("Activated", suffix);
+        assertNotNull("Property '" + expected + "' not specified in "
+            + Main.CONF_FILE + ".",
+            defaults.getProperty(expected));
+      }
+    }
+    for (String propertyName : defaults.stringPropertyNames()) {
+      for (String suffix : suffixes) {
+        if (propertyName.contains(suffix)) {
+          String module = propertyName.replace(suffix, "");
+          assertTrue("CollecTorMain '" + module
+              + "' not specified in Main.class.",
+              Main.collecTorMains.containsKey(Key.valueOf(module + "Activated")));
+        }
+      }
+    }
+  }
+
 }
 
diff --git a/src/test/java/org/torproject/collector/cron/Dummy.java b/src/test/java/org/torproject/collector/cron/Dummy.java
new file mode 100644
index 0000000..0231e69
--- /dev/null
+++ b/src/test/java/org/torproject/collector/cron/Dummy.java
@@ -0,0 +1,15 @@
+package org.torproject.collector.cron;
+
+import org.torproject.collector.conf.Configuration;
+
+/** CollecTorMain stub with an empty run(); registered as a module in SchedulerTest. */
+public class Dummy extends CollecTorMain {
+  public Dummy(Configuration configuration) {
+    super(configuration);
+  }
+
+  @Override
+  public void run() {
+    // Empty body: running this module does nothing.
+  }
+}
diff --git a/src/test/java/org/torproject/collector/cron/SchedulerTest.java b/src/test/java/org/torproject/collector/cron/SchedulerTest.java
new file mode 100644
index 0000000..0c4e922
--- /dev/null
+++ b/src/test/java/org/torproject/collector/cron/SchedulerTest.java
@@ -0,0 +1,54 @@
+/* Copyright 2016 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.collector.cron;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.torproject.collector.conf.Key;
+import org.torproject.collector.conf.Configuration;
+import org.torproject.collector.cron.Scheduler;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+public class SchedulerTest {
+  /* Run settings for one module: activated, period 10 and offset 7 minutes. */
+  private static final String runConfigProperties = "TorperfActivated=true\n"
+      + "TorperfPeriodMinutes=10\nTorperfOffsetMinutes=7\n";
+
+  /* Scheduling one activated module queues one task; shutdown stops the executor. */
+  @Test()
+  public void testSimpleSchedule() throws Exception {
+    Map<Key, Class<? extends CollecTorMain>> ctms = new HashMap<>();
+    Configuration conf = new Configuration();
+    conf.load(new ByteArrayInputStream(runConfigProperties.getBytes()));
+    ctms.put(Key.TorperfActivated, Dummy.class);
+    Field schedulerField = Scheduler.class.getDeclaredField("scheduler");
+    schedulerField.setAccessible(true); // reflection exposes the executor so its queue can be checked
+    ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
+        schedulerField.get(Scheduler.getInstance());
+    assertTrue(stpe.getQueue().isEmpty());
+    Scheduler.getInstance().scheduleModuleRuns(ctms, conf);
+    assertEquals(1, stpe.getQueue().size()); // JUnit order is (expected, actual)
+    Scheduler.getInstance().shutdownScheduler();
+    assertTrue(stpe.isShutdown());
+  }
+}
+
diff --git a/src/test/resources/junittest.policy b/src/test/resources/junittest.policy
index 208a172..35c30c0 100644
--- a/src/test/resources/junittest.policy
+++ b/src/test/resources/junittest.policy
@@ -5,6 +5,7 @@ grant {
   permission java.util.PropertyPermission "*", "read, write";
   permission java.lang.RuntimePermission "setIO";
   permission java.lang.RuntimePermission "accessDeclaredMembers";
+  permission java.lang.RuntimePermission "modifyThread";
   permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
   permission java.lang.RuntimePermission "shutdownHooks";
   permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";





More information about the tor-commits mailing list