commit 997d4a4a2e6fca2bc95134537fcfebe930db8de1 Author: iwakeh iwakeh@torproject.org Date: Wed Jul 20 13:23:18 2016 +0200
Implements #19018 'Run CollecTor without crontab'. Added scheduler logic, adapted and added tests, adapted coverage check. --- build.xml | 8 +- src/main/java/org/torproject/collector/Main.java | 54 ++++------- .../bridgedescs/SanitizedBridgesWriter.java | 32 +------ .../torproject/collector/cron/CollecTorMain.java | 28 ++++++ .../org/torproject/collector/cron/Scheduler.java | 102 +++++++++++++++++++++ .../collector/exitlists/ExitListDownloader.java | 27 +----- .../collector/index/CreateIndexJson.java | 29 ++++-- .../org/torproject/collector/main/LockFile.java | 66 ------------- .../collector/relaydescs/ArchiveWriter.java | 61 ++++-------- .../collector/torperf/TorperfDownloader.java | 33 ++----- .../java/org/torproject/collector/MainTest.java | 84 +++++++++++++++-- .../java/org/torproject/collector/cron/Dummy.java | 15 +++ .../torproject/collector/cron/SchedulerTest.java | 54 +++++++++++ src/test/resources/junittest.policy | 1 + 14 files changed, 355 insertions(+), 239 deletions(-)
diff --git a/build.xml b/build.xml index 51471f6..ba50fbf 100644 --- a/build.xml +++ b/build.xml @@ -210,7 +210,7 @@ <classpath refid="cobertura.test.classpath" /> <formatter type="xml" /> <batchtest toDir="${testresult}" > - <fileset dir="${testclasses}" /> + <fileset dir="${testclasses}" includes="**/*Test.class" /> </batchtest> </junit> <cobertura-report format="html" destdir="${coverageresult}" > @@ -218,8 +218,10 @@ <include name="**/*.java" /> </fileset> </cobertura-report> - <cobertura-check branchrate="0" totallinerate="15" totalbranchrate="5" > + <cobertura-check branchrate="0" totallinerate="4" totalbranchrate="1" > <regex pattern="org.torproject.collector.conf.*" branchrate="100" linerate="100"/> + <regex pattern="org.torproject.collector.cron.*" branchrate="66" linerate="73"/> + <regex pattern="org.torproject.collector.Main" branchrate="66" linerate="94"/> </cobertura-check> </target> <target name="test" depends="compile,compile-tests"> @@ -231,7 +233,7 @@ <classpath refid="test.classpath"/> <formatter type="plain" usefile="false"/> <batchtest> - <fileset dir="${testclasses}" /> + <fileset dir="${testclasses}" includes="**/*Test.class" /> </batchtest> </junit> </target> diff --git a/src/main/java/org/torproject/collector/Main.java b/src/main/java/org/torproject/collector/Main.java index 81234c3..97c7a0c 100644 --- a/src/main/java/org/torproject/collector/Main.java +++ b/src/main/java/org/torproject/collector/Main.java @@ -5,6 +5,9 @@ package org.torproject.collector;
import org.torproject.collector.bridgedescs.SanitizedBridgesWriter; import org.torproject.collector.conf.Configuration; +import org.torproject.collector.conf.Key; +import org.torproject.collector.cron.CollecTorMain; +import org.torproject.collector.cron.Scheduler; import org.torproject.collector.exitlists.ExitListDownloader; import org.torproject.collector.index.CreateIndexJson; import org.torproject.collector.relaydescs.ArchiveWriter; @@ -37,34 +40,31 @@ public class Main { /** All possible main classes. * If a new CollecTorMain class is available, just add it to this map. */ - static final Map<String, Class> collecTorMains = new HashMap<>(); + static final Map<Key, Class<? extends CollecTorMain>> collecTorMains = new HashMap<>();
static { // add a new main class here - collecTorMains.put("bridgedescs", SanitizedBridgesWriter.class); - collecTorMains.put("exitlists", ExitListDownloader.class); - collecTorMains.put("updateindex", CreateIndexJson.class); - collecTorMains.put("relaydescs", ArchiveWriter.class); - collecTorMains.put("torperf", TorperfDownloader.class); + collecTorMains.put(Key.BridgedescsActivated, SanitizedBridgesWriter.class); + collecTorMains.put(Key.ExitlistsActivated, ExitListDownloader.class); + collecTorMains.put(Key.UpdateindexActivated, CreateIndexJson.class); + collecTorMains.put(Key.RelaydescsActivated, ArchiveWriter.class); + collecTorMains.put(Key.TorperfActivated, TorperfDownloader.class); }
- private static final String modules = collecTorMains.keySet().toString() - .replace("[", "").replace("]", "").replaceAll(", ", "|"); - private static Configuration conf = new Configuration();
/** - * One argument is necessary. + * At most one argument. * See class description {@link Main}. */ public static void main(String[] args) throws Exception { File confFile = null; - if (null == args || args.length < 1 || args.length > 2) { - printUsage("CollecTor needs one or two arguments."); - return; - } else if (args.length == 1) { + if (args == null || args.length == 0) { confFile = new File(CONF_FILE); - } else if (args.length == 2) { - confFile = new File(args[1]); + } else if (args.length == 1) { + confFile = new File(args[0]); + } else { + printUsage("CollecTor takes at most one argument."); + return; } if (!confFile.exists() || confFile.length() < 1L) { writeDefaultConfig(confFile); @@ -72,12 +72,12 @@ public class Main { } else { readConfigurationFrom(confFile); } - invokeGivenMain(args[0]); + Scheduler.getInstance().scheduleModuleRuns(collecTorMains, conf); }
private static void printUsage(String msg) { final String usage = "Usage:\njava -jar collector.jar " - + "<" + modules + "> [path/to/configFile]"; + + "[path/to/configFile]"; System.out.println(msg + "\n" + usage); }
@@ -105,23 +105,5 @@ public class Main { } }
- private static void invokeGivenMain(String mainId) { - Class clazz = collecTorMains.get(mainId); - if (null == clazz) { - printUsage("Unknown argument: " + mainId); - } - invokeMainOnClass(clazz); - } - - private static void invokeMainOnClass(Class clazz) { - try { - clazz.getMethod("main", new Class[] { Configuration.class }) - .invoke(null, (Object) conf); - } catch (NoSuchMethodException | IllegalAccessException - | InvocationTargetException e) { - log.error("Cannot invoke 'main' method on " - + clazz.getName() + ". " + e, e); - } - } }
diff --git a/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java b/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java index 121f8ca..5c33566 100644 --- a/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java +++ b/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java @@ -6,7 +6,7 @@ package org.torproject.collector.bridgedescs; import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key; -import org.torproject.collector.main.LockFile; +import org.torproject.collector.cron.CollecTorMain;
import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Base64; @@ -49,36 +49,12 @@ import java.util.TreeMap; * by the bridge to advertise their capabilities), and extra-info * descriptors (published by the bridge, mainly for statistical analysis).</p> */ -public class SanitizedBridgesWriter extends Thread { +public class SanitizedBridgesWriter extends CollecTorMain {
private static Logger logger = LoggerFactory.getLogger(SanitizedBridgesWriter.class);
- /** Executes the bridge-descriptors module using the given - * configuration. */ - public static void main(Configuration config) throws ConfigurationException { - - logger.info("Starting bridge-descriptors module of CollecTor."); - - // Use lock file to avoid overlapping runs - LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), "bridge-descriptors"); - lf.acquireLock(); - - // Sanitize bridge descriptors - new SanitizedBridgesWriter(config).run(); - - // Remove lock file - lf.releaseLock(); - - logger.info("Terminating bridge-descriptors module of CollecTor."); - } - - private Configuration config; - - /** - * Initializes this class. - */ public SanitizedBridgesWriter(Configuration config) { - this.config = config; + super(config); }
private String rsyncCatString; @@ -106,12 +82,14 @@ public class SanitizedBridgesWriter extends Thread {
@Override public void run() { + logger.info("Starting bridge-descriptors module of CollecTor."); try { startProcessing(); } catch (ConfigurationException ce) { logger.error("Configuration failed: " + ce, ce); throw new RuntimeException(ce); } + logger.info("Terminating bridge-descriptors module of CollecTor."); }
private void startProcessing() throws ConfigurationException { diff --git a/src/main/java/org/torproject/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/collector/cron/CollecTorMain.java new file mode 100644 index 0000000..7a00e68 --- /dev/null +++ b/src/main/java/org/torproject/collector/cron/CollecTorMain.java @@ -0,0 +1,28 @@ +/* Copyright 2016 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.collector.cron; + +import org.torproject.collector.conf.Configuration; +import org.torproject.collector.conf.Key; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.Calendar; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public abstract class CollecTorMain implements Runnable { + + protected Configuration config; + + public CollecTorMain( Configuration conf) { + this.config = conf; + } + +} + diff --git a/src/main/java/org/torproject/collector/cron/Scheduler.java b/src/main/java/org/torproject/collector/cron/Scheduler.java new file mode 100644 index 0000000..e4f2aa3 --- /dev/null +++ b/src/main/java/org/torproject/collector/cron/Scheduler.java @@ -0,0 +1,102 @@ +/* Copyright 2016 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.collector.cron; + +import org.torproject.collector.conf.Configuration; +import org.torproject.collector.conf.ConfigurationException; +import org.torproject.collector.conf.Key; +import org.torproject.collector.cron.CollecTorMain; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Scheduler that starts the modules configured in collector.properties. + */ +public class Scheduler { + + public static final String ACTIVATED = "Activated"; + public static final String PERIODMIN = "PeriodMinutes"; + public static final String OFFSETMIN = "OffsetMinutes"; + + private final Logger log = LoggerFactory.getLogger(Scheduler.class); + + private final ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool(1); + + private static Scheduler instance = new Scheduler(); + + private Scheduler(){} + + public static Scheduler getInstance() { + return instance; + } + + /** + * Schedule all classes given according to the parameters in the + * the configuration. + */ + public void scheduleModuleRuns(Map<Key, + Class<? extends CollecTorMain>> collecTorMains, Configuration conf) { + for ( Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry + : collecTorMains.entrySet() ) { + try { + if ( conf.getBool(ctmEntry.getKey()) ) { + String prefix = ctmEntry.getKey().name().replace(ACTIVATED, ""); + CollecTorMain ctm = ctmEntry.getValue() + .getConstructor(Configuration.class).newInstance(conf); + scheduleExecutions(ctm, + conf.getInt(Key.valueOf(prefix + OFFSETMIN)), + conf.getInt(Key.valueOf(prefix + PERIODMIN))); + } + } catch (ConfigurationException | IllegalAccessException + | InstantiationException | InvocationTargetException + | NoSuchMethodException | RuntimeException ex) { + log.error("Cannot schedule " + ctmEntry.getValue().getName() + + ". Reason: " + ex.getMessage(), ex); + shutdownScheduler(); + throw new RuntimeException("Halted scheduling.", ex); + } + } + } + + private void scheduleExecutions(CollecTorMain ctm, int offset, int period) { + this.log.info("Periodic updater started for " + ctm.getClass().getName() + + "; offset=" + offset + ", period=" + period + "."); + int currentMinute = Calendar.getInstance().get(Calendar.MINUTE); + int initialDelay = (60 - currentMinute + offset) % 60; + + /* Run after initialDelay delay and then every period min. */ + this.log.info("Periodic updater will start every " + period + "th min " + + "at minute " + ((currentMinute + initialDelay) % 60) + "."); + this.scheduler.scheduleAtFixedRate(ctm, initialDelay, 60, + TimeUnit.MINUTES); + } + + /** + * Try to shutdown smoothly, i.e., wait for running tasks to terminate. + */ + public void shutdownScheduler() { + try { + scheduler.shutdown(); + scheduler.awaitTermination(20L, java.util.concurrent.TimeUnit.MINUTES); + log.info("Shutdown of all scheduled tasks completed successfully."); + } catch ( InterruptedException ie ) { + List<Runnable> notTerminated = scheduler.shutdownNow(); + log.error("Regular shutdown failed for: " + notTerminated); + if ( !notTerminated.isEmpty() ) { + log.error("Forced shutdown failed for: " + notTerminated); + } + } + } +} + diff --git a/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java b/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java index 79fe19f..e6720cd 100644 --- a/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java +++ b/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java @@ -6,7 +6,7 @@ package org.torproject.collector.exitlists; import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key; -import org.torproject.collector.main.LockFile; +import org.torproject.collector.cron.CollecTorMain; import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.DescriptorParseException; import org.torproject.descriptor.DescriptorParser; @@ -32,42 +32,25 @@ import java.util.Stack; import java.util.TimeZone; import java.util.TreeSet;
-public class ExitListDownloader extends Thread { +public class ExitListDownloader extends CollecTorMain {
private static Logger logger = LoggerFactory.getLogger(ExitListDownloader.class);
- /** Execute the exit-lists module using the given configuration. */ - public static void main(Configuration config) throws ConfigurationException { - logger.info("Starting exit-lists module of CollecTor."); - - // Use lock file to avoid overlapping runs - LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), "exit-lists"); - lf.acquireLock(); - - // Download exit list and store it to disk - new ExitListDownloader(config).run(); - - // Remove lock file - lf.releaseLock(); - - logger.info("Terminating exit-lists module of CollecTor."); - } - - private Configuration config; - /** Instanciate the exit-lists module using the given configuration. */ public ExitListDownloader(Configuration config) { - this.config = config; + super(config); }
@Override public void run() { + logger.info("Starting exit-lists module of CollecTor."); try { startProcessing(); } catch (ConfigurationException ce) { logger.error("Configuration failed: " + ce, ce); throw new RuntimeException(ce); } + logger.info("Terminating exit-lists module of CollecTor."); }
private void startProcessing() throws ConfigurationException { diff --git a/src/main/java/org/torproject/collector/index/CreateIndexJson.java b/src/main/java/org/torproject/collector/index/CreateIndexJson.java index 7da2b5e..80f183c 100644 --- a/src/main/java/org/torproject/collector/index/CreateIndexJson.java +++ b/src/main/java/org/torproject/collector/index/CreateIndexJson.java @@ -6,6 +6,7 @@ package org.torproject.collector.index; import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key; +import org.torproject.collector.cron.CollecTorMain;
import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -35,7 +36,7 @@ import java.util.zip.GZIPOutputStream; * cache index parts of directories or files that haven't changed. * Example: if we parse include cryptographic hashes or @type information, * we'll likely have to do that. */ -public class CreateIndexJson { +public class CreateIndexJson extends CollecTorMain {
private static File indexJsonFile;
@@ -51,14 +52,24 @@ public class CreateIndexJson {
/** Creates indexes of directories containing archived and recent * descriptors and write index files to disk. */ - public static void main(Configuration config) - throws ConfigurationException, IOException { - indexJsonFile = new File(config.getPath(Key.IndexPath).toFile(), "index.json"); - basePath = config.getProperty(Key.InstanceBaseUrl.name()); - indexedDirectories = new File[] { - config.getPath(Key.ArchivePath).toFile(), - config.getPath(Key.RecentPath).toFile() }; - writeIndex(indexDirectories()); + public CreateIndexJson(Configuration conf) { + super(conf); + } + + @Override + public void run() { + try { + indexJsonFile = new File(config.getPath(Key.IndexPath).toFile(), + "index.json"); + basePath = config.getProperty(Key.InstanceBaseUrl.name()); + indexedDirectories = new File[] { + config.getPath(Key.ArchivePath).toFile(), + config.getPath(Key.RecentPath).toFile() }; + writeIndex(indexDirectories()); + } catch (Exception e) { + throw new RuntimeException("Cannot run index creation: " + e.getMessage(), + e); + } }
static class DirectoryNode implements Comparable<DirectoryNode> { diff --git a/src/main/java/org/torproject/collector/main/LockFile.java b/src/main/java/org/torproject/collector/main/LockFile.java deleted file mode 100644 index 977b0b9..0000000 --- a/src/main/java/org/torproject/collector/main/LockFile.java +++ /dev/null @@ -1,66 +0,0 @@ -/* Copyright 2010--2016 The Tor Project - * See LICENSE for licensing information */ - -package org.torproject.collector.main; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; - -public class LockFile { - - private final File lockFile; - private final String moduleName; - private final Logger logger = LoggerFactory.getLogger(LockFile.class); - - public LockFile(String moduleName) { - this("lock", moduleName); - } - - public LockFile(String lockFilePath, String moduleName) { - this.lockFile = new File(lockFilePath, moduleName); - this.moduleName = moduleName; - } - - /** Acquires the lock by checking whether a lock file already exists, - * and if not, by creating one with the current system time as - * content. */ - public boolean acquireLock() { - this.logger.debug("Trying to acquire lock..."); - try { - if (this.lockFile.exists()) { - BufferedReader br = new BufferedReader(new FileReader( - this.lockFile)); - long runStarted = Long.parseLong(br.readLine()); - br.close(); - if (System.currentTimeMillis() - runStarted < 55L * 60L * 1000L) { - throw new RuntimeException("Cannot acquire lock for " + moduleName); - } - } - this.lockFile.getParentFile().mkdirs(); - BufferedWriter bw = new BufferedWriter(new FileWriter( - this.lockFile)); - bw.append("" + System.currentTimeMillis() + "\n"); - bw.close(); - this.logger.debug("Acquired lock."); - return true; - } catch (IOException e) { - throw new RuntimeException("Caught exception while trying to acquire " - + "lock for " + moduleName); - } - } - - /** Releases the lock by deleting the lock file, if present. */ - public void releaseLock() { - this.logger.debug("Releasing lock..."); - this.lockFile.delete(); - this.logger.debug("Released lock."); - } -} - diff --git a/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java b/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java index db05bc5..cbab5ea 100644 --- a/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java +++ b/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java @@ -6,7 +6,7 @@ package org.torproject.collector.relaydescs; import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key; -import org.torproject.collector.main.LockFile; +import org.torproject.collector.cron.CollecTorMain; import org.torproject.descriptor.DescriptorParseException; import org.torproject.descriptor.DescriptorParser; import org.torproject.descriptor.DescriptorSourceFactory; @@ -38,12 +38,10 @@ import java.util.Stack; import java.util.TimeZone; import java.util.TreeMap;
-public class ArchiveWriter extends Thread { +public class ArchiveWriter extends CollecTorMain {
private static Logger logger = LoggerFactory.getLogger(ArchiveWriter.class);
- private Configuration config; - private long now = System.currentTimeMillis(); private String outputDirectory; private String rsyncCatString; @@ -99,64 +97,45 @@ public class ArchiveWriter extends Thread {
private StringBuilder intermediateStats = new StringBuilder();
- private static Path recentPath; - private static String recentPathName; + private Path recentPath; + private String recentPathName; private static final String RELAY_DESCRIPTORS = "relay-descriptors"; private static final String MICRO = "micro"; private static final String CONSENSUS_MICRODESC = "consensus-microdesc"; private static final String MICRODESC = "microdesc"; private static final String MICRODESCS = "microdescs";
- /** Executes the relay-descriptors module using the given - * configuration. */ - public static void main(Configuration config) throws ConfigurationException { - - logger.info("Starting relay-descriptors module of CollecTor."); - - // Use lock file to avoid overlapping runs - LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), RELAY_DESCRIPTORS); - lf.acquireLock(); - - recentPath = config.getPath(Key.RecentPath); - recentPathName = recentPath.toString(); - - // Import/download relay descriptors from the various sources - new ArchiveWriter(config).run(); - - new ReferenceChecker( - recentPath.toFile(), - new File(config.getPath(Key.StatsPath).toFile(), "references"), - new File(config.getPath(Key.StatsPath).toFile(), "references-history")).check(); - - // Remove lock file - lf.releaseLock(); - - logger.info("Terminating relay-descriptors module of CollecTor."); - } - /** Initialize an archive writer with a given configuration. */ public ArchiveWriter(Configuration config) throws ConfigurationException { - this.config = config; - storedServerDescriptorsFile = - new File(config.getPath(Key.StatsPath).toFile(), "stored-server-descriptors"); - storedExtraInfoDescriptorsFile = - new File(config.getPath(Key.StatsPath).toFile(), "stored-extra-info-descriptors"); - storedMicrodescriptorsFile = - new File(config.getPath(Key.StatsPath).toFile(), "stored-microdescriptors"); + super(config); }
@Override public void run() { + logger.info("Starting relay-descriptors module of CollecTor."); try { + recentPath = config.getPath(Key.RecentPath); + recentPathName = recentPath.toString(); + File statsDir = config.getPath(Key.StatsPath).toFile(); + storedServerDescriptorsFile = + new File(statsDir, "stored-server-descriptors"); + storedExtraInfoDescriptorsFile = + new File(statsDir, "stored-extra-info-descriptors"); + storedMicrodescriptorsFile = + new File(statsDir, "stored-microdescriptors"); + startProcessing(); + new ReferenceChecker(recentPath.toFile(), + new File(statsDir, "references"), + new File(statsDir, "references-history")).check(); } catch (ConfigurationException ce) { logger.error("Configuration failed: " + ce, ce); throw new RuntimeException(ce); } + logger.info("Terminating relay-descriptors module of CollecTor."); }
private void startProcessing() throws ConfigurationException { - File statsDirectory = new File("stats"); this.outputDirectory = config.getPath(Key.DirectoryArchivesOutputDirectory).toString(); SimpleDateFormat rsyncCatFormat = new SimpleDateFormat( diff --git a/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java b/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java index 7616dd8..1fa2d41 100644 --- a/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java +++ b/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java @@ -6,7 +6,7 @@ package org.torproject.collector.torperf; import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key; -import org.torproject.collector.main.LockFile; +import org.torproject.collector.cron.CollecTorMain;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,30 +32,11 @@ import java.util.TreeMap; /* Download possibly truncated Torperf .data and .extradata files from * configured sources, append them to the files we already have, and merge * the two files into the .tpf format. */ -public class TorperfDownloader extends Thread { +public class TorperfDownloader extends CollecTorMain { private static Logger logger = LoggerFactory.getLogger(TorperfDownloader.class);
- /** Executes the torperf module using the given configuration. */ - public static void main(Configuration config) throws ConfigurationException { - logger.info("Starting torperf module of CollecTor."); - - // Use lock file to avoid overlapping runs - LockFile lf = new LockFile(config.getPath(Key.LockFilePath).toString(), "torperf"); - lf.acquireLock(); - - // Process Torperf files - new TorperfDownloader(config).run(); - - // Remove lock file - lf.releaseLock(); - - logger.info("Terminating torperf module of CollecTor."); - } - - private Configuration config; - public TorperfDownloader(Configuration config) { - this.config = config; + super(config); }
private File torperfOutputDirectory = null; @@ -66,12 +47,14 @@ public class TorperfDownloader extends Thread {
@Override public void run() { + logger.info("Starting torperf module of CollecTor."); try { startProcessing(); } catch (ConfigurationException ce) { logger.error("Configuration failed: " + ce, ce); throw new RuntimeException(ce); } + logger.info("Terminating torperf module of CollecTor."); }
private void startProcessing() throws ConfigurationException { @@ -309,9 +292,6 @@ public class TorperfDownloader extends Thread { private String mergeFiles(File dataFile, File extradataFile, String source, int fileSize, String skipUntil) throws IOException, ConfigurationException { - SortedMap<String, String> config = new TreeMap<String, String>(); - config.put("SOURCE", source); - config.put("FILESIZE", String.valueOf(fileSize)); if (!dataFile.exists() || !extradataFile.exists()) { this.logger.warn("File " + dataFile.getAbsolutePath() + " or " + extradataFile.getAbsolutePath() + " is missing."); @@ -426,11 +406,12 @@ public class TorperfDownloader extends Thread { /* Write output line to .tpf file. */ SortedMap<String, String> keysAndValues = new TreeMap<String, String>(); + keysAndValues.put("SOURCE", source); + keysAndValues.put("FILESIZE", String.valueOf(fileSize)); if (extradata != null) { keysAndValues.putAll(extradata); } keysAndValues.putAll(data); - keysAndValues.putAll(config); this.logger.debug("Writing " + dataFile.getName() + ":" + skippedLineCount++ + "."); lineD = brD.readLine(); diff --git a/src/test/java/org/torproject/collector/MainTest.java b/src/test/java/org/torproject/collector/MainTest.java index 5991f78..b48a0a9 100644 --- a/src/test/java/org/torproject/collector/MainTest.java +++ b/src/test/java/org/torproject/collector/MainTest.java @@ -4,16 +4,19 @@ package org.torproject.collector;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail;
import org.torproject.collector.conf.Key; +import org.torproject.collector.cron.Scheduler;
import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder;
import java.io.BufferedWriter; +import java.io.IOException; import java.io.File; import java.nio.file.Files; import java.util.List; @@ -27,23 +30,54 @@ public class MainTest { @Rule public TemporaryFolder tmpf = new TemporaryFolder();
+ @Test(expected = IOException.class) + public void testInitializationConfigException() throws Exception { + File conf = new File(Main.CONF_FILE); + assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists()); + Main.main(new String[] {"/tmp/"}); + assertTrue(conf.exists()); + assertTrue(conf.delete()); + } + + @Test() + public void testInitializationNullArgs() throws Exception { + File conf = new File(Main.CONF_FILE); + assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists()); + Main.main(null); + assertTrue(conf.exists()); + assertTrue(conf.delete()); + } + + @Test() + public void testInitializationEmptyArgs() throws Exception { + File conf = new File(Main.CONF_FILE); + assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists()); + Main.main(new String[]{}); + assertTrue(conf.exists()); + assertTrue(conf.delete()); + } + + @Test() + public void testInitializationTooManyArgs() throws Exception { + File conf = new File(Main.CONF_FILE); + assertFalse("Please remove " + Main.CONF_FILE + " before running tests!", conf.exists()); + Main.main(new String[]{"x", "y"}); + assertFalse(conf.exists()); + } + @Test() public void testSmoke() throws Exception { - System.out.println("\n!!!! Three ERROR log messages are expected." - + "\nOne each from: ExitListDownloader, " - + "TorperfDownloader, and CreateIndexJson.\n"); File conf = tmpf.newFile("test.conf"); File lockPath = tmpf.newFolder("test.lock"); assertEquals(0L, conf.length()); - Main.main(new String[]{"relaydescs", conf.toString()}); + Main.main(new String[]{conf.toString()}); assertTrue(4_000L <= conf.length()); - changeLockFilePath(conf, lockPath); - for ( String key : Main.collecTorMains.keySet()) { - Main.main(new String[]{key, conf.toString()}); - } + changeFilePathsAndSetActivation(conf, lockPath, "TorperfActivated"); + Main.main(new String[]{conf.toString()}); + for(int t=0; t<1_000_000; t++) { } }
- private void changeLockFilePath(File f, File l) throws Exception { + private void changeFilePathsAndSetActivation(File f, File l, String a) throws Exception { List<String> lines = Files.readAllLines(f.toPath()); BufferedWriter bw = Files.newBufferedWriter(f.toPath()); File in = tmpf.newFolder(); @@ -57,6 +91,8 @@ public class MainTest { line = line.replace(inStr, in.toString() + inStr); } else if (line.contains(outStr)) { line = line.replace(outStr, out.toString() + outStr); + } else if (line.contains(a)) { + line = line.replace("false", "true"); } bw.write(line); bw.newLine(); @@ -86,5 +122,35 @@ public class MainTest { } } } + + /* Verifies that every collecTorMain class is configured in the + * default collector.properties file and the other way around. */ + @Test() + public void testRunConfiguration() throws Exception { + Properties props = new Properties(); + props.load(getClass().getClassLoader().getResourceAsStream( + Main.CONF_FILE)); + String[] runConfigSettings = new String[] {Scheduler.ACTIVATED, + Scheduler.PERIODMIN, Scheduler.OFFSETMIN}; + for (Key key : Main.collecTorMains.keySet()) { + for ( String part : runConfigSettings ){ + String key2 = key.name().replace("Activated", part); + assertNotNull("Property '" + key2 + "' not specified in " + + Main.CONF_FILE + ".", + props.getProperty(key2)); + } + } + for (String propName : props.stringPropertyNames()) { + for ( String part : runConfigSettings ){ + if( propName.contains(part) ){ + String key2 = propName.replace(part, ""); + assertTrue("CollecTorMain '" + key2 + + "' not specified in Main.class.", + Main.collecTorMains.containsKey(Key.valueOf(key2 + "Activated"))); + } + } + } + } + }
diff --git a/src/test/java/org/torproject/collector/cron/Dummy.java b/src/test/java/org/torproject/collector/cron/Dummy.java new file mode 100644 index 0000000..0231e69 --- /dev/null +++ b/src/test/java/org/torproject/collector/cron/Dummy.java @@ -0,0 +1,15 @@ +package org.torproject.collector.cron; + +import org.torproject.collector.conf.Configuration; + +public class Dummy extends CollecTorMain { + + public Dummy(Configuration c) { + super(c); + } + + @Override + public void run() { + + } +} diff --git a/src/test/java/org/torproject/collector/cron/SchedulerTest.java b/src/test/java/org/torproject/collector/cron/SchedulerTest.java new file mode 100644 index 0000000..0c4e922 --- /dev/null +++ b/src/test/java/org/torproject/collector/cron/SchedulerTest.java @@ -0,0 +1,54 @@ +/* Copyright 2016 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.collector.cron; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.torproject.collector.conf.Key; +import org.torproject.collector.conf.Configuration; +import org.torproject.collector.cron.Scheduler; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.lang.reflect.Field; +import java.nio.file.Files; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +public class SchedulerTest { + + private static final String runConfigProperties = "TorperfActivated=true\n" + + "TorperfPeriodMinutes=10\nTorperfOffsetMinutes=7\n"; + + @Test() + public void testSimpleSchedule() throws Exception { + Map<Key, Class<? extends CollecTorMain>> ctms = new HashMap<>(); + Configuration conf = new Configuration(); + conf.load(new ByteArrayInputStream(runConfigProperties.getBytes())); + ctms.put(Key.TorperfActivated, Dummy.class); + Field schedulerField = Scheduler.class.getDeclaredField("scheduler"); + schedulerField.setAccessible(true); + ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) + schedulerField.get(Scheduler.getInstance()); + assertTrue(stpe.getQueue().isEmpty()); + Scheduler.getInstance().scheduleModuleRuns(ctms, conf); + assertEquals(stpe.getQueue().size(), 1); + Scheduler.getInstance().shutdownScheduler(); + assertTrue(stpe.isShutdown()); + } + +} + diff --git a/src/test/resources/junittest.policy b/src/test/resources/junittest.policy index 208a172..35c30c0 100644 --- a/src/test/resources/junittest.policy +++ b/src/test/resources/junittest.policy @@ -5,6 +5,7 @@ grant { permission java.util.PropertyPermission "*", "read, write"; permission java.lang.RuntimePermission "setIO"; permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.RuntimePermission "modifyThread"; permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; permission java.lang.RuntimePermission "shutdownHooks"; permission java.lang.RuntimePermission "accessClassInPackage.sun.reflect";
tor-commits@lists.torproject.org