[tor-commits] [collector/master] Implements task-19895 run-once functionality.

karsten at torproject.org karsten at torproject.org
Fri Sep 2 14:38:06 UTC 2016


commit 95bba76f5adb9defa2b9ae23762b0ff1f0bb737e
Author: iwakeh <iwakeh at torproject.org>
Date:   Wed Aug 31 11:52:38 2016 +0200

    Implements task-19895 run-once functionality.
    
    CollecTorMain implements Callable and scheduler waits for termination
    of all started Callables.
---
 .../torproject/collector/cron/CollecTorMain.java   | 11 ++++-
 .../org/torproject/collector/cron/Scheduler.java   | 57 +++++++++++++---------
 2 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/src/main/java/org/torproject/collector/cron/CollecTorMain.java b/src/main/java/org/torproject/collector/cron/CollecTorMain.java
index 26c9671..c7d0c7e 100644
--- a/src/main/java/org/torproject/collector/cron/CollecTorMain.java
+++ b/src/main/java/org/torproject/collector/cron/CollecTorMain.java
@@ -14,9 +14,11 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Observable;
 import java.util.Observer;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public abstract class CollecTorMain implements Observer, Runnable {
+public abstract class CollecTorMain implements Callable<Object>, Observer,
+    Runnable {
 
   private static final Logger logger = LoggerFactory.getLogger(
       CollecTorMain.class);
@@ -58,6 +60,13 @@ public abstract class CollecTorMain implements Observer, Runnable {
     logger.info("Terminating {} module of CollecTor.", module());
   }
 
+  /** Wrapper for <code>run</code>. */
+  @Override
+  public final Object call() {
+    run();
+    return null;
+  }
+
   @Override
   public synchronized void update(Observable obs, Object obj) {
     newConfigAvailable.set(true);
diff --git a/src/main/java/org/torproject/collector/cron/Scheduler.java b/src/main/java/org/torproject/collector/cron/Scheduler.java
index 78789f4..12fe1bb 100644
--- a/src/main/java/org/torproject/collector/cron/Scheduler.java
+++ b/src/main/java/org/torproject/collector/cron/Scheduler.java
@@ -11,8 +11,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -51,6 +53,7 @@ public final class Scheduler implements ThreadFactory {
    */
   public void scheduleModuleRuns(Map<Key,
       Class<? extends CollecTorMain>> collecTorMains, Configuration conf) {
+    List<Callable<Object>> runOnceMains = new ArrayList<>();
     for (Map.Entry<Key, Class<? extends CollecTorMain>> ctmEntry
         : collecTorMains.entrySet()) {
       try {
@@ -58,9 +61,15 @@ public final class Scheduler implements ThreadFactory {
           String prefix = ctmEntry.getKey().name().replace(ACTIVATED, "");
           CollecTorMain ctm = ctmEntry.getValue()
               .getConstructor(Configuration.class).newInstance(conf);
-          scheduleExecutions(conf.getBool(Key.RunOnce), ctm,
-              conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
-              conf.getInt(Key.valueOf(prefix + PERIODMIN)));
+          if (conf.getBool(Key.RunOnce)) {
+            logger.info("Prepare single run for " + ctm.getClass().getName()
+                + ".");
+            runOnceMains.add(Executors.callable(ctm));
+          } else {
+            scheduleExecutions(ctm,
+                conf.getInt(Key.valueOf(prefix + OFFSETMIN)),
+                conf.getInt(Key.valueOf(prefix + PERIODMIN)));
+          }
         }
       } catch (ConfigurationException | IllegalAccessException
           | InstantiationException | InvocationTargetException
@@ -70,30 +79,32 @@ public final class Scheduler implements ThreadFactory {
             + ". Reason: " + ex.getMessage(), ex);
       }
     }
+    try {
+      if (conf.getBool(Key.RunOnce)) {
+        scheduler.invokeAll(runOnceMains);
+      }
+    } catch (ConfigurationException | InterruptedException
+        | RejectedExecutionException | NullPointerException ex) {
+      logger.error("Cannot schedule run-once: " + ex.getMessage(), ex);
+    }
   }
 
   private static final long MILLIS_IN_A_MINUTE = 60_000L;
 
-  private void scheduleExecutions(boolean runOnce, CollecTorMain ctm,
-      int offset, int period) {
-    if (runOnce) {
-      logger.info("Single run for " + ctm.getClass().getName() + ".");
-      this.scheduler.execute(ctm);
-    } else {
-      logger.info("Periodic updater started for " + ctm.getClass().getName()
-          + "; offset=" + offset + ", period=" + period + ".");
-      long periodMillis = period * MILLIS_IN_A_MINUTE;
-      long initialDelayMillis = computeInitialDelayMillis(
-          System.currentTimeMillis(), offset * MILLIS_IN_A_MINUTE, periodMillis);
-
-      /* Run after initialDelay delay and then every period min. */
-      logger.info("Periodic updater will first run in {} and then every {} "
-          + "minutes.", initialDelayMillis < MILLIS_IN_A_MINUTE
-          ? "under 1 minute"
-          : (initialDelayMillis / MILLIS_IN_A_MINUTE) + " minute(s)", period);
-      this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis,
-          TimeUnit.MILLISECONDS);
-    }
+  private void scheduleExecutions(CollecTorMain ctm, int offset, int period) {
+    logger.info("Periodic updater started for " + ctm.getClass().getName()
+        + "; offset=" + offset + ", period=" + period + ".");
+    long periodMillis = period * MILLIS_IN_A_MINUTE;
+    long initialDelayMillis = computeInitialDelayMillis(
+        System.currentTimeMillis(), offset * MILLIS_IN_A_MINUTE, periodMillis);
+
+    /* Run after initialDelay delay and then every period min. */
+    logger.info("Periodic updater will first run in {} and then every {} "
+        + "minutes.", initialDelayMillis < MILLIS_IN_A_MINUTE
+        ? "under 1 minute"
+        : (initialDelayMillis / MILLIS_IN_A_MINUTE) + " minute(s)", period);
+    this.scheduler.scheduleAtFixedRate(ctm, initialDelayMillis, periodMillis,
+        TimeUnit.MILLISECONDS);
   }
 
   protected static long computeInitialDelayMillis(long currentMillis,



More information about the tor-commits mailing list