commit 95bba76f5adb9defa2b9ae23762b0ff1f0bb737e Author: iwakeh iwakeh@torproject.org Date: Wed Aug 31 11:52:38 2016 +0200
Implements task-19895 run-once functionality.
CollecTorMain implements Callable, and the scheduler waits for termination of all started Callables. --- .../torproject/collector/cron/CollecTorMain.java | 11 ++++- .../org/torproject/collector/cron/Scheduler.java | 57 +++++++++++++--------- 2 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/src/main/java/org/torproject/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/collector/cron/CollecTorMain.java index 26c9671..c7d0c7e 100644 --- a/src/main/java/org/torproject/collector/cron/CollecTorMain.java +++ b/src/main/java/org/torproject/collector/cron/CollecTorMain.java @@ -14,9 +14,11 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Observable; import java.util.Observer; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean;
-public abstract class CollecTorMain implements Observer, Runnable { +public abstract class CollecTorMain implements Callable<Object>, Observer, + Runnable {
private static final Logger logger = LoggerFactory.getLogger( CollecTorMain.class); @@ -58,6 +60,13 @@ public abstract class CollecTorMain implements Observer, Runnable { logger.info("Terminating {} module of CollecTor.", module()); }
+ /** Wrapper for <code>run</code>. */ + @Override + public final Object call() { + run(); + return null; + } + @Override public synchronized void update(Observable obs, Object obj) { newConfigAvailable.set(true); diff --git a/src/main/java/org/torproject/collector/cron/Scheduler.java b/src/main/java/org/torproject/collector/cron/Scheduler.java index 78789f4..12fe1bb 100644 --- a/src/main/java/org/torproject/collector/cron/Scheduler.java +++ b/src/main/java/org/torproject/collector/cron/Scheduler.java @@ -11,8 +11,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; @@ -51,6 +53,7 @@ public final class Scheduler implements ThreadFactory { */ public void scheduleModuleRuns(Map<Key, Class<? extends CollecTorMain>> collecTorMains, Configuration conf) { + List<Callable<Object>> runOnceMains = new ArrayList<>(); for (Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry : collecTorMains.entrySet()) { try { @@ -58,9 +61,15 @@ public final class Scheduler implements ThreadFactory { String prefix = ctmEntry.getKey().name().replace(ACTIVATED, ""); CollecTorMain ctm = ctmEntry.getValue() .getConstructor(Configuration.class).newInstance(conf); - scheduleExecutions(conf.getBool(Key.RunOnce), ctm, - conf.getInt(Key.valueOf(prefix + OFFSETMIN)), - conf.getInt(Key.valueOf(prefix + PERIODMIN))); + if (conf.getBool(Key.RunOnce)) { + logger.info("Prepare single run for " + ctm.getClass().getName() + + "."); + runOnceMains.add(Executors.callable(ctm)); + } else { + scheduleExecutions(ctm, + conf.getInt(Key.valueOf(prefix + OFFSETMIN)), + conf.getInt(Key.valueOf(prefix + PERIODMIN))); + } } } catch (ConfigurationException | IllegalAccessException | InstantiationException | InvocationTargetException @@ -70,30 +79,32 @@ public final class Scheduler implements ThreadFactory { + ". Reason: " + ex.getMessage(), ex); } } + try { + if (conf.getBool(Key.RunOnce)) { + scheduler.invokeAll(runOnceMains); + } + } catch (ConfigurationException | InterruptedException + | RejectedExecutionException | NullPointerException ex) { + logger.error("Cannot schedule run-once: " + ex.getMessage(), ex); + } }
private static final long MILLIS_IN_A_MINUTE = 60_000L;
- private void scheduleExecutions(boolean runOnce, CollecTorMain ctm, - int offset, int period) { - if (runOnce) { - logger.info("Single run for " + ctm.getClass().getName() + "."); - this.scheduler.execute(ctm); - } else { - logger.info("Periodic updater started for " + ctm.getClass().getName() - + "; offset=" + offset + ", period=" + period + "."); - long periodMillis = period * MILLIS_IN_A_MINUTE; - long initialDelayMillis = computeInitialDelayMillis( - System.currentTimeMillis(), offset * MILLIS_IN_A_MINUTE, periodMillis); - - /* Run after initialDelay delay and then every period min. */ - logger.info("Periodic updater will first run in {} and then every {} " - + "minutes.", initialDelayMillis < MILLIS_IN_A_MINUTE - ? "under 1 minute" - : (initialDelayMillis / MILLIS_IN_A_MINUTE) + " minute(s)", period); - this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis, - TimeUnit.MILLISECONDS); - } + private void scheduleExecutions(CollecTorMain ctm, int offset, int period) { + logger.info("Periodic updater started for " + ctm.getClass().getName() + + "; offset=" + offset + ", period=" + period + "."); + long periodMillis = period * MILLIS_IN_A_MINUTE; + long initialDelayMillis = computeInitialDelayMillis( + System.currentTimeMillis(), offset * MILLIS_IN_A_MINUTE, periodMillis); + + /* Run after initialDelay delay and then every period min. */ + logger.info("Periodic updater will first run in {} and then every {} " + + "minutes.", initialDelayMillis < MILLIS_IN_A_MINUTE + ? "under 1 minute" + : (initialDelayMillis / MILLIS_IN_A_MINUTE) + " minute(s)", period); + this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis, + TimeUnit.MILLISECONDS); }
protected static long computeInitialDelayMillis(long currentMillis,
tor-commits@lists.torproject.org