commit 95bba76f5adb9defa2b9ae23762b0ff1f0bb737e
Author: iwakeh <iwakeh(a)torproject.org>
Date: Wed Aug 31 11:52:38 2016 +0200
Implements task-19895 run-once functionality.
CollecTorMain implements Callable and scheduler waits for termination
of all started Callables.
---
.../torproject/collector/cron/CollecTorMain.java | 11 ++++-
.../org/torproject/collector/cron/Scheduler.java | 57 +++++++++++++---------
2 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/src/main/java/org/torproject/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/collector/cron/CollecTorMain.java
index 26c9671..c7d0c7e 100644
--- a/src/main/java/org/torproject/collector/cron/CollecTorMain.java
+++ b/src/main/java/org/torproject/collector/cron/CollecTorMain.java
@@ -14,9 +14,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Observable;
import java.util.Observer;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
-public abstract class CollecTorMain implements Observer, Runnable {
+public abstract class CollecTorMain implements Callable<Object>, Observer,
+ Runnable {
private static final Logger logger = LoggerFactory.getLogger(
CollecTorMain.class);
@@ -58,6 +60,13 @@ public abstract class CollecTorMain implements Observer, Runnable {
logger.info("Terminating {} module of CollecTor.", module());
}
+ /** Calls <code>run</code> and returns <code>null</code>. */
+ @Override
+ public final Object call() {
+ run();
+ return null;
+ }
+
@Override
public synchronized void update(Observable obs, Object obj) {
newConfigAvailable.set(true);
diff --git a/src/main/java/org/torproject/collector/cron/Scheduler.java b/src/main/java/org/torproject/collector/cron/Scheduler.java
index 78789f4..12fe1bb 100644
--- a/src/main/java/org/torproject/collector/cron/Scheduler.java
+++ b/src/main/java/org/torproject/collector/cron/Scheduler.java
@@ -11,8 +11,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
@@ -51,6 +53,7 @@ public final class Scheduler implements ThreadFactory {
*/
public void scheduleModuleRuns(Map<Key,
Class<? extends CollecTorMain>> collecTorMains, Configuration conf) {
+ List<Callable<Object>> runOnceMains = new ArrayList<>();
for (Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry
: collecTorMains.entrySet()) {
try {
@@ -58,9 +61,15 @@ public final class Scheduler implements ThreadFactory {
String prefix = ctmEntry.getKey().name().replace(ACTIVATED, "");
CollecTorMain ctm = ctmEntry.getValue()
.getConstructor(Configuration.class).newInstance(conf);
- scheduleExecutions(conf.getBool(Key.RunOnce), ctm,
- conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
- conf.getInt(Key.valueOf(prefix + PERIODMIN)));
+ if (conf.getBool(Key.RunOnce)) {
+ logger.info("Prepare single run for " + ctm.getClass().getName()
+ + ".");
+ runOnceMains.add(Executors.callable(ctm));
+ } else {
+ scheduleExecutions(ctm,
+ conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
+ conf.getInt(Key.valueOf(prefix + PERIODMIN)));
+ }
}
} catch (ConfigurationException | IllegalAccessException
| InstantiationException | InvocationTargetException
@@ -70,30 +79,32 @@ public final class Scheduler implements ThreadFactory {
+ ". Reason: " + ex.getMessage(), ex);
}
}
+ try {
+ if (conf.getBool(Key.RunOnce)) {
+ scheduler.invokeAll(runOnceMains);
+ }
+ } catch (ConfigurationException | InterruptedException
+ | RejectedExecutionException | NullPointerException ex) {
+ logger.error("Cannot schedule run-once: " + ex.getMessage(), ex);
+ }
}
private static final long MILLIS_IN_A_MINUTE = 60_000L;
- private void scheduleExecutions(boolean runOnce, CollecTorMain ctm,
- int offset, int period) {
- if (runOnce) {
- logger.info("Single run for " + ctm.getClass().getName() + ".");
- this.scheduler.execute(ctm);
- } else {
- logger.info("Periodic updater started for " + ctm.getClass().getName()
- + "; offset=" + offset + ", period=" + period + ".");
- long periodMillis = period * MILLIS_IN_A_MINUTE;
- long initialDelayMillis = computeInitialDelayMillis(
- System.currentTimeMillis(), offset * MILLIS_IN_A_MINUTE, periodMillis);
-
- /* Run after initialDelay delay and then every period min. */
- logger.info("Periodic updater will first run in {} and then every {} "
- + "minutes.", initialDelayMillis < MILLIS_IN_A_MINUTE
- ? "under 1 minute"
- : (initialDelayMillis / MILLIS_IN_A_MINUTE) + " minute(s)", period);
- this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis,
- TimeUnit.MILLISECONDS);
- }
+ private void scheduleExecutions(CollecTorMain ctm, int offset, int period) {
+ logger.info("Periodic updater started for " + ctm.getClass().getName()
+ + "; offset=" + offset + ", period=" + period + ".");
+ long periodMillis = period * MILLIS_IN_A_MINUTE;
+ long initialDelayMillis = computeInitialDelayMillis(
+ System.currentTimeMillis(), offset * MILLIS_IN_A_MINUTE, periodMillis);
+
+ /* First run after initialDelayMillis, then repeat every periodMillis. */
+ logger.info("Periodic updater will first run in {} and then every {} "
+ + "minutes.", initialDelayMillis < MILLIS_IN_A_MINUTE
+ ? "under 1 minute"
+ : (initialDelayMillis / MILLIS_IN_A_MINUTE) + " minute(s)", period);
+ this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis,
+ TimeUnit.MILLISECONDS);
 }
protected static long computeInitialDelayMillis(long currentMillis,