[tor-commits] [onionoo/master] Split uptime data writer into two classes.

karsten at torproject.org karsten at torproject.org
Fri Apr 11 07:38:01 UTC 2014


commit c2117e85a31bf38bb01bb7eeaff34b210786e386
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Apr 8 20:42:18 2014 +0200

    Split uptime data writer into two classes.
---
 src/org/torproject/onionoo/Main.java               |   20 +-
 src/org/torproject/onionoo/UptimeDataWriter.java   |  469 --------------------
 .../torproject/onionoo/UptimeDocumentWriter.java   |  299 +++++++++++++
 .../torproject/onionoo/UptimeStatusUpdater.java    |  200 +++++++++
 4 files changed, 514 insertions(+), 474 deletions(-)

diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java
index abae5cf..1a5d083 100644
--- a/src/org/torproject/onionoo/Main.java
+++ b/src/org/torproject/onionoo/Main.java
@@ -38,9 +38,12 @@ public class Main {
     Logger.printStatusTime("Initialized weights data writer");
     ClientsDataWriter cdw = new ClientsDataWriter(dso, ds, t);
     Logger.printStatusTime("Initialized clients data writer");
-    UptimeDataWriter udw = new UptimeDataWriter(dso, ds, t);
-    Logger.printStatusTime("Initialized uptime data writer");
-    StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, cdw, udw };
+    UptimeStatusUpdater usu = new UptimeStatusUpdater(dso, ds);
+    Logger.printStatusTime("Initialized uptime status updater");
+    StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, cdw, usu };
+
+    UptimeDocumentWriter udw = new UptimeDocumentWriter(dso, ds, t);
+    Logger.printStatusTime("Initialized uptime document writer");
     DocumentWriter[] dws = new DocumentWriter[] { ndw, bdw, wdw, cdw,
         udw };
 
@@ -86,8 +89,15 @@ public class Main {
             statsString);
       }
     }
-    /* TODO Print status updater statistics once all data writers have
-     * been separated. */
+    /* TODO Print status updater statistics for *all* status updaters once
+     * all data writers have been separated. */
+    for (DocumentWriter dw : new DocumentWriter[] { udw }) {
+      String statsString = dw.getStatsString();
+      if (statsString != null) {
+        Logger.printStatistics(dw.getClass().getSimpleName(),
+            statsString);
+      }
+    }
     Logger.printStatistics("Descriptor source", dso.getStatsString());
     Logger.printStatistics("Document store", ds.getStatsString());
     Logger.printStatistics("GeoIP lookup service", ls.getStatsString());
diff --git a/src/org/torproject/onionoo/UptimeDataWriter.java b/src/org/torproject/onionoo/UptimeDataWriter.java
deleted file mode 100644
index a2b2d29..0000000
--- a/src/org/torproject/onionoo/UptimeDataWriter.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/* Copyright 2014 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.onionoo;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.torproject.descriptor.BridgeNetworkStatus;
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.NetworkStatusEntry;
-import org.torproject.descriptor.RelayNetworkStatusConsensus;
-
-public class UptimeDataWriter implements DescriptorListener,
-    StatusUpdater, FingerprintListener, DocumentWriter {
-
-  private DescriptorSource descriptorSource;
-
-  private DocumentStore documentStore;
-
-  private long now;
-
-  public UptimeDataWriter(DescriptorSource descriptorSource,
-      DocumentStore documentStore, Time time) {
-    this.descriptorSource = descriptorSource;
-    this.documentStore = documentStore;
-    this.now = time.currentTimeMillis();
-    this.registerDescriptorListeners();
-    this.registerFingerprintListeners();
-  }
-
-  private void registerDescriptorListeners() {
-    this.descriptorSource.registerDescriptorListener(this,
-        DescriptorType.RELAY_CONSENSUSES);
-    this.descriptorSource.registerDescriptorListener(this,
-        DescriptorType.BRIDGE_STATUSES);
-  }
-
-  public void registerFingerprintListeners() {
-    this.descriptorSource.registerFingerprintListener(this,
-        DescriptorType.RELAY_CONSENSUSES);
-    this.descriptorSource.registerFingerprintListener(this,
-        DescriptorType.BRIDGE_STATUSES);
-  }
-
-  public void processDescriptor(Descriptor descriptor, boolean relay) {
-    if (descriptor instanceof RelayNetworkStatusConsensus) {
-      this.processRelayNetworkStatusConsensus(
-          (RelayNetworkStatusConsensus) descriptor);
-    } else if (descriptor instanceof BridgeNetworkStatus) {
-      this.processBridgeNetworkStatus(
-          (BridgeNetworkStatus) descriptor);
-    }
-  }
-
-  private SortedSet<Long> newRelayStatuses = new TreeSet<Long>(),
-      newBridgeStatuses = new TreeSet<Long>();
-  private SortedMap<String, SortedSet<Long>>
-      newRunningRelays = new TreeMap<String, SortedSet<Long>>(),
-      newRunningBridges = new TreeMap<String, SortedSet<Long>>();
-
-  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L;
-
-  private void processRelayNetworkStatusConsensus(
-      RelayNetworkStatusConsensus consensus) {
-    SortedSet<String> fingerprints = new TreeSet<String>();
-    for (NetworkStatusEntry entry :
-        consensus.getStatusEntries().values()) {
-      if (entry.getFlags().contains("Running")) {
-        fingerprints.add(entry.getFingerprint());
-      }
-    }
-    if (!fingerprints.isEmpty()) {
-      long dateHourMillis = (consensus.getValidAfterMillis()
-          / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS;
-      for (String fingerprint : fingerprints) {
-        if (!this.newRunningRelays.containsKey(fingerprint)) {
-          this.newRunningRelays.put(fingerprint, new TreeSet<Long>());
-        }
-        this.newRunningRelays.get(fingerprint).add(dateHourMillis);
-      }
-      this.newRelayStatuses.add(dateHourMillis);
-    }
-  }
-
-  private void processBridgeNetworkStatus(BridgeNetworkStatus status) {
-    SortedSet<String> fingerprints = new TreeSet<String>();
-    for (NetworkStatusEntry entry :
-        status.getStatusEntries().values()) {
-      if (entry.getFlags().contains("Running")) {
-        fingerprints.add(entry.getFingerprint());
-      }
-    }
-    if (!fingerprints.isEmpty()) {
-      long dateHourMillis = (status.getPublishedMillis()
-          / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS;
-      for (String fingerprint : fingerprints) {
-        if (!this.newRunningBridges.containsKey(fingerprint)) {
-          this.newRunningBridges.put(fingerprint, new TreeSet<Long>());
-        }
-        this.newRunningBridges.get(fingerprint).add(dateHourMillis);
-      }
-      this.newBridgeStatuses.add(dateHourMillis);
-    }
-  }
-
-  public void updateStatuses() {
-    for (Map.Entry<String, SortedSet<Long>> e :
-        this.newRunningRelays.entrySet()) {
-      this.updateStatus(true, e.getKey(), e.getValue());
-    }
-    this.updateStatus(true, null, this.newRelayStatuses);
-    for (Map.Entry<String, SortedSet<Long>> e :
-        this.newRunningBridges.entrySet()) {
-      this.updateStatus(false, e.getKey(), e.getValue());
-    }
-    this.updateStatus(false, null, this.newBridgeStatuses);
-    Logger.printStatusTime("Updated uptime status files");
-  }
-
-  private void updateStatus(boolean relay, String fingerprint,
-      SortedSet<Long> newUptimeHours) {
-    UptimeStatus uptimeStatus = this.readHistory(fingerprint);
-    if (uptimeStatus == null) {
-      uptimeStatus = new UptimeStatus();
-    }
-    this.addToHistory(uptimeStatus, relay, newUptimeHours);
-    this.compressHistory(uptimeStatus);
-    this.writeHistory(fingerprint, uptimeStatus);
-  }
-
-  private UptimeStatus readHistory(String fingerprint) {
-    return fingerprint == null ?
-        documentStore.retrieve(UptimeStatus.class, true) :
-        documentStore.retrieve(UptimeStatus.class, true, fingerprint);
-  }
-
-  private void addToHistory(UptimeStatus uptimeStatus, boolean relay,
-      SortedSet<Long> newIntervals) {
-    SortedSet<UptimeHistory> history = uptimeStatus.history;
-    for (long startMillis : newIntervals) {
-      UptimeHistory interval = new UptimeHistory(relay, startMillis, 1);
-      if (!history.headSet(interval).isEmpty()) {
-        UptimeHistory prev = history.headSet(interval).last();
-        if (prev.relay == interval.relay &&
-            prev.startMillis + ONE_HOUR_MILLIS * prev.uptimeHours >
-            interval.startMillis) {
-          continue;
-        }
-      }
-      if (!history.tailSet(interval).isEmpty()) {
-        UptimeHistory next = history.tailSet(interval).first();
-        if (next.relay == interval.relay &&
-            next.startMillis < interval.startMillis + ONE_HOUR_MILLIS) {
-          continue;
-        }
-      }
-      history.add(interval);
-    }
-  }
-
-  private void compressHistory(UptimeStatus uptimeStatus) {
-    SortedSet<UptimeHistory> history = uptimeStatus.history;
-    SortedSet<UptimeHistory> compressedHistory =
-        new TreeSet<UptimeHistory>();
-    UptimeHistory lastInterval = null;
-    for (UptimeHistory interval : history) {
-      if (lastInterval != null &&
-          lastInterval.startMillis + ONE_HOUR_MILLIS
-          * lastInterval.uptimeHours == interval.startMillis &&
-          lastInterval.relay == interval.relay) {
-        lastInterval.addUptime(interval);
-      } else {
-        if (lastInterval != null) {
-          compressedHistory.add(lastInterval);
-        }
-        lastInterval = interval;
-      }
-    }
-    if (lastInterval != null) {
-      compressedHistory.add(lastInterval);
-    }
-    uptimeStatus.history = compressedHistory;
-  }
-
-  private void writeHistory(String fingerprint,
-      UptimeStatus uptimeStatus) {
-    if (fingerprint == null) {
-      this.documentStore.store(uptimeStatus);
-    } else {
-      this.documentStore.store(uptimeStatus, fingerprint);
-    }
-  }
-
-  private SortedSet<String> newRelayFingerprints = new TreeSet<String>(),
-      newBridgeFingerprints = new TreeSet<String>();
-
-  public void processFingerprints(SortedSet<String> fingerprints,
-      boolean relay) {
-    if (relay) {
-      this.newRelayFingerprints.addAll(fingerprints);
-    } else {
-      this.newBridgeFingerprints.addAll(fingerprints);
-    }
-  }
-
-  public void writeDocuments() {
-    SortedSet<UptimeHistory>
-        knownRelayStatuses = new TreeSet<UptimeHistory>(),
-        knownBridgeStatuses = new TreeSet<UptimeHistory>();
-    UptimeStatus uptimeStatus = this.documentStore.retrieve(
-        UptimeStatus.class, true);
-    if (uptimeStatus == null) {
-      return;
-    }
-    SortedSet<UptimeHistory> knownStatuses = uptimeStatus.history;
-    for (UptimeHistory status : knownStatuses) {
-      if (status.relay) {
-        knownRelayStatuses.add(status);
-      } else {
-        knownBridgeStatuses.add(status);
-      }
-    }
-    for (String fingerprint : this.newRelayFingerprints) {
-      this.updateDocument(true, fingerprint, knownRelayStatuses);
-    }
-    for (String fingerprint : this.newBridgeFingerprints) {
-      this.updateDocument(false, fingerprint, knownBridgeStatuses);
-    }
-    Logger.printStatusTime("Wrote uptime document files");
-  }
-
-  private void updateDocument(boolean relay, String fingerprint,
-      SortedSet<UptimeHistory> knownStatuses) {
-    UptimeStatus uptimeStatus = this.documentStore.retrieve(
-        UptimeStatus.class, true, fingerprint);
-    if (uptimeStatus != null) {
-      SortedSet<UptimeHistory> history = uptimeStatus.history;
-      UptimeDocument uptimeDocument = new UptimeDocument();
-      uptimeDocument.documentString = this.formatHistoryString(relay,
-          fingerprint, history, knownStatuses);
-      this.documentStore.store(uptimeDocument, fingerprint);
-    }
-  }
-
-  private String[] graphNames = new String[] {
-      "1_week",
-      "1_month",
-      "3_months",
-      "1_year",
-      "5_years" };
-
-  private long[] graphIntervals = new long[] {
-      7L * 24L * 60L * 60L * 1000L,
-      31L * 24L * 60L * 60L * 1000L,
-      92L * 24L * 60L * 60L * 1000L,
-      366L * 24L * 60L * 60L * 1000L,
-      5L * 366L * 24L * 60L * 60L * 1000L };
-
-  private long[] dataPointIntervals = new long[] {
-      60L * 60L * 1000L,
-      4L * 60L * 60L * 1000L,
-      12L * 60L * 60L * 1000L,
-      2L * 24L * 60L * 60L * 1000L,
-      10L * 24L * 60L * 60L * 1000L };
-
-  private String formatHistoryString(boolean relay, String fingerprint,
-      SortedSet<UptimeHistory> history,
-      SortedSet<UptimeHistory> knownStatuses) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("{\"fingerprint\":\"" + fingerprint + "\"");
-    sb.append(",\n\"uptime\":{");
-    int graphIntervalsWritten = 0;
-    for (int graphIntervalIndex = 0; graphIntervalIndex <
-        this.graphIntervals.length; graphIntervalIndex++) {
-      String timeline = this.formatTimeline(graphIntervalIndex, relay,
-          history, knownStatuses);
-      if (timeline != null) {
-        sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n"
-            + timeline);
-      }
-    }
-    sb.append("}");
-    sb.append("\n}\n");
-    return sb.toString();
-  }
-
-  private String formatTimeline(int graphIntervalIndex, boolean relay,
-      SortedSet<UptimeHistory> history,
-      SortedSet<UptimeHistory> knownStatuses) {
-    String graphName = this.graphNames[graphIntervalIndex];
-    long graphInterval = this.graphIntervals[graphIntervalIndex];
-    long dataPointInterval =
-        this.dataPointIntervals[graphIntervalIndex];
-    int dataPointIntervalHours = (int) (dataPointInterval
-        / ONE_HOUR_MILLIS);
-    List<Integer> statusDataPoints = new ArrayList<Integer>();
-    long intervalStartMillis = ((this.now - graphInterval)
-        / dataPointInterval) * dataPointInterval;
-    int statusHours = 0;
-    for (UptimeHistory hist : knownStatuses) {
-      if (hist.relay != relay) {
-        continue;
-      }
-      long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS
-          * hist.uptimeHours;
-      if (histEndMillis < intervalStartMillis) {
-        continue;
-      }
-      while (hist.startMillis >= intervalStartMillis
-          + dataPointInterval) {
-        statusDataPoints.add(statusHours * 5 > dataPointIntervalHours
-            ? statusHours : -1);
-        statusHours = 0;
-        intervalStartMillis += dataPointInterval;
-      }
-      while (histEndMillis >= intervalStartMillis + dataPointInterval) {
-        statusHours += (int) ((intervalStartMillis + dataPointInterval
-            - Math.max(hist.startMillis, intervalStartMillis))
-            / ONE_HOUR_MILLIS);
-        statusDataPoints.add(statusHours * 5 > dataPointIntervalHours
-            ? statusHours : -1);
-        statusHours = 0;
-        intervalStartMillis += dataPointInterval;
-      }
-      statusHours += (int) ((histEndMillis - Math.max(hist.startMillis,
-          intervalStartMillis)) / ONE_HOUR_MILLIS);
-    }
-    statusDataPoints.add(statusHours * 5 > dataPointIntervalHours
-        ? statusHours : -1);
-    List<Integer> uptimeDataPoints = new ArrayList<Integer>();
-    intervalStartMillis = ((this.now - graphInterval)
-        / dataPointInterval) * dataPointInterval;
-    int uptimeHours = 0;
-    long firstStatusStartMillis = -1L;
-    for (UptimeHistory hist : history) {
-      if (hist.relay != relay) {
-        continue;
-      }
-      if (firstStatusStartMillis < 0L) {
-        firstStatusStartMillis = hist.startMillis;
-      }
-      long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS
-          * hist.uptimeHours;
-      if (histEndMillis < intervalStartMillis) {
-        continue;
-      }
-      while (hist.startMillis >= intervalStartMillis
-          + dataPointInterval) {
-        if (firstStatusStartMillis < intervalStartMillis
-            + dataPointInterval) {
-          uptimeDataPoints.add(uptimeHours);
-        } else {
-          uptimeDataPoints.add(-1);
-        }
-        uptimeHours = 0;
-        intervalStartMillis += dataPointInterval;
-      }
-      while (histEndMillis >= intervalStartMillis + dataPointInterval) {
-        uptimeHours += (int) ((intervalStartMillis + dataPointInterval
-            - Math.max(hist.startMillis, intervalStartMillis))
-            / ONE_HOUR_MILLIS);
-        uptimeDataPoints.add(uptimeHours);
-        uptimeHours = 0;
-        intervalStartMillis += dataPointInterval;
-      }
-      uptimeHours += (int) ((histEndMillis - Math.max(hist.startMillis,
-          intervalStartMillis)) / ONE_HOUR_MILLIS);
-    }
-    uptimeDataPoints.add(uptimeHours);
-    List<Double> dataPoints = new ArrayList<Double>();
-    for (int dataPointIndex = 0; dataPointIndex < statusDataPoints.size();
-        dataPointIndex++) {
-      if (dataPointIndex >= uptimeDataPoints.size()) {
-        dataPoints.add(0.0);
-      } else if (uptimeDataPoints.get(dataPointIndex) >= 0 &&
-          statusDataPoints.get(dataPointIndex) > 0) {
-        dataPoints.add(((double) uptimeDataPoints.get(dataPointIndex))
-            / ((double) statusDataPoints.get(dataPointIndex)));
-      } else {
-        dataPoints.add(-1.0);
-      }
-    }
-    int firstNonNullIndex = -1, lastNonNullIndex = -1;
-    for (int dataPointIndex = 0; dataPointIndex < dataPoints.size();
-        dataPointIndex++) {
-      double dataPoint = dataPoints.get(dataPointIndex);
-      if (dataPoint >= 0.0) {
-        if (firstNonNullIndex < 0) {
-          firstNonNullIndex = dataPointIndex;
-        }
-        lastNonNullIndex = dataPointIndex;
-      }
-    }
-    if (firstNonNullIndex < 0) {
-      return null;
-    }
-    long firstDataPointMillis = (((this.now - graphInterval)
-        / dataPointInterval) + firstNonNullIndex)
-        * dataPointInterval + dataPointInterval / 2L;
-    if (graphIntervalIndex > 0 && firstDataPointMillis >=
-        this.now - graphIntervals[graphIntervalIndex - 1]) {
-      /* Skip uptime history object, because it doesn't contain
-       * anything new that wasn't already contained in the last
-       * uptime history object(s). */
-      return null;
-    }
-    long lastDataPointMillis = firstDataPointMillis
-        + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval;
-    double factor = 1.0 / 999.0;
-    int count = lastNonNullIndex - firstNonNullIndex + 1;
-    StringBuilder sb = new StringBuilder();
-    SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
-        "yyyy-MM-dd HH:mm:ss");
-    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-    sb.append("\"" + graphName + "\":{"
-        + "\"first\":\"" + dateTimeFormat.format(firstDataPointMillis)
-        + "\",\"last\":\"" + dateTimeFormat.format(lastDataPointMillis)
-        + "\",\"interval\":" + String.valueOf(dataPointInterval / 1000L)
-        + ",\"factor\":" + String.format(Locale.US, "%.9f", factor)
-        + ",\"count\":" + String.valueOf(count) + ",\"values\":[");
-    int dataPointsWritten = 0, previousNonNullIndex = -2;
-    boolean foundTwoAdjacentDataPoints = false;
-    for (int dataPointIndex = firstNonNullIndex; dataPointIndex <=
-        lastNonNullIndex; dataPointIndex++) {
-      double dataPoint = dataPoints.get(dataPointIndex);
-      if (dataPoint >= 0.0) {
-        if (dataPointIndex - previousNonNullIndex == 1) {
-          foundTwoAdjacentDataPoints = true;
-        }
-        previousNonNullIndex = dataPointIndex;
-      }
-      sb.append((dataPointsWritten++ > 0 ? "," : "")
-          + (dataPoint < -0.5 ? "null" :
-          String.valueOf((long) (dataPoint * 999.0))));
-    }
-    sb.append("]}");
-    if (foundTwoAdjacentDataPoints) {
-      return sb.toString();
-    } else {
-      return null;
-    }
-  }
-
-  public String getStatsString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("    " + Logger.formatDecimalNumber(
-            this.newRelayStatuses.size()) + " hours of relay uptimes "
-            + "processed\n");
-    sb.append("    " + Logger.formatDecimalNumber(
-        this.newBridgeStatuses.size()) + " hours of bridge uptimes "
-        + "processed\n");
-    sb.append("    " + Logger.formatDecimalNumber(
-        this.newRunningRelays.size() + this.newRunningBridges.size())
-        + " uptime status files updated\n");
-    sb.append("    " + Logger.formatDecimalNumber(
-        this.newRunningRelays.size() + this.newRunningBridges.size())
-        + " uptime document files updated\n");
-    return sb.toString();
-  }
-}
-
diff --git a/src/org/torproject/onionoo/UptimeDocumentWriter.java b/src/org/torproject/onionoo/UptimeDocumentWriter.java
new file mode 100644
index 0000000..5b03153
--- /dev/null
+++ b/src/org/torproject/onionoo/UptimeDocumentWriter.java
@@ -0,0 +1,299 @@
+/* Copyright 2014 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.onionoo;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeSet;
+
+public class UptimeDocumentWriter implements FingerprintListener,
+    DocumentWriter {
+
+  private DescriptorSource descriptorSource;
+
+  private DocumentStore documentStore;
+
+  private long now;
+
+  public UptimeDocumentWriter(DescriptorSource descriptorSource,
+      DocumentStore documentStore, Time time) {
+    this.descriptorSource = descriptorSource;
+    this.documentStore = documentStore;
+    this.now = time.currentTimeMillis();
+    this.registerFingerprintListeners();
+  }
+
+  public void registerFingerprintListeners() {
+    this.descriptorSource.registerFingerprintListener(this,
+        DescriptorType.RELAY_CONSENSUSES);
+    this.descriptorSource.registerFingerprintListener(this,
+        DescriptorType.BRIDGE_STATUSES);
+  }
+
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L;
+
+  private SortedSet<String> newRelayFingerprints = new TreeSet<String>(),
+      newBridgeFingerprints = new TreeSet<String>();
+
+  public void processFingerprints(SortedSet<String> fingerprints,
+      boolean relay) {
+    if (relay) {
+      this.newRelayFingerprints.addAll(fingerprints);
+    } else {
+      this.newBridgeFingerprints.addAll(fingerprints);
+    }
+  }
+
+  public void writeDocuments() {
+    SortedSet<UptimeHistory>
+        knownRelayStatuses = new TreeSet<UptimeHistory>(),
+        knownBridgeStatuses = new TreeSet<UptimeHistory>();
+    UptimeStatus uptimeStatus = this.documentStore.retrieve(
+        UptimeStatus.class, true);
+    if (uptimeStatus == null) {
+      return;
+    }
+    SortedSet<UptimeHistory> knownStatuses = uptimeStatus.history;
+    for (UptimeHistory status : knownStatuses) {
+      if (status.relay) {
+        knownRelayStatuses.add(status);
+      } else {
+        knownBridgeStatuses.add(status);
+      }
+    }
+    for (String fingerprint : this.newRelayFingerprints) {
+      this.updateDocument(true, fingerprint, knownRelayStatuses);
+    }
+    for (String fingerprint : this.newBridgeFingerprints) {
+      this.updateDocument(false, fingerprint, knownBridgeStatuses);
+    }
+    Logger.printStatusTime("Wrote uptime document files");
+  }
+
+  private int writtenDocuments = 0;
+
+  private void updateDocument(boolean relay, String fingerprint,
+      SortedSet<UptimeHistory> knownStatuses) {
+    UptimeStatus uptimeStatus = this.documentStore.retrieve(
+        UptimeStatus.class, true, fingerprint);
+    if (uptimeStatus != null) {
+      SortedSet<UptimeHistory> history = uptimeStatus.history;
+      UptimeDocument uptimeDocument = new UptimeDocument();
+      uptimeDocument.documentString = this.formatHistoryString(relay,
+          fingerprint, history, knownStatuses);
+      this.documentStore.store(uptimeDocument, fingerprint);
+      this.writtenDocuments++;
+    }
+  }
+
+  private String[] graphNames = new String[] {
+      "1_week",
+      "1_month",
+      "3_months",
+      "1_year",
+      "5_years" };
+
+  private long[] graphIntervals = new long[] {
+      7L * 24L * 60L * 60L * 1000L,
+      31L * 24L * 60L * 60L * 1000L,
+      92L * 24L * 60L * 60L * 1000L,
+      366L * 24L * 60L * 60L * 1000L,
+      5L * 366L * 24L * 60L * 60L * 1000L };
+
+  private long[] dataPointIntervals = new long[] {
+      60L * 60L * 1000L,
+      4L * 60L * 60L * 1000L,
+      12L * 60L * 60L * 1000L,
+      2L * 24L * 60L * 60L * 1000L,
+      10L * 24L * 60L * 60L * 1000L };
+
+  private String formatHistoryString(boolean relay, String fingerprint,
+      SortedSet<UptimeHistory> history,
+      SortedSet<UptimeHistory> knownStatuses) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{\"fingerprint\":\"" + fingerprint + "\"");
+    sb.append(",\n\"uptime\":{");
+    int graphIntervalsWritten = 0;
+    for (int graphIntervalIndex = 0; graphIntervalIndex <
+        this.graphIntervals.length; graphIntervalIndex++) {
+      String timeline = this.formatTimeline(graphIntervalIndex, relay,
+          history, knownStatuses);
+      if (timeline != null) {
+        sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n"
+            + timeline);
+      }
+    }
+    sb.append("}");
+    sb.append("\n}\n");
+    return sb.toString();
+  }
+
+  private String formatTimeline(int graphIntervalIndex, boolean relay,
+      SortedSet<UptimeHistory> history,
+      SortedSet<UptimeHistory> knownStatuses) {
+    String graphName = this.graphNames[graphIntervalIndex];
+    long graphInterval = this.graphIntervals[graphIntervalIndex];
+    long dataPointInterval =
+        this.dataPointIntervals[graphIntervalIndex];
+    int dataPointIntervalHours = (int) (dataPointInterval
+        / ONE_HOUR_MILLIS);
+    List<Integer> statusDataPoints = new ArrayList<Integer>();
+    long intervalStartMillis = ((this.now - graphInterval)
+        / dataPointInterval) * dataPointInterval;
+    int statusHours = 0;
+    for (UptimeHistory hist : knownStatuses) {
+      if (hist.relay != relay) {
+        continue;
+      }
+      long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS
+          * hist.uptimeHours;
+      if (histEndMillis < intervalStartMillis) {
+        continue;
+      }
+      while (hist.startMillis >= intervalStartMillis
+          + dataPointInterval) {
+        statusDataPoints.add(statusHours * 5 > dataPointIntervalHours
+            ? statusHours : -1);
+        statusHours = 0;
+        intervalStartMillis += dataPointInterval;
+      }
+      while (histEndMillis >= intervalStartMillis + dataPointInterval) {
+        statusHours += (int) ((intervalStartMillis + dataPointInterval
+            - Math.max(hist.startMillis, intervalStartMillis))
+            / ONE_HOUR_MILLIS);
+        statusDataPoints.add(statusHours * 5 > dataPointIntervalHours
+            ? statusHours : -1);
+        statusHours = 0;
+        intervalStartMillis += dataPointInterval;
+      }
+      statusHours += (int) ((histEndMillis - Math.max(hist.startMillis,
+          intervalStartMillis)) / ONE_HOUR_MILLIS);
+    }
+    statusDataPoints.add(statusHours * 5 > dataPointIntervalHours
+        ? statusHours : -1);
+    List<Integer> uptimeDataPoints = new ArrayList<Integer>();
+    intervalStartMillis = ((this.now - graphInterval)
+        / dataPointInterval) * dataPointInterval;
+    int uptimeHours = 0;
+    long firstStatusStartMillis = -1L;
+    for (UptimeHistory hist : history) {
+      if (hist.relay != relay) {
+        continue;
+      }
+      if (firstStatusStartMillis < 0L) {
+        firstStatusStartMillis = hist.startMillis;
+      }
+      long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS
+          * hist.uptimeHours;
+      if (histEndMillis < intervalStartMillis) {
+        continue;
+      }
+      while (hist.startMillis >= intervalStartMillis
+          + dataPointInterval) {
+        if (firstStatusStartMillis < intervalStartMillis
+            + dataPointInterval) {
+          uptimeDataPoints.add(uptimeHours);
+        } else {
+          uptimeDataPoints.add(-1);
+        }
+        uptimeHours = 0;
+        intervalStartMillis += dataPointInterval;
+      }
+      while (histEndMillis >= intervalStartMillis + dataPointInterval) {
+        uptimeHours += (int) ((intervalStartMillis + dataPointInterval
+            - Math.max(hist.startMillis, intervalStartMillis))
+            / ONE_HOUR_MILLIS);
+        uptimeDataPoints.add(uptimeHours);
+        uptimeHours = 0;
+        intervalStartMillis += dataPointInterval;
+      }
+      uptimeHours += (int) ((histEndMillis - Math.max(hist.startMillis,
+          intervalStartMillis)) / ONE_HOUR_MILLIS);
+    }
+    uptimeDataPoints.add(uptimeHours);
+    List<Double> dataPoints = new ArrayList<Double>();
+    for (int dataPointIndex = 0; dataPointIndex < statusDataPoints.size();
+        dataPointIndex++) {
+      if (dataPointIndex >= uptimeDataPoints.size()) {
+        dataPoints.add(0.0);
+      } else if (uptimeDataPoints.get(dataPointIndex) >= 0 &&
+          statusDataPoints.get(dataPointIndex) > 0) {
+        dataPoints.add(((double) uptimeDataPoints.get(dataPointIndex))
+            / ((double) statusDataPoints.get(dataPointIndex)));
+      } else {
+        dataPoints.add(-1.0);
+      }
+    }
+    int firstNonNullIndex = -1, lastNonNullIndex = -1;
+    for (int dataPointIndex = 0; dataPointIndex < dataPoints.size();
+        dataPointIndex++) {
+      double dataPoint = dataPoints.get(dataPointIndex);
+      if (dataPoint >= 0.0) {
+        if (firstNonNullIndex < 0) {
+          firstNonNullIndex = dataPointIndex;
+        }
+        lastNonNullIndex = dataPointIndex;
+      }
+    }
+    if (firstNonNullIndex < 0) {
+      return null;
+    }
+    long firstDataPointMillis = (((this.now - graphInterval)
+        / dataPointInterval) + firstNonNullIndex)
+        * dataPointInterval + dataPointInterval / 2L;
+    if (graphIntervalIndex > 0 && firstDataPointMillis >=
+        this.now - graphIntervals[graphIntervalIndex - 1]) {
+      /* Skip uptime history object, because it doesn't contain
+       * anything new that wasn't already contained in the last
+       * uptime history object(s). */
+      return null;
+    }
+    long lastDataPointMillis = firstDataPointMillis
+        + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval;
+    double factor = 1.0 / 999.0;
+    int count = lastNonNullIndex - firstNonNullIndex + 1;
+    StringBuilder sb = new StringBuilder();
+    SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
+        "yyyy-MM-dd HH:mm:ss");
+    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    sb.append("\"" + graphName + "\":{"
+        + "\"first\":\"" + dateTimeFormat.format(firstDataPointMillis)
+        + "\",\"last\":\"" + dateTimeFormat.format(lastDataPointMillis)
+        + "\",\"interval\":" + String.valueOf(dataPointInterval / 1000L)
+        + ",\"factor\":" + String.format(Locale.US, "%.9f", factor)
+        + ",\"count\":" + String.valueOf(count) + ",\"values\":[");
+    int dataPointsWritten = 0, previousNonNullIndex = -2;
+    boolean foundTwoAdjacentDataPoints = false;
+    for (int dataPointIndex = firstNonNullIndex; dataPointIndex <=
+        lastNonNullIndex; dataPointIndex++) {
+      double dataPoint = dataPoints.get(dataPointIndex);
+      if (dataPoint >= 0.0) {
+        if (dataPointIndex - previousNonNullIndex == 1) {
+          foundTwoAdjacentDataPoints = true;
+        }
+        previousNonNullIndex = dataPointIndex;
+      }
+      sb.append((dataPointsWritten++ > 0 ? "," : "")
+          + (dataPoint < -0.5 ? "null" :
+          String.valueOf((long) (dataPoint * 999.0))));
+    }
+    sb.append("]}");
+    if (foundTwoAdjacentDataPoints) {
+      return sb.toString();
+    } else {
+      return null;
+    }
+  }
+
+  public String getStatsString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("    " + Logger.formatDecimalNumber(this.writtenDocuments)
+        + " uptime document files written\n");
+    return sb.toString();
+  }
+}
+
diff --git a/src/org/torproject/onionoo/UptimeStatusUpdater.java b/src/org/torproject/onionoo/UptimeStatusUpdater.java
new file mode 100644
index 0000000..30ab703
--- /dev/null
+++ b/src/org/torproject/onionoo/UptimeStatusUpdater.java
@@ -0,0 +1,200 @@
+/* Copyright 2014 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.onionoo;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.torproject.descriptor.BridgeNetworkStatus;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+
+public class UptimeStatusUpdater implements DescriptorListener,
+    StatusUpdater {
+
+  private DescriptorSource descriptorSource;
+
+  private DocumentStore documentStore;
+
+  public UptimeStatusUpdater(DescriptorSource descriptorSource,
+      DocumentStore documentStore) {
+    this.descriptorSource = descriptorSource;
+    this.documentStore = documentStore;
+    this.registerDescriptorListeners();
+  }
+
+  private void registerDescriptorListeners() {
+    this.descriptorSource.registerDescriptorListener(this,
+        DescriptorType.RELAY_CONSENSUSES);
+    this.descriptorSource.registerDescriptorListener(this,
+        DescriptorType.BRIDGE_STATUSES);
+  }
+
+  public void processDescriptor(Descriptor descriptor, boolean relay) {
+    if (descriptor instanceof RelayNetworkStatusConsensus) {
+      this.processRelayNetworkStatusConsensus(
+          (RelayNetworkStatusConsensus) descriptor);
+    } else if (descriptor instanceof BridgeNetworkStatus) {
+      this.processBridgeNetworkStatus(
+          (BridgeNetworkStatus) descriptor);
+    }
+  }
+
+  private SortedSet<Long> newRelayStatuses = new TreeSet<Long>(),
+      newBridgeStatuses = new TreeSet<Long>();
+  private SortedMap<String, SortedSet<Long>>
+      newRunningRelays = new TreeMap<String, SortedSet<Long>>(),
+      newRunningBridges = new TreeMap<String, SortedSet<Long>>();
+
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L;
+
+  private void processRelayNetworkStatusConsensus(
+      RelayNetworkStatusConsensus consensus) {
+    SortedSet<String> fingerprints = new TreeSet<String>();
+    for (NetworkStatusEntry entry :
+        consensus.getStatusEntries().values()) {
+      if (entry.getFlags().contains("Running")) {
+        fingerprints.add(entry.getFingerprint());
+      }
+    }
+    if (!fingerprints.isEmpty()) {
+      long dateHourMillis = (consensus.getValidAfterMillis()
+          / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS;
+      for (String fingerprint : fingerprints) {
+        if (!this.newRunningRelays.containsKey(fingerprint)) {
+          this.newRunningRelays.put(fingerprint, new TreeSet<Long>());
+        }
+        this.newRunningRelays.get(fingerprint).add(dateHourMillis);
+      }
+      this.newRelayStatuses.add(dateHourMillis);
+    }
+  }
+
+  private void processBridgeNetworkStatus(BridgeNetworkStatus status) {
+    SortedSet<String> fingerprints = new TreeSet<String>();
+    for (NetworkStatusEntry entry :
+        status.getStatusEntries().values()) {
+      if (entry.getFlags().contains("Running")) {
+        fingerprints.add(entry.getFingerprint());
+      }
+    }
+    if (!fingerprints.isEmpty()) {
+      long dateHourMillis = (status.getPublishedMillis()
+          / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS;
+      for (String fingerprint : fingerprints) {
+        if (!this.newRunningBridges.containsKey(fingerprint)) {
+          this.newRunningBridges.put(fingerprint, new TreeSet<Long>());
+        }
+        this.newRunningBridges.get(fingerprint).add(dateHourMillis);
+      }
+      this.newBridgeStatuses.add(dateHourMillis);
+    }
+  }
+
+  public void updateStatuses() {
+    for (Map.Entry<String, SortedSet<Long>> e :
+        this.newRunningRelays.entrySet()) {
+      this.updateStatus(true, e.getKey(), e.getValue());
+    }
+    this.updateStatus(true, null, this.newRelayStatuses);
+    for (Map.Entry<String, SortedSet<Long>> e :
+        this.newRunningBridges.entrySet()) {
+      this.updateStatus(false, e.getKey(), e.getValue());
+    }
+    this.updateStatus(false, null, this.newBridgeStatuses);
+    Logger.printStatusTime("Updated uptime status files");
+  }
+
+  private void updateStatus(boolean relay, String fingerprint,
+      SortedSet<Long> newUptimeHours) {
+    UptimeStatus uptimeStatus = this.readHistory(fingerprint);
+    if (uptimeStatus == null) {
+      uptimeStatus = new UptimeStatus();
+    }
+    this.addToHistory(uptimeStatus, relay, newUptimeHours);
+    this.compressHistory(uptimeStatus);
+    this.writeHistory(fingerprint, uptimeStatus);
+  }
+
+  private UptimeStatus readHistory(String fingerprint) {
+    return fingerprint == null ?
+        documentStore.retrieve(UptimeStatus.class, true) :
+        documentStore.retrieve(UptimeStatus.class, true, fingerprint);
+  }
+
+  private void addToHistory(UptimeStatus uptimeStatus, boolean relay,
+      SortedSet<Long> newIntervals) {
+    SortedSet<UptimeHistory> history = uptimeStatus.history;
+    for (long startMillis : newIntervals) {
+      UptimeHistory interval = new UptimeHistory(relay, startMillis, 1);
+      if (!history.headSet(interval).isEmpty()) {
+        UptimeHistory prev = history.headSet(interval).last();
+        if (prev.relay == interval.relay &&
+            prev.startMillis + ONE_HOUR_MILLIS * prev.uptimeHours >
+            interval.startMillis) {
+          continue;
+        }
+      }
+      if (!history.tailSet(interval).isEmpty()) {
+        UptimeHistory next = history.tailSet(interval).first();
+        if (next.relay == interval.relay &&
+            next.startMillis < interval.startMillis + ONE_HOUR_MILLIS) {
+          continue;
+        }
+      }
+      history.add(interval);
+    }
+  }
+
+  private void compressHistory(UptimeStatus uptimeStatus) {
+    SortedSet<UptimeHistory> history = uptimeStatus.history;
+    SortedSet<UptimeHistory> compressedHistory =
+        new TreeSet<UptimeHistory>();
+    UptimeHistory lastInterval = null;
+    for (UptimeHistory interval : history) {
+      if (lastInterval != null &&
+          lastInterval.startMillis + ONE_HOUR_MILLIS
+          * lastInterval.uptimeHours == interval.startMillis &&
+          lastInterval.relay == interval.relay) {
+        lastInterval.addUptime(interval);
+      } else {
+        if (lastInterval != null) {
+          compressedHistory.add(lastInterval);
+        }
+        lastInterval = interval;
+      }
+    }
+    if (lastInterval != null) {
+      compressedHistory.add(lastInterval);
+    }
+    uptimeStatus.history = compressedHistory;
+  }
+
+  private void writeHistory(String fingerprint,
+      UptimeStatus uptimeStatus) {
+    if (fingerprint == null) {
+      this.documentStore.store(uptimeStatus);
+    } else {
+      this.documentStore.store(uptimeStatus, fingerprint);
+    }
+  }
+
+  public String getStatsString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("    " + Logger.formatDecimalNumber(
+            this.newRelayStatuses.size()) + " hours of relay uptimes "
+            + "processed\n");
+    sb.append("    " + Logger.formatDecimalNumber(
+        this.newBridgeStatuses.size()) + " hours of bridge uptimes "
+        + "processed\n");
+    sb.append("    " + Logger.formatDecimalNumber(
+        this.newRunningRelays.size() + this.newRunningBridges.size())
+        + " uptime status files updated\n");
+    return sb.toString();
+  }
+}
+






More information about the tor-commits mailing list