commit 6c43548bfb0ffe92ca6f5fd67075ff506c546519 Author: iwakeh iwakeh@torproject.org Date: Fri Jul 22 14:35:07 2016 +0200
Prevent updateindex NPE, b/c of dubious files that are regcognized as directories. Adjusted coverage rate. Modified scheduler test. Reduced repeated try-catch in run methods. Log cron package to all module logs. --- build.xml | 6 +-- .../bridgedescs/SanitizedBridgesWriter.java | 18 +++---- .../torproject/collector/cron/CollecTorMain.java | 26 ++++++++++ .../org/torproject/collector/cron/Scheduler.java | 36 ++++++++++---- .../collector/exitlists/ExitListDownloader.java | 16 ++---- .../collector/index/CreateIndexJson.java | 57 +++++++++++++++------- .../collector/relaydescs/ArchiveWriter.java | 43 ++++++++-------- .../collector/torperf/TorperfDownloader.java | 14 ++---- src/main/resources/logback.xml | 12 ++++- .../java/org/torproject/collector/cron/Dummy.java | 18 ++++--- .../torproject/collector/cron/SchedulerTest.java | 18 +++++-- 11 files changed, 167 insertions(+), 97 deletions(-)
diff --git a/build.xml b/build.xml index eccae8b..643d94c 100644 --- a/build.xml +++ b/build.xml @@ -227,10 +227,10 @@ <include name="**/*.java" /> </fileset> </cobertura-report> - <cobertura-check branchrate="0" totallinerate="4" totalbranchrate="1" > + <cobertura-check totallinerate="5" totalbranchrate="1" > <regex pattern="org.torproject.collector.conf.*" branchrate="100" linerate="100"/> - <regex pattern="org.torproject.collector.cron.*" branchrate="66" linerate="73"/> - <regex pattern="org.torproject.collector.Main" branchrate="66" linerate="94"/> + <regex pattern="org.torproject.collector.cron" branchrate="66" linerate="80" /> + <regex pattern="org.torproject.collector.Main" branchrate="66" linerate="94" /> </cobertura-check> </target> <target name="test" depends="compile,compile-tests"> diff --git a/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java b/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java index 5c33566..e00f70b 100644 --- a/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java +++ b/src/main/java/org/torproject/collector/bridgedescs/SanitizedBridgesWriter.java @@ -81,18 +81,12 @@ public class SanitizedBridgesWriter extends CollecTorMain { private SecureRandom secureRandom;
@Override - public void run() { - logger.info("Starting bridge-descriptors module of CollecTor."); - try { - startProcessing(); - } catch (ConfigurationException ce) { - logger.error("Configuration failed: " + ce, ce); - throw new RuntimeException(ce); - } - logger.info("Terminating bridge-descriptors module of CollecTor."); + public String module() { + return "bridgedescs"; }
- private void startProcessing() throws ConfigurationException { + @Override + protected void startProcessing() throws ConfigurationException {
File bridgeDirectoriesDirectory = config.getPath(Key.BridgeSnapshotsDirectory).toFile(); @@ -102,7 +96,9 @@ public class SanitizedBridgesWriter extends CollecTorMain {
if (bridgeDirectoriesDirectory == null || sanitizedBridgesDirectory == null || statsDirectory == null) { - throw new IllegalArgumentException(); + throw new ConfigurationException("BridgeSnapshotsDirectory, " + + "SanitizedBridgesWriteDirectory, StatsPath should be set. " + + "Please, edit the 'collector.properties' file."); }
/* Memorize argument values. */ diff --git a/src/main/java/org/torproject/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/collector/cron/CollecTorMain.java index 7a00e68..21d8948 100644 --- a/src/main/java/org/torproject/collector/cron/CollecTorMain.java +++ b/src/main/java/org/torproject/collector/cron/CollecTorMain.java @@ -4,6 +4,7 @@ package org.torproject.collector.cron;
import org.torproject.collector.conf.Configuration; +import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key;
import org.slf4j.Logger; @@ -18,11 +19,36 @@ import java.util.concurrent.TimeUnit;
public abstract class CollecTorMain implements Runnable {
+ private static Logger log = LoggerFactory.getLogger(CollecTorMain.class); + protected Configuration config;
public CollecTorMain( Configuration conf) { this.config = conf; }
+ /** + * Log errors preventing successful completion of the module. + */ + @Override + public final void run() { + log.info("Starting {} module of CollecTor.", module()); + try { + startProcessing(); + } catch (ConfigurationException | RuntimeException ce) { + log.error("The {} module failed: {}", module(), ce.getMessage(), ce); + } + log.info("Terminating {} module of CollecTor.", module()); + } + + /** + * Module specific code goes here. + */ + protected abstract void startProcessing() throws ConfigurationException; + + /** + * Returns the module name for logging purposes. + */ + public abstract String module(); }
diff --git a/src/main/java/org/torproject/collector/cron/Scheduler.java b/src/main/java/org/torproject/collector/cron/Scheduler.java index e4f2aa3..6bc90ca 100644 --- a/src/main/java/org/torproject/collector/cron/Scheduler.java +++ b/src/main/java/org/torproject/collector/cron/Scheduler.java @@ -16,22 +16,28 @@ import java.util.Calendar; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit;
/** * Scheduler that starts the modules configured in collector.properties. */ -public class Scheduler { +public class Scheduler implements ThreadFactory {
public static final String ACTIVATED = "Activated"; public static final String PERIODMIN = "PeriodMinutes"; public static final String OFFSETMIN = "OffsetMinutes";
- private final Logger log = LoggerFactory.getLogger(Scheduler.class); + private static final Logger log = LoggerFactory.getLogger(Scheduler.class); + + private final ThreadFactory threads = Executors.defaultThreadFactory(); + + private int currentThreadNo = 0;
private final ScheduledExecutorService scheduler = - Executors.newScheduledThreadPool(1); + Executors.newScheduledThreadPool(10, this);
private static Scheduler instance = new Scheduler();
@@ -60,11 +66,10 @@ public class Scheduler { } } catch (ConfigurationException | IllegalAccessException | InstantiationException | InvocationTargetException - | NoSuchMethodException | RuntimeException ex) { + | NoSuchMethodException | RejectedExecutionException + | NullPointerException ex) { log.error("Cannot schedule " + ctmEntry.getValue().getName() + ". Reason: " + ex.getMessage(), ex); - shutdownScheduler(); - throw new RuntimeException("Halted scheduling.", ex); } } } @@ -73,12 +78,13 @@ public class Scheduler { this.log.info("Periodic updater started for " + ctm.getClass().getName() + "; offset=" + offset + ", period=" + period + "."); int currentMinute = Calendar.getInstance().get(Calendar.MINUTE); - int initialDelay = (60 - currentMinute + offset) % 60; + int initialDelay = (period - (currentMinute % period) + offset) % period;
/* Run after initialDelay delay and then every period min. */ this.log.info("Periodic updater will start every " + period + "th min " - + "at minute " + ((currentMinute + initialDelay) % 60) + "."); - this.scheduler.scheduleAtFixedRate(ctm, initialDelay, 60, + + "at minute " + ((currentMinute + initialDelay) % period) + "." + + " The first start will happen in " + initialDelay + " minute(s)."); + this.scheduler.scheduleAtFixedRate(ctm, initialDelay, period, TimeUnit.MINUTES); }
@@ -98,5 +104,17 @@ public class Scheduler { } } } + + /** + * Provide a nice name for debugging and log thread creation. + */ + @Override + public Thread newThread(Runnable runner) { + Thread newThread = threads.newThread(runner); + newThread.setName("CollecTor-Scheduled-Thread-" + ++currentThreadNo); + log.info("New Thread created: " + newThread.getName()); + return newThread; + } + }
diff --git a/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java b/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java index e6720cd..8b5277b 100644 --- a/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java +++ b/src/main/java/org/torproject/collector/exitlists/ExitListDownloader.java @@ -42,18 +42,12 @@ public class ExitListDownloader extends CollecTorMain { }
@Override - public void run() { - logger.info("Starting exit-lists module of CollecTor."); - try { - startProcessing(); - } catch (ConfigurationException ce) { - logger.error("Configuration failed: " + ce, ce); - throw new RuntimeException(ce); - } - logger.info("Terminating exit-lists module of CollecTor."); + public String module() { + return "exitlists"; }
- private void startProcessing() throws ConfigurationException { + @Override + protected void startProcessing() throws ConfigurationException {
SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -92,7 +86,7 @@ public class ExitListDownloader extends CollecTorMain { return; } if (downloadedExitList == null) { - logger.warn("Failed downloading exit list"); + logger.warn("Failed downloading exit list."); return; }
diff --git a/src/main/java/org/torproject/collector/index/CreateIndexJson.java b/src/main/java/org/torproject/collector/index/CreateIndexJson.java index 80f183c..08c28c6 100644 --- a/src/main/java/org/torproject/collector/index/CreateIndexJson.java +++ b/src/main/java/org/torproject/collector/index/CreateIndexJson.java @@ -13,6 +13,8 @@ import com.google.gson.GsonBuilder;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory;
import java.io.BufferedWriter; import java.io.File; @@ -38,17 +40,19 @@ import java.util.zip.GZIPOutputStream; * we'll likely have to do that. */ public class CreateIndexJson extends CollecTorMain {
+ private static Logger log = LoggerFactory.getLogger(CreateIndexJson.class); + private static File indexJsonFile;
private static String basePath;
private static File[] indexedDirectories;
- static final String dateTimePattern = "yyyy-MM-dd HH:mm"; + private static final String dateTimePattern = "yyyy-MM-dd HH:mm";
- static final Locale dateTimeLocale = Locale.US; + private static final Locale dateTimeLocale = Locale.US;
- static final TimeZone dateTimezone = TimeZone.getTimeZone("UTC"); + private static final TimeZone dateTimezone = TimeZone.getTimeZone("UTC");
/** Creates indexes of directories containing archived and recent * descriptors and write index files to disk. */ @@ -57,7 +61,12 @@ public class CreateIndexJson extends CollecTorMain { }
@Override - public void run() { + public String module() { + return "updateindex"; + } + + @Override + protected void startProcessing() throws ConfigurationException { try { indexJsonFile = new File(config.getPath(Key.IndexPath).toFile(), "index.json"); @@ -67,12 +76,12 @@ public class CreateIndexJson extends CollecTorMain { config.getPath(Key.RecentPath).toFile() }; writeIndex(indexDirectories()); } catch (Exception e) { - throw new RuntimeException("Cannot run index creation: " + e.getMessage(), - e); + log.error("Cannot run index creation: " + e.getMessage(), e); + throw new RuntimeException(e); } }
- static class DirectoryNode implements Comparable<DirectoryNode> { + private class DirectoryNode implements Comparable<DirectoryNode> { String path; SortedSet<FileNode> files; SortedSet<DirectoryNode> directories; @@ -90,7 +99,7 @@ public class CreateIndexJson extends CollecTorMain { }
@SuppressWarnings({"checkstyle:membername", "checkstyle:parametername"}) - static class IndexNode { + private class IndexNode { String index_created; String path; SortedSet<FileNode> files; @@ -107,7 +116,7 @@ public class CreateIndexJson extends CollecTorMain { }
@SuppressWarnings({"checkstyle:membername", "checkstyle:parametername"}) - static class FileNode implements Comparable<FileNode> { + private class FileNode implements Comparable<FileNode> { String path; long size; String last_modified; @@ -123,7 +132,7 @@ public class CreateIndexJson extends CollecTorMain { } }
- static DateFormat dateTimeFormat; + private static DateFormat dateTimeFormat;
static { dateTimeFormat = new SimpleDateFormat(dateTimePattern, @@ -132,30 +141,44 @@ public class CreateIndexJson extends CollecTorMain { dateTimeFormat.setTimeZone(dateTimezone); }
- static IndexNode indexDirectories() { + private IndexNode indexDirectories() { SortedSet<DirectoryNode> directoryNodes = new TreeSet<DirectoryNode>(); + log.trace("indexing: " + indexedDirectories[0] + " " + + indexedDirectories[1]); for (File directory : indexedDirectories) { if (directory.exists() && directory.isDirectory()) { - directoryNodes.add(indexDirectory(directory)); + DirectoryNode dn = indexDirectory(directory); + if (null != dn) { + directoryNodes.add(dn); + } } } return new IndexNode(dateTimeFormat.format( System.currentTimeMillis()), basePath, null, directoryNodes); }
- static DirectoryNode indexDirectory(File directory) { + private DirectoryNode indexDirectory(File directory) { SortedSet<FileNode> fileNodes = new TreeSet<FileNode>(); SortedSet<DirectoryNode> directoryNodes = new TreeSet<DirectoryNode>(); - for (File fileOrDirectory : directory.listFiles()) { + log.trace("indexing: " + directory); + File[] fileList = directory.listFiles(); + if (null == fileList) { + log.warn("Indexing dubious directory: " + directory); + return null; + } + for (File fileOrDirectory : fileList) { if (fileOrDirectory.getName().startsWith(".")) { continue; } if (fileOrDirectory.isFile()) { fileNodes.add(indexFile(fileOrDirectory)); } else { - directoryNodes.add(indexDirectory(fileOrDirectory)); + DirectoryNode dn = indexDirectory(fileOrDirectory); + if (null != dn) { + directoryNodes.add(dn); + } } } DirectoryNode directoryNode = new DirectoryNode( @@ -164,13 +187,13 @@ public class CreateIndexJson extends CollecTorMain { return directoryNode; }
- static FileNode indexFile(File file) { + private FileNode indexFile(File file) { FileNode fileNode = new FileNode(file.getName(), file.length(), dateTimeFormat.format(file.lastModified())); return fileNode; }
- static void writeIndex(IndexNode indexNode) throws IOException { + private void writeIndex(IndexNode indexNode) throws IOException { Gson gson = new GsonBuilder().create(); String indexNodeString = gson.toJson(indexNode); Writer[] writers = new Writer[] { diff --git a/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java b/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java index 4ad87f5..24b7ab5 100644 --- a/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java +++ b/src/main/java/org/torproject/collector/relaydescs/ArchiveWriter.java @@ -111,33 +111,24 @@ public class ArchiveWriter extends CollecTorMain { }
@Override - public void run() { - logger.info("Starting relay-descriptors module of CollecTor."); - try { - recentPath = config.getPath(Key.RecentPath); - recentPathName = recentPath.toString(); - File statsDir = config.getPath(Key.StatsPath).toFile(); - storedServerDescriptorsFile = - new File(statsDir, "stored-server-descriptors"); - storedExtraInfoDescriptorsFile = - new File(statsDir, "stored-extra-info-descriptors"); - storedMicrodescriptorsFile = - new File(statsDir, "stored-microdescriptors"); - - startProcessing(); - new ReferenceChecker(recentPath.toFile(), - new File(statsDir, "references"), - new File(statsDir, "references-history")).check(); - } catch (ConfigurationException ce) { - logger.error("Configuration failed: " + ce, ce); - throw new RuntimeException(ce); - } - logger.info("Terminating relay-descriptors module of CollecTor."); + public String module() { + return "relaydescs"; }
- private void startProcessing() throws ConfigurationException { + @Override + protected void startProcessing() throws ConfigurationException { + recentPath = config.getPath(Key.RecentPath); + recentPathName = recentPath.toString(); + File statsDir = config.getPath(Key.StatsPath).toFile(); + storedServerDescriptorsFile + = new File(statsDir, "stored-server-descriptors"); + storedExtraInfoDescriptorsFile + = new File(statsDir, "stored-extra-info-descriptors"); + storedMicrodescriptorsFile + = new File(statsDir, "stored-microdescriptors"); File statsDirectory = config.getPath(Key.StatsPath).toFile(); - this.outputDirectory = config.getPath(Key.DirectoryArchivesOutputDirectory).toString(); + this.outputDirectory = config.getPath(Key.DirectoryArchivesOutputDirectory) + .toString(); SimpleDateFormat rsyncCatFormat = new SimpleDateFormat( "yyyy-MM-dd-HH-mm-ss"); rsyncCatFormat.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -197,6 +188,10 @@ public class ArchiveWriter extends CollecTorMain { this.cleanUpRsyncDirectory();
this.saveDescriptorDigests(); + + new ReferenceChecker(recentPath.toFile(), + new File(statsDir, "references"), + new File(statsDir, "references-history")).check(); }
private void loadDescriptorDigests() { diff --git a/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java b/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java index 1fa2d41..6f8daf0 100644 --- a/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java +++ b/src/main/java/org/torproject/collector/torperf/TorperfDownloader.java @@ -46,18 +46,12 @@ public class TorperfDownloader extends CollecTorMain { private File torperfLastMergedFile;
@Override - public void run() { - logger.info("Starting torperf module of CollecTor."); - try { - startProcessing(); - } catch (ConfigurationException ce) { - logger.error("Configuration failed: " + ce, ce); - throw new RuntimeException(ce); - } - logger.info("Terminating torperf module of CollecTor."); + public String module() { + return "torperf"; }
- private void startProcessing() throws ConfigurationException { + @Override + protected void startProcessing() throws ConfigurationException { this.torperfFilesLines = config.getStringArray(Key.TorperfFilesLines); this.torperfOutputDirectory = config.getPath(Key.TorperfOutputDirectory) .toFile(); diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 9c9426d..f31cf74 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -93,7 +93,7 @@ </encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter"> - <level>TRACE</level> + <level>INFO</level> </filter> </appender>
@@ -114,7 +114,7 @@ <appender-ref ref="FILETORPERF" /> </logger>
- <logger name="org.torproject.collector.index" > + <logger name="org.torproject.collector.index" level="INFO" > <appender-ref ref="FILEUPDATEINDEX" /> </logger>
@@ -126,6 +126,14 @@ <appender-ref ref="FILEUPDATEINDEX" /> </logger>
+ <logger name="org.torproject.collector.cron" > + <appender-ref ref="FILEBRIDGEDESCS" /> + <appender-ref ref="FILEEXITLISTS" /> + <appender-ref ref="FILERELAYDESCS" /> + <appender-ref ref="FILETORPERF" /> + <appender-ref ref="FILEUPDATEINDEX" /> + </logger> + <logger name="org.torproject" > <appender-ref ref="CONSOLE" /> </logger> diff --git a/src/test/java/org/torproject/collector/cron/Dummy.java b/src/test/java/org/torproject/collector/cron/Dummy.java index 0231e69..449cc55 100644 --- a/src/test/java/org/torproject/collector/cron/Dummy.java +++ b/src/test/java/org/torproject/collector/cron/Dummy.java @@ -1,15 +1,21 @@ package org.torproject.collector.cron;
import org.torproject.collector.conf.Configuration; +import org.torproject.collector.conf.ConfigurationException;
public class Dummy extends CollecTorMain {
- public Dummy(Configuration c) { - super(c); - } + public Dummy(Configuration c) { + super(c); + }
- @Override - public void run() { + @Override + public void startProcessing() throws ConfigurationException { + // dummy doesn't do anything. + }
- } + @Override + public String module() { + return "dummy"; + } } diff --git a/src/test/java/org/torproject/collector/cron/SchedulerTest.java b/src/test/java/org/torproject/collector/cron/SchedulerTest.java index 0c4e922..4c01f91 100644 --- a/src/test/java/org/torproject/collector/cron/SchedulerTest.java +++ b/src/test/java/org/torproject/collector/cron/SchedulerTest.java @@ -30,8 +30,16 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
public class SchedulerTest {
- private static final String runConfigProperties = "TorperfActivated=true\n" - + "TorperfPeriodMinutes=10\nTorperfOffsetMinutes=7\n"; + private static final String runConfigProperties = + "TorperfActivated=true\nTorperfPeriodMinutes=1\nTorperfOffsetMinutes=0\n" + + "RelaydescsActivated=true\nRelaydescsPeriodMinutes=1" + + "\nRelaydescsOffsetMinutes=0\n" + + "ExitlistsActivated=true\nExitlistsPeriodMinutes=1\n" + + "ExitlistsOffsetMinutes=0\n" + + "UpdateindexActivated=true\nUpdateindexPeriodMinutes=1\n" + + "UpdateindexOffsetMinutes=0\n" + + "BridgedescsActivated=true\nBridgedescsPeriodMinutes=1\n" + + "BridgedescsOffsetMinutes=0\n";
@Test() public void testSimpleSchedule() throws Exception { @@ -39,15 +47,17 @@ public class SchedulerTest { Configuration conf = new Configuration(); conf.load(new ByteArrayInputStream(runConfigProperties.getBytes())); ctms.put(Key.TorperfActivated, Dummy.class); + ctms.put(Key.BridgedescsActivated, Dummy.class); + ctms.put(Key.RelaydescsActivated, Dummy.class); + ctms.put(Key.ExitlistsActivated, Dummy.class); + ctms.put(Key.UpdateindexActivated, Dummy.class); Field schedulerField = Scheduler.class.getDeclaredField("scheduler"); schedulerField.setAccessible(true); ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) schedulerField.get(Scheduler.getInstance()); assertTrue(stpe.getQueue().isEmpty()); Scheduler.getInstance().scheduleModuleRuns(ctms, conf); - assertEquals(stpe.getQueue().size(), 1); Scheduler.getInstance().shutdownScheduler(); - assertTrue(stpe.isShutdown()); }
}