[tor-commits] [onionoo/master] Split clients data writer into two classes.

karsten at torproject.org karsten at torproject.org
Fri Apr 11 07:38:01 UTC 2014


commit 864c148564a6ec3be0d979b126b8ca88f01c4912
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Apr 8 21:13:02 2014 +0200

    Split clients data writer into two classes.
---
 src/org/torproject/onionoo/ClientsDataWriter.java  |  486 --------------------
 .../torproject/onionoo/ClientsDocumentWriter.java  |  302 ++++++++++++
 .../torproject/onionoo/ClientsStatusUpdater.java   |  230 +++++++++
 src/org/torproject/onionoo/Main.java               |   10 +-
 4 files changed, 538 insertions(+), 490 deletions(-)

diff --git a/src/org/torproject/onionoo/ClientsDataWriter.java b/src/org/torproject/onionoo/ClientsDataWriter.java
deleted file mode 100644
index 7662f54..0000000
--- a/src/org/torproject/onionoo/ClientsDataWriter.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/* Copyright 2014 The Tor Project
- * See LICENSE for licensing information */
-package org.torproject.onionoo;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TimeZone;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.ExtraInfoDescriptor;
-
-/*
- * Example extra-info descriptor used as input:
- *
- * extra-info ndnop2 DE6397A047ABE5F78B4C87AF725047831B221AAB
- * dirreq-stats-end 2014-02-16 16:42:11 (86400 s)
- * dirreq-v3-resp ok=856,not-enough-sigs=0,unavailable=0,not-found=0,
- *   not-modified=40,busy=0
- * bridge-stats-end 2014-02-16 16:42:17 (86400 s)
- * bridge-ips ??=8,in=8,se=8
- * bridge-ip-versions v4=8,v6=0
- *
- * Clients status file produced as intermediate output:
- *
- * 2014-02-15 16:42:11 2014-02-16 00:00:00
- *   259.042 in=86.347,se=86.347  v4=259.042
- * 2014-02-16 00:00:00 2014-02-16 16:42:11
- *   592.958 in=197.653,se=197.653  v4=592.958
- *
- * Clients document file produced as output:
- *
- * "1_month":{
- *   "first":"2014-02-03 12:00:00",
- *   "last":"2014-02-28 12:00:00",
- *   "interval":86400,
- *   "factor":0.139049349,
- *   "count":26,
- *   "values":[371,354,349,374,432,null,485,458,493,536,null,null,524,576,
- *             607,622,null,635,null,566,774,999,945,690,656,681],
- *   "countries":{"cn":0.0192,"in":0.1768,"ir":0.2487,"ru":0.0104,
- *                "se":0.1698,"sy":0.0325,"us":0.0406},
- *   "transports":{"obfs2":0.4581},
- *   "versions":{"v4":1.0000}}
- */
-public class ClientsDataWriter implements DescriptorListener,
-    StatusUpdater, FingerprintListener, DocumentWriter {
-
-  private DescriptorSource descriptorSource;
-
-  private DocumentStore documentStore;
-
-  private long now;
-
-  public ClientsDataWriter(DescriptorSource descriptorSource,
-      DocumentStore documentStore, Time time) {
-    this.descriptorSource = descriptorSource;
-    this.documentStore = documentStore;
-    this.now = time.currentTimeMillis();
-    this.registerDescriptorListeners();
-    this.registerFingerprintListeners();
-  }
-
-  private void registerDescriptorListeners() {
-    this.descriptorSource.registerDescriptorListener(this,
-        DescriptorType.BRIDGE_EXTRA_INFOS);
-  }
-
-  private void registerFingerprintListeners() {
-    this.descriptorSource.registerFingerprintListener(this,
-        DescriptorType.BRIDGE_EXTRA_INFOS);
-  }
-
-  public void processDescriptor(Descriptor descriptor, boolean relay) {
-    if (descriptor instanceof ExtraInfoDescriptor && !relay) {
-      this.processBridgeExtraInfoDescriptor(
-          (ExtraInfoDescriptor) descriptor);
-    }
-  }
-
-  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
-      ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
-
-  private SortedMap<String, SortedSet<ClientsHistory>> newResponses =
-      new TreeMap<String, SortedSet<ClientsHistory>>();
-
-  private void processBridgeExtraInfoDescriptor(
-      ExtraInfoDescriptor descriptor) {
-    long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
-    long dirreqStatsIntervalLengthMillis =
-        descriptor.getDirreqStatsIntervalLength() * 1000L;
-    SortedMap<String, Integer> responses = descriptor.getDirreqV3Resp();
-    if (dirreqStatsEndMillis < 0L ||
-        dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS ||
-        responses == null || !responses.containsKey("ok")) {
-      return;
-    }
-    double okResponses = (double) (responses.get("ok") - 4);
-    if (okResponses < 0.0) {
-      return;
-    }
-    String hashedFingerprint = descriptor.getFingerprint().toUpperCase();
-    long dirreqStatsStartMillis = dirreqStatsEndMillis
-        - dirreqStatsIntervalLengthMillis;
-    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
-        * ONE_DAY_MILLIS;
-    for (int i = 0; i < 2; i++) {
-      long startMillis = i == 0 ? dirreqStatsStartMillis : utcBreakMillis;
-      long endMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
-      if (startMillis >= endMillis) {
-        continue;
-      }
-      double totalResponses = okResponses
-          * ((double) (endMillis - startMillis))
-          / ((double) ONE_DAY_MILLIS);
-      SortedMap<String, Double> responsesByCountry =
-          this.weightResponsesWithUniqueIps(totalResponses,
-          descriptor.getBridgeIps(), "??");
-      SortedMap<String, Double> responsesByTransport =
-          this.weightResponsesWithUniqueIps(totalResponses,
-          descriptor.getBridgeIpTransports(), "<??>");
-      SortedMap<String, Double> responsesByVersion =
-          this.weightResponsesWithUniqueIps(totalResponses,
-          descriptor.getBridgeIpVersions(), "");
-      ClientsHistory newResponseHistory = new ClientsHistory(
-          startMillis, endMillis, totalResponses, responsesByCountry,
-          responsesByTransport, responsesByVersion); 
-      if (!this.newResponses.containsKey(hashedFingerprint)) {
-        this.newResponses.put(hashedFingerprint,
-            new TreeSet<ClientsHistory>());
-      }
-      this.newResponses.get(hashedFingerprint).add(
-          newResponseHistory);
-    }
-  }
-
-  private SortedMap<String, Double> weightResponsesWithUniqueIps(
-      double totalResponses, SortedMap<String, Integer> uniqueIps,
-      String omitString) {
-    SortedMap<String, Double> weightedResponses =
-        new TreeMap<String, Double>();
-    int totalUniqueIps = 0;
-    if (uniqueIps != null) {
-      for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
-        if (e.getValue() > 4) {
-          totalUniqueIps += e.getValue() - 4;
-        }
-      }
-    }
-    if (totalUniqueIps > 0) {
-      for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
-        if (!e.getKey().equals(omitString) && e.getValue() > 4) {
-          weightedResponses.put(e.getKey(),
-              (((double) (e.getValue() - 4)) * totalResponses)
-              / ((double) totalUniqueIps));
-        }
-      }
-    }
-    return weightedResponses;
-  }
-
-  public void updateStatuses() {
-    for (Map.Entry<String, SortedSet<ClientsHistory>> e :
-        this.newResponses.entrySet()) {
-      String hashedFingerprint = e.getKey();
-      ClientsStatus clientsStatus = this.documentStore.retrieve(
-          ClientsStatus.class, true, hashedFingerprint);
-      if (clientsStatus == null) {
-        clientsStatus = new ClientsStatus();
-      }
-      this.addToHistory(clientsStatus, e.getValue());
-      this.compressHistory(clientsStatus);
-      this.documentStore.store(clientsStatus, hashedFingerprint);
-    }
-    Logger.printStatusTime("Updated clients status files");
-  }
-
-  private void addToHistory(ClientsStatus clientsStatus,
-      SortedSet<ClientsHistory> newIntervals) {
-    SortedSet<ClientsHistory> history = clientsStatus.history;
-    for (ClientsHistory interval : newIntervals) {
-      if ((history.headSet(interval).isEmpty() ||
-          history.headSet(interval).last().endMillis <=
-          interval.startMillis) &&
-          (history.tailSet(interval).isEmpty() ||
-          history.tailSet(interval).first().startMillis >=
-          interval.endMillis)) {
-        history.add(interval);
-      }
-    }
-  }
-
-  private void compressHistory(ClientsStatus clientsStatus) {
-    SortedSet<ClientsHistory> history = clientsStatus.history;
-    SortedSet<ClientsHistory> compressedHistory =
-        new TreeSet<ClientsHistory>();
-    ClientsHistory lastResponses = null;
-    SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM");
-    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-    String lastMonthString = "1970-01";
-    for (ClientsHistory responses : history) {
-      long intervalLengthMillis;
-      if (this.now - responses.endMillis <=
-          92L * 24L * 60L * 60L * 1000L) {
-        intervalLengthMillis = 24L * 60L * 60L * 1000L;
-      } else if (this.now - responses.endMillis <=
-          366L * 24L * 60L * 60L * 1000L) {
-        intervalLengthMillis = 2L * 24L * 60L * 60L * 1000L;
-      } else {
-        intervalLengthMillis = 10L * 24L * 60L * 60L * 1000L;
-      }
-      String monthString = dateTimeFormat.format(responses.startMillis);
-      if (lastResponses != null &&
-          lastResponses.endMillis == responses.startMillis &&
-          ((lastResponses.endMillis - 1L) / intervalLengthMillis) ==
-          ((responses.endMillis - 1L) / intervalLengthMillis) &&
-          lastMonthString.equals(monthString)) {
-        lastResponses.addResponses(responses);
-      } else {
-        if (lastResponses != null) {
-          compressedHistory.add(lastResponses);
-        }
-        lastResponses = responses;
-      }
-      lastMonthString = monthString;
-    }
-    if (lastResponses != null) {
-      compressedHistory.add(lastResponses);
-    }
-    clientsStatus.history = compressedHistory;
-  }
-
-  public void processFingerprints(SortedSet<String> fingerprints,
-      boolean relay) {
-    if (!relay) {
-      this.updateDocuments.addAll(fingerprints);
-    }
-  }
-
-  private SortedSet<String> updateDocuments = new TreeSet<String>();
-
-  public void writeDocuments() {
-    for (String hashedFingerprint : this.updateDocuments) {
-      ClientsStatus clientsStatus = this.documentStore.retrieve(
-          ClientsStatus.class, true, hashedFingerprint);
-      if (clientsStatus == null) {
-        continue;
-      }
-      SortedSet<ClientsHistory> history = clientsStatus.history;
-      ClientsDocument clientsDocument = new ClientsDocument();
-      clientsDocument.documentString = this.formatHistoryString(
-          hashedFingerprint, history);
-      this.documentStore.store(clientsDocument, hashedFingerprint);
-    }
-    Logger.printStatusTime("Wrote clients document files");
-  }
-
-  private String[] graphNames = new String[] {
-      "1_week",
-      "1_month",
-      "3_months",
-      "1_year",
-      "5_years" };
-
-  private long[] graphIntervals = new long[] {
-      7L * 24L * 60L * 60L * 1000L,
-      31L * 24L * 60L * 60L * 1000L,
-      92L * 24L * 60L * 60L * 1000L,
-      366L * 24L * 60L * 60L * 1000L,
-      5L * 366L * 24L * 60L * 60L * 1000L };
-
-  private long[] dataPointIntervals = new long[] {
-      24L * 60L * 60L * 1000L,
-      24L * 60L * 60L * 1000L,
-      24L * 60L * 60L * 1000L,
-      2L * 24L * 60L * 60L * 1000L,
-      10L * 24L * 60L * 60L * 1000L };
-
-  private String formatHistoryString(String hashedFingerprint,
-      SortedSet<ClientsHistory> history) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("{\"fingerprint\":\"" + hashedFingerprint + "\"");
-    sb.append(",\n\"average_clients\":{");
-    int graphIntervalsWritten = 0;
-    for (int graphIntervalIndex = 0; graphIntervalIndex <
-        this.graphIntervals.length; graphIntervalIndex++) {
-      String timeline = this.formatTimeline(graphIntervalIndex, history);
-      if (timeline != null) {
-        sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n"
-            + timeline);
-      }
-    }
-    sb.append("}");
-    sb.append("\n}\n");
-    return sb.toString();
-  }
-
-  private String formatTimeline(int graphIntervalIndex,
-      SortedSet<ClientsHistory> history) {
-    String graphName = this.graphNames[graphIntervalIndex];
-    long graphInterval = this.graphIntervals[graphIntervalIndex];
-    long dataPointInterval =
-        this.dataPointIntervals[graphIntervalIndex];
-    List<Double> dataPoints = new ArrayList<Double>();
-    long intervalStartMillis = ((this.now - graphInterval)
-        / dataPointInterval) * dataPointInterval;
-    long millis = 0L;
-    double responses = 0.0, totalResponses = 0.0;
-    SortedMap<String, Double>
-        totalResponsesByCountry = new TreeMap<String, Double>(),
-        totalResponsesByTransport = new TreeMap<String, Double>(),
-        totalResponsesByVersion = new TreeMap<String, Double>();
-    for (ClientsHistory hist : history) {
-      if (hist.endMillis < intervalStartMillis) {
-        continue;
-      }
-      while ((intervalStartMillis / dataPointInterval) !=
-          (hist.endMillis / dataPointInterval)) {
-        dataPoints.add(millis * 2L < dataPointInterval
-            ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
-            / (((double) millis) * 10.0));
-        responses = 0.0;
-        millis = 0L;
-        intervalStartMillis += dataPointInterval;
-      }
-      responses += hist.totalResponses;
-      totalResponses += hist.totalResponses;
-      for (Map.Entry<String, Double> e :
-          hist.responsesByCountry.entrySet()) {
-        if (!totalResponsesByCountry.containsKey(e.getKey())) {
-          totalResponsesByCountry.put(e.getKey(), 0.0);
-        }
-        totalResponsesByCountry.put(e.getKey(), e.getValue()
-            + totalResponsesByCountry.get(e.getKey()));
-      }
-      for (Map.Entry<String, Double> e :
-          hist.responsesByTransport.entrySet()) {
-        if (!totalResponsesByTransport.containsKey(e.getKey())) {
-          totalResponsesByTransport.put(e.getKey(), 0.0);
-        }
-        totalResponsesByTransport.put(e.getKey(), e.getValue()
-            + totalResponsesByTransport.get(e.getKey()));
-      }
-      for (Map.Entry<String, Double> e :
-          hist.responsesByVersion.entrySet()) {
-        if (!totalResponsesByVersion.containsKey(e.getKey())) {
-          totalResponsesByVersion.put(e.getKey(), 0.0);
-        }
-        totalResponsesByVersion.put(e.getKey(), e.getValue()
-            + totalResponsesByVersion.get(e.getKey()));
-      }
-      millis += (hist.endMillis - hist.startMillis);
-    }
-    dataPoints.add(millis * 2L < dataPointInterval
-        ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
-        / (((double) millis) * 10.0));
-    double maxValue = 0.0;
-    int firstNonNullIndex = -1, lastNonNullIndex = -1;
-    for (int dataPointIndex = 0; dataPointIndex < dataPoints.size();
-        dataPointIndex++) {
-      double dataPoint = dataPoints.get(dataPointIndex);
-      if (dataPoint >= 0.0) {
-        if (firstNonNullIndex < 0) {
-          firstNonNullIndex = dataPointIndex;
-        }
-        lastNonNullIndex = dataPointIndex;
-        if (dataPoint > maxValue) {
-          maxValue = dataPoint;
-        }
-      }
-    }
-    if (firstNonNullIndex < 0) {
-      return null;
-    }
-    long firstDataPointMillis = (((this.now - graphInterval)
-        / dataPointInterval) + firstNonNullIndex) * dataPointInterval
-        + dataPointInterval / 2L;
-    if (graphIntervalIndex > 0 && firstDataPointMillis >=
-        this.now - graphIntervals[graphIntervalIndex - 1]) {
-      /* Skip clients history object, because it doesn't contain
-       * anything new that wasn't already contained in the last
-       * clients history object(s). */
-      return null;
-    }
-    long lastDataPointMillis = firstDataPointMillis
-        + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval;
-    double factor = ((double) maxValue) / 999.0;
-    int count = lastNonNullIndex - firstNonNullIndex + 1;
-    StringBuilder sb = new StringBuilder();
-    SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
-        "yyyy-MM-dd HH:mm:ss");
-    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-    sb.append("\"" + graphName + "\":{"
-        + "\"first\":\"" + dateTimeFormat.format(firstDataPointMillis)
-        + "\",\"last\":\"" + dateTimeFormat.format(lastDataPointMillis)
-        + "\",\"interval\":" + String.valueOf(dataPointInterval / 1000L)
-        + ",\"factor\":" + String.format(Locale.US, "%.9f", factor)
-        + ",\"count\":" + String.valueOf(count) + ",\"values\":[");
-    int dataPointsWritten = 0, previousNonNullIndex = -2;
-    boolean foundTwoAdjacentDataPoints = false;
-    for (int dataPointIndex = firstNonNullIndex; dataPointIndex <=
-        lastNonNullIndex; dataPointIndex++) {
-      double dataPoint = dataPoints.get(dataPointIndex);
-      if (dataPoint >= 0.0) {
-        if (dataPointIndex - previousNonNullIndex == 1) {
-          foundTwoAdjacentDataPoints = true;
-        }
-        previousNonNullIndex = dataPointIndex;
-      }
-      sb.append((dataPointsWritten++ > 0 ? "," : "")
-          + (dataPoint < 0.0 ? "null" :
-          String.valueOf((long) ((dataPoint * 999.0) / maxValue))));
-    }
-    sb.append("]");
-    if (!totalResponsesByCountry.isEmpty()) {
-      sb.append(",\"countries\":{");
-      int written = 0;
-      for (Map.Entry<String, Double> e :
-          totalResponsesByCountry.entrySet()) {
-        if (e.getValue() > totalResponses / 100.0) {
-          sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
-              + "\":" + String.format(Locale.US, "%.4f",
-              e.getValue() / totalResponses));
-        }
-      }
-      sb.append("}");
-    }
-    if (!totalResponsesByTransport.isEmpty()) {
-      sb.append(",\"transports\":{");
-      int written = 0;
-      for (Map.Entry<String, Double> e :
-          totalResponsesByTransport.entrySet()) {
-        if (e.getValue() > totalResponses / 100.0) {
-          sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
-              + "\":" + String.format(Locale.US, "%.4f",
-              e.getValue() / totalResponses));
-        }
-      }
-      sb.append("}");
-    }
-    if (!totalResponsesByVersion.isEmpty()) {
-      sb.append(",\"versions\":{");
-      int written = 0;
-      for (Map.Entry<String, Double> e :
-          totalResponsesByVersion.entrySet()) {
-        if (e.getValue() > totalResponses / 100.0) {
-          sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
-              + "\":" + String.format(Locale.US, "%.4f",
-              e.getValue() / totalResponses));
-        }
-      }
-      sb.append("}");
-    }
-    sb.append("}");
-    if (foundTwoAdjacentDataPoints) {
-      return sb.toString();
-    } else {
-      return null;
-    }
-  }
-
-  public String getStatsString() {
-    int newIntervals = 0;
-    for (SortedSet<ClientsHistory> hist : this.newResponses.values()) {
-      newIntervals += hist.size();
-    }
-    StringBuilder sb = new StringBuilder();
-    sb.append("    "
-        + Logger.formatDecimalNumber(newIntervals / 2)
-        + " client statistics processed from extra-info descriptors\n");
-    sb.append("    "
-        + Logger.formatDecimalNumber(this.newResponses.size())
-        + " client status files updated\n");
-    sb.append("    "
-        + Logger.formatDecimalNumber(this.updateDocuments.size())
-        + " client document files updated\n");
-    return sb.toString();
-  }
-}
-
diff --git a/src/org/torproject/onionoo/ClientsDocumentWriter.java b/src/org/torproject/onionoo/ClientsDocumentWriter.java
new file mode 100644
index 0000000..9d3b8dc
--- /dev/null
+++ b/src/org/torproject/onionoo/ClientsDocumentWriter.java
@@ -0,0 +1,302 @@
+/* Copyright 2014 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.onionoo;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+/*
+ * Clients status file produced as intermediate output:
+ *
+ * 2014-02-15 16:42:11 2014-02-16 00:00:00
+ *   259.042 in=86.347,se=86.347  v4=259.042
+ * 2014-02-16 00:00:00 2014-02-16 16:42:11
+ *   592.958 in=197.653,se=197.653  v4=592.958
+ *
+ * Clients document file produced as output:
+ *
+ * "1_month":{
+ *   "first":"2014-02-03 12:00:00",
+ *   "last":"2014-02-28 12:00:00",
+ *   "interval":86400,
+ *   "factor":0.139049349,
+ *   "count":26,
+ *   "values":[371,354,349,374,432,null,485,458,493,536,null,null,524,576,
+ *             607,622,null,635,null,566,774,999,945,690,656,681],
+ *   "countries":{"cn":0.0192,"in":0.1768,"ir":0.2487,"ru":0.0104,
+ *                "se":0.1698,"sy":0.0325,"us":0.0406},
+ *   "transports":{"obfs2":0.4581},
+ *   "versions":{"v4":1.0000}}
+ */
+public class ClientsDocumentWriter implements FingerprintListener,
+    DocumentWriter {
+
+  private DescriptorSource descriptorSource;
+
+  private DocumentStore documentStore;
+
+  private long now;
+
+  public ClientsDocumentWriter(DescriptorSource descriptorSource,
+      DocumentStore documentStore, Time time) {
+    this.descriptorSource = descriptorSource;
+    this.documentStore = documentStore;
+    this.now = time.currentTimeMillis();
+    this.registerFingerprintListeners();
+  }
+
+  private void registerFingerprintListeners() {
+    this.descriptorSource.registerFingerprintListener(this,
+        DescriptorType.BRIDGE_EXTRA_INFOS);
+  }
+
+  private SortedSet<String> updateDocuments = new TreeSet<String>();
+
+  public void processFingerprints(SortedSet<String> fingerprints,
+      boolean relay) {
+    if (!relay) {
+      this.updateDocuments.addAll(fingerprints);
+    }
+  }
+
+  private int writtenDocuments = 0;
+
+  public void writeDocuments() {
+    for (String hashedFingerprint : this.updateDocuments) {
+      ClientsStatus clientsStatus = this.documentStore.retrieve(
+          ClientsStatus.class, true, hashedFingerprint);
+      if (clientsStatus == null) {
+        continue;
+      }
+      SortedSet<ClientsHistory> history = clientsStatus.history;
+      ClientsDocument clientsDocument = new ClientsDocument();
+      clientsDocument.documentString = this.formatHistoryString(
+          hashedFingerprint, history);
+      this.documentStore.store(clientsDocument, hashedFingerprint);
+      this.writtenDocuments++;
+    }
+    Logger.printStatusTime("Wrote clients document files");
+  }
+
+  private String[] graphNames = new String[] {
+      "1_week",
+      "1_month",
+      "3_months",
+      "1_year",
+      "5_years" };
+
+  private long[] graphIntervals = new long[] {
+      7L * 24L * 60L * 60L * 1000L,
+      31L * 24L * 60L * 60L * 1000L,
+      92L * 24L * 60L * 60L * 1000L,
+      366L * 24L * 60L * 60L * 1000L,
+      5L * 366L * 24L * 60L * 60L * 1000L };
+
+  private long[] dataPointIntervals = new long[] {
+      24L * 60L * 60L * 1000L,
+      24L * 60L * 60L * 1000L,
+      24L * 60L * 60L * 1000L,
+      2L * 24L * 60L * 60L * 1000L,
+      10L * 24L * 60L * 60L * 1000L };
+
+  private String formatHistoryString(String hashedFingerprint,
+      SortedSet<ClientsHistory> history) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("{\"fingerprint\":\"" + hashedFingerprint + "\"");
+    sb.append(",\n\"average_clients\":{");
+    int graphIntervalsWritten = 0;
+    for (int graphIntervalIndex = 0; graphIntervalIndex <
+        this.graphIntervals.length; graphIntervalIndex++) {
+      String timeline = this.formatTimeline(graphIntervalIndex, history);
+      if (timeline != null) {
+        sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n"
+            + timeline);
+      }
+    }
+    sb.append("}");
+    sb.append("\n}\n");
+    return sb.toString();
+  }
+
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+      ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
+
+  private String formatTimeline(int graphIntervalIndex,
+      SortedSet<ClientsHistory> history) {
+    String graphName = this.graphNames[graphIntervalIndex];
+    long graphInterval = this.graphIntervals[graphIntervalIndex];
+    long dataPointInterval =
+        this.dataPointIntervals[graphIntervalIndex];
+    List<Double> dataPoints = new ArrayList<Double>();
+    long intervalStartMillis = ((this.now - graphInterval)
+        / dataPointInterval) * dataPointInterval;
+    long millis = 0L;
+    double responses = 0.0, totalResponses = 0.0;
+    SortedMap<String, Double>
+        totalResponsesByCountry = new TreeMap<String, Double>(),
+        totalResponsesByTransport = new TreeMap<String, Double>(),
+        totalResponsesByVersion = new TreeMap<String, Double>();
+    for (ClientsHistory hist : history) {
+      if (hist.endMillis < intervalStartMillis) {
+        continue;
+      }
+      while ((intervalStartMillis / dataPointInterval) !=
+          (hist.endMillis / dataPointInterval)) {
+        dataPoints.add(millis * 2L < dataPointInterval
+            ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
+            / (((double) millis) * 10.0));
+        responses = 0.0;
+        millis = 0L;
+        intervalStartMillis += dataPointInterval;
+      }
+      responses += hist.totalResponses;
+      totalResponses += hist.totalResponses;
+      for (Map.Entry<String, Double> e :
+          hist.responsesByCountry.entrySet()) {
+        if (!totalResponsesByCountry.containsKey(e.getKey())) {
+          totalResponsesByCountry.put(e.getKey(), 0.0);
+        }
+        totalResponsesByCountry.put(e.getKey(), e.getValue()
+            + totalResponsesByCountry.get(e.getKey()));
+      }
+      for (Map.Entry<String, Double> e :
+          hist.responsesByTransport.entrySet()) {
+        if (!totalResponsesByTransport.containsKey(e.getKey())) {
+          totalResponsesByTransport.put(e.getKey(), 0.0);
+        }
+        totalResponsesByTransport.put(e.getKey(), e.getValue()
+            + totalResponsesByTransport.get(e.getKey()));
+      }
+      for (Map.Entry<String, Double> e :
+          hist.responsesByVersion.entrySet()) {
+        if (!totalResponsesByVersion.containsKey(e.getKey())) {
+          totalResponsesByVersion.put(e.getKey(), 0.0);
+        }
+        totalResponsesByVersion.put(e.getKey(), e.getValue()
+            + totalResponsesByVersion.get(e.getKey()));
+      }
+      millis += (hist.endMillis - hist.startMillis);
+    }
+    dataPoints.add(millis * 2L < dataPointInterval
+        ? -1.0 : responses * ((double) ONE_DAY_MILLIS)
+        / (((double) millis) * 10.0));
+    double maxValue = 0.0;
+    int firstNonNullIndex = -1, lastNonNullIndex = -1;
+    for (int dataPointIndex = 0; dataPointIndex < dataPoints.size();
+        dataPointIndex++) {
+      double dataPoint = dataPoints.get(dataPointIndex);
+      if (dataPoint >= 0.0) {
+        if (firstNonNullIndex < 0) {
+          firstNonNullIndex = dataPointIndex;
+        }
+        lastNonNullIndex = dataPointIndex;
+        if (dataPoint > maxValue) {
+          maxValue = dataPoint;
+        }
+      }
+    }
+    if (firstNonNullIndex < 0) {
+      return null;
+    }
+    long firstDataPointMillis = (((this.now - graphInterval)
+        / dataPointInterval) + firstNonNullIndex) * dataPointInterval
+        + dataPointInterval / 2L;
+    if (graphIntervalIndex > 0 && firstDataPointMillis >=
+        this.now - graphIntervals[graphIntervalIndex - 1]) {
+      /* Skip clients history object, because it doesn't contain
+       * anything new that wasn't already contained in the last
+       * clients history object(s). */
+      return null;
+    }
+    long lastDataPointMillis = firstDataPointMillis
+        + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval;
+    double factor = ((double) maxValue) / 999.0;
+    int count = lastNonNullIndex - firstNonNullIndex + 1;
+    StringBuilder sb = new StringBuilder();
+    SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
+        "yyyy-MM-dd HH:mm:ss");
+    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    sb.append("\"" + graphName + "\":{"
+        + "\"first\":\"" + dateTimeFormat.format(firstDataPointMillis)
+        + "\",\"last\":\"" + dateTimeFormat.format(lastDataPointMillis)
+        + "\",\"interval\":" + String.valueOf(dataPointInterval / 1000L)
+        + ",\"factor\":" + String.format(Locale.US, "%.9f", factor)
+        + ",\"count\":" + String.valueOf(count) + ",\"values\":[");
+    int dataPointsWritten = 0, previousNonNullIndex = -2;
+    boolean foundTwoAdjacentDataPoints = false;
+    for (int dataPointIndex = firstNonNullIndex; dataPointIndex <=
+        lastNonNullIndex; dataPointIndex++) {
+      double dataPoint = dataPoints.get(dataPointIndex);
+      if (dataPoint >= 0.0) {
+        if (dataPointIndex - previousNonNullIndex == 1) {
+          foundTwoAdjacentDataPoints = true;
+        }
+        previousNonNullIndex = dataPointIndex;
+      }
+      sb.append((dataPointsWritten++ > 0 ? "," : "")
+          + (dataPoint < 0.0 ? "null" :
+          String.valueOf((long) ((dataPoint * 999.0) / maxValue))));
+    }
+    sb.append("]");
+    if (!totalResponsesByCountry.isEmpty()) {
+      sb.append(",\"countries\":{");
+      int written = 0;
+      for (Map.Entry<String, Double> e :
+          totalResponsesByCountry.entrySet()) {
+        if (e.getValue() > totalResponses / 100.0) {
+          sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
+              + "\":" + String.format(Locale.US, "%.4f",
+              e.getValue() / totalResponses));
+        }
+      }
+      sb.append("}");
+    }
+    if (!totalResponsesByTransport.isEmpty()) {
+      sb.append(",\"transports\":{");
+      int written = 0;
+      for (Map.Entry<String, Double> e :
+          totalResponsesByTransport.entrySet()) {
+        if (e.getValue() > totalResponses / 100.0) {
+          sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
+              + "\":" + String.format(Locale.US, "%.4f",
+              e.getValue() / totalResponses));
+        }
+      }
+      sb.append("}");
+    }
+    if (!totalResponsesByVersion.isEmpty()) {
+      sb.append(",\"versions\":{");
+      int written = 0;
+      for (Map.Entry<String, Double> e :
+          totalResponsesByVersion.entrySet()) {
+        if (e.getValue() > totalResponses / 100.0) {
+          sb.append((written++ > 0 ? "," : "") + "\"" + e.getKey()
+              + "\":" + String.format(Locale.US, "%.4f",
+              e.getValue() / totalResponses));
+        }
+      }
+      sb.append("}");
+    }
+    sb.append("}");
+    if (foundTwoAdjacentDataPoints) {
+      return sb.toString();
+    } else {
+      return null;
+    }
+  }
+
+  public String getStatsString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("    " + Logger.formatDecimalNumber(this.writtenDocuments)
+        + " clients document files updated\n");
+    return sb.toString();
+  }
+}
diff --git a/src/org/torproject/onionoo/ClientsStatusUpdater.java b/src/org/torproject/onionoo/ClientsStatusUpdater.java
new file mode 100644
index 0000000..e15c11a
--- /dev/null
+++ b/src/org/torproject/onionoo/ClientsStatusUpdater.java
@@ -0,0 +1,230 @@
+/* Copyright 2014 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.onionoo;
+
+import java.text.SimpleDateFormat;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+
+/*
+ * Example extra-info descriptor used as input:
+ *
+ * extra-info ndnop2 DE6397A047ABE5F78B4C87AF725047831B221AAB
+ * dirreq-stats-end 2014-02-16 16:42:11 (86400 s)
+ * dirreq-v3-resp ok=856,not-enough-sigs=0,unavailable=0,not-found=0,
+ *   not-modified=40,busy=0
+ * bridge-stats-end 2014-02-16 16:42:17 (86400 s)
+ * bridge-ips ??=8,in=8,se=8
+ * bridge-ip-versions v4=8,v6=0
+ *
+ * Clients status file produced as intermediate output:
+ *
+ * 2014-02-15 16:42:11 2014-02-16 00:00:00
+ *   259.042 in=86.347,se=86.347  v4=259.042
+ * 2014-02-16 00:00:00 2014-02-16 16:42:11
+ *   592.958 in=197.653,se=197.653  v4=592.958
+ */
+public class ClientsStatusUpdater implements DescriptorListener,
+    StatusUpdater {
+
+  private DescriptorSource descriptorSource;
+
+  private DocumentStore documentStore;
+
+  private long now;
+
+  public ClientsStatusUpdater(DescriptorSource descriptorSource,
+      DocumentStore documentStore, Time time) {
+    this.descriptorSource = descriptorSource;
+    this.documentStore = documentStore;
+    this.now = time.currentTimeMillis();
+    this.registerDescriptorListeners();
+  }
+
+  private void registerDescriptorListeners() {
+    this.descriptorSource.registerDescriptorListener(this,
+        DescriptorType.BRIDGE_EXTRA_INFOS);
+  }
+
+  public void processDescriptor(Descriptor descriptor, boolean relay) {
+    if (descriptor instanceof ExtraInfoDescriptor && !relay) {
+      this.processBridgeExtraInfoDescriptor(
+          (ExtraInfoDescriptor) descriptor);
+    }
+  }
+
+  private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+      ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS;
+
+  private SortedMap<String, SortedSet<ClientsHistory>> newResponses =
+      new TreeMap<String, SortedSet<ClientsHistory>>();
+
+  private void processBridgeExtraInfoDescriptor(
+      ExtraInfoDescriptor descriptor) {
+    long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+    long dirreqStatsIntervalLengthMillis =
+        descriptor.getDirreqStatsIntervalLength() * 1000L;
+    SortedMap<String, Integer> responses = descriptor.getDirreqV3Resp();
+    if (dirreqStatsEndMillis < 0L ||
+        dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS ||
+        responses == null || !responses.containsKey("ok")) {
+      return;
+    }
+    double okResponses = (double) (responses.get("ok") - 4);
+    if (okResponses < 0.0) {
+      return;
+    }
+    String hashedFingerprint = descriptor.getFingerprint().toUpperCase();
+    long dirreqStatsStartMillis = dirreqStatsEndMillis
+        - dirreqStatsIntervalLengthMillis;
+    long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+        * ONE_DAY_MILLIS;
+    for (int i = 0; i < 2; i++) {
+      long startMillis = i == 0 ? dirreqStatsStartMillis : utcBreakMillis;
+      long endMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+      if (startMillis >= endMillis) {
+        continue;
+      }
+      double totalResponses = okResponses
+          * ((double) (endMillis - startMillis))
+          / ((double) ONE_DAY_MILLIS);
+      SortedMap<String, Double> responsesByCountry =
+          this.weightResponsesWithUniqueIps(totalResponses,
+          descriptor.getBridgeIps(), "??");
+      SortedMap<String, Double> responsesByTransport =
+          this.weightResponsesWithUniqueIps(totalResponses,
+          descriptor.getBridgeIpTransports(), "<??>");
+      SortedMap<String, Double> responsesByVersion =
+          this.weightResponsesWithUniqueIps(totalResponses,
+          descriptor.getBridgeIpVersions(), "");
+      ClientsHistory newResponseHistory = new ClientsHistory(
+          startMillis, endMillis, totalResponses, responsesByCountry,
+          responsesByTransport, responsesByVersion); 
+      if (!this.newResponses.containsKey(hashedFingerprint)) {
+        this.newResponses.put(hashedFingerprint,
+            new TreeSet<ClientsHistory>());
+      }
+      this.newResponses.get(hashedFingerprint).add(
+          newResponseHistory);
+    }
+  }
+
+  private SortedMap<String, Double> weightResponsesWithUniqueIps(
+      double totalResponses, SortedMap<String, Integer> uniqueIps,
+      String omitString) {
+    SortedMap<String, Double> weightedResponses =
+        new TreeMap<String, Double>();
+    int totalUniqueIps = 0;
+    if (uniqueIps != null) {
+      for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
+        if (e.getValue() > 4) {
+          totalUniqueIps += e.getValue() - 4;
+        }
+      }
+    }
+    if (totalUniqueIps > 0) {
+      for (Map.Entry<String, Integer> e : uniqueIps.entrySet()) {
+        if (!e.getKey().equals(omitString) && e.getValue() > 4) {
+          weightedResponses.put(e.getKey(),
+              (((double) (e.getValue() - 4)) * totalResponses)
+              / ((double) totalUniqueIps));
+        }
+      }
+    }
+    return weightedResponses;
+  }
+
+  public void updateStatuses() {
+    for (Map.Entry<String, SortedSet<ClientsHistory>> e :
+        this.newResponses.entrySet()) {
+      String hashedFingerprint = e.getKey();
+      ClientsStatus clientsStatus = this.documentStore.retrieve(
+          ClientsStatus.class, true, hashedFingerprint);
+      if (clientsStatus == null) {
+        clientsStatus = new ClientsStatus();
+      }
+      this.addToHistory(clientsStatus, e.getValue());
+      this.compressHistory(clientsStatus);
+      this.documentStore.store(clientsStatus, hashedFingerprint);
+    }
+    Logger.printStatusTime("Updated clients status files");
+  }
+
+  private void addToHistory(ClientsStatus clientsStatus,
+      SortedSet<ClientsHistory> newIntervals) {
+    SortedSet<ClientsHistory> history = clientsStatus.history;
+    for (ClientsHistory interval : newIntervals) {
+      if ((history.headSet(interval).isEmpty() ||
+          history.headSet(interval).last().endMillis <=
+          interval.startMillis) &&
+          (history.tailSet(interval).isEmpty() ||
+          history.tailSet(interval).first().startMillis >=
+          interval.endMillis)) {
+        history.add(interval);
+      }
+    }
+  }
+
+  private void compressHistory(ClientsStatus clientsStatus) {
+    SortedSet<ClientsHistory> history = clientsStatus.history;
+    SortedSet<ClientsHistory> compressedHistory =
+        new TreeSet<ClientsHistory>();
+    ClientsHistory lastResponses = null;
+    SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM");
+    dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+    String lastMonthString = "1970-01";
+    for (ClientsHistory responses : history) {
+      long intervalLengthMillis;
+      if (this.now - responses.endMillis <=
+          92L * 24L * 60L * 60L * 1000L) {
+        intervalLengthMillis = 24L * 60L * 60L * 1000L;
+      } else if (this.now - responses.endMillis <=
+          366L * 24L * 60L * 60L * 1000L) {
+        intervalLengthMillis = 2L * 24L * 60L * 60L * 1000L;
+      } else {
+        intervalLengthMillis = 10L * 24L * 60L * 60L * 1000L;
+      }
+      String monthString = dateTimeFormat.format(responses.startMillis);
+      if (lastResponses != null &&
+          lastResponses.endMillis == responses.startMillis &&
+          ((lastResponses.endMillis - 1L) / intervalLengthMillis) ==
+          ((responses.endMillis - 1L) / intervalLengthMillis) &&
+          lastMonthString.equals(monthString)) {
+        lastResponses.addResponses(responses);
+      } else {
+        if (lastResponses != null) {
+          compressedHistory.add(lastResponses);
+        }
+        lastResponses = responses;
+      }
+      lastMonthString = monthString;
+    }
+    if (lastResponses != null) {
+      compressedHistory.add(lastResponses);
+    }
+    clientsStatus.history = compressedHistory;
+  }
+
+  public String getStatsString() {
+    int newIntervals = 0;
+    for (SortedSet<ClientsHistory> hist : this.newResponses.values()) {
+      newIntervals += hist.size();
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("    "
+        + Logger.formatDecimalNumber(newIntervals / 2)
+        + " client statistics processed from extra-info descriptors\n");
+    sb.append("    "
+        + Logger.formatDecimalNumber(this.newResponses.size())
+        + " client status files updated\n");
+    return sb.toString();
+  }
+}
+
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java
index 1a5d083..af33124 100644
--- a/src/org/torproject/onionoo/Main.java
+++ b/src/org/torproject/onionoo/Main.java
@@ -36,12 +36,14 @@ public class Main {
     Logger.printStatusTime("Initialized bandwidth data writer");
     WeightsDataWriter wdw = new WeightsDataWriter(dso, ds, t);
     Logger.printStatusTime("Initialized weights data writer");
-    ClientsDataWriter cdw = new ClientsDataWriter(dso, ds, t);
-    Logger.printStatusTime("Initialized clients data writer");
+    ClientsStatusUpdater csu = new ClientsStatusUpdater(dso, ds, t);
+    Logger.printStatusTime("Initialized clients status updater");
     UptimeStatusUpdater usu = new UptimeStatusUpdater(dso, ds);
     Logger.printStatusTime("Initialized uptime status updater");
-    StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, cdw, usu };
+    StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, csu, usu };
 
+    ClientsDocumentWriter cdw = new ClientsDocumentWriter(dso, ds, t);
+    Logger.printStatusTime("Initialized clients document writer");
     UptimeDocumentWriter udw = new UptimeDocumentWriter(dso, ds, t);
     Logger.printStatusTime("Initialized uptime document writer");
     DocumentWriter[] dws = new DocumentWriter[] { ndw, bdw, wdw, cdw,
@@ -91,7 +93,7 @@ public class Main {
     }
     /* TODO Print status updater statistics for *all* status updaters once
      * all data writers have been separated. */
-    for (DocumentWriter dw : new DocumentWriter[] { udw }) {
+    for (DocumentWriter dw : new DocumentWriter[] { cdw, udw }) {
       String statsString = dw.getStatsString();
       if (statsString != null) {
         Logger.printStatistics(dw.getClass().getSimpleName(),






More information about the tor-commits mailing list