commit c2117e85a31bf38bb01bb7eeaff34b210786e386 Author: Karsten Loesing <karsten.loesing@gmx.net> Date: Tue Apr 8 20:42:18 2014 +0200
Split uptime data writer into two classes. --- src/org/torproject/onionoo/Main.java | 20 +- src/org/torproject/onionoo/UptimeDataWriter.java | 469 -------------------- .../torproject/onionoo/UptimeDocumentWriter.java | 299 +++++++++++++ .../torproject/onionoo/UptimeStatusUpdater.java | 200 +++++++++ 4 files changed, 514 insertions(+), 474 deletions(-)
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java index abae5cf..1a5d083 100644 --- a/src/org/torproject/onionoo/Main.java +++ b/src/org/torproject/onionoo/Main.java @@ -38,9 +38,12 @@ public class Main { Logger.printStatusTime("Initialized weights data writer"); ClientsDataWriter cdw = new ClientsDataWriter(dso, ds, t); Logger.printStatusTime("Initialized clients data writer"); - UptimeDataWriter udw = new UptimeDataWriter(dso, ds, t); - Logger.printStatusTime("Initialized uptime data writer"); - StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, cdw, udw }; + UptimeStatusUpdater usu = new UptimeStatusUpdater(dso, ds); + Logger.printStatusTime("Initialized uptime status updater"); + StatusUpdater[] sus = new StatusUpdater[] { ndw, bdw, wdw, cdw, usu }; + + UptimeDocumentWriter udw = new UptimeDocumentWriter(dso, ds, t); + Logger.printStatusTime("Initialized uptime document writer"); DocumentWriter[] dws = new DocumentWriter[] { ndw, bdw, wdw, cdw, udw };
@@ -86,8 +89,15 @@ public class Main { statsString); } } - /* TODO Print status updater statistics once all data writers have - * been separated. */ + /* TODO Print status updater statistics for *all* status updaters once + * all data writers have been separated. */ + for (DocumentWriter dw : new DocumentWriter[] { udw }) { + String statsString = dw.getStatsString(); + if (statsString != null) { + Logger.printStatistics(dw.getClass().getSimpleName(), + statsString); + } + } Logger.printStatistics("Descriptor source", dso.getStatsString()); Logger.printStatistics("Document store", ds.getStatsString()); Logger.printStatistics("GeoIP lookup service", ls.getStatsString()); diff --git a/src/org/torproject/onionoo/UptimeDataWriter.java b/src/org/torproject/onionoo/UptimeDataWriter.java deleted file mode 100644 index a2b2d29..0000000 --- a/src/org/torproject/onionoo/UptimeDataWriter.java +++ /dev/null @@ -1,469 +0,0 @@ -/* Copyright 2014 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.onionoo; - -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TimeZone; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.torproject.descriptor.BridgeNetworkStatus; -import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.NetworkStatusEntry; -import org.torproject.descriptor.RelayNetworkStatusConsensus; - -public class UptimeDataWriter implements DescriptorListener, - StatusUpdater, FingerprintListener, DocumentWriter { - - private DescriptorSource descriptorSource; - - private DocumentStore documentStore; - - private long now; - - public UptimeDataWriter(DescriptorSource descriptorSource, - DocumentStore documentStore, Time time) { - this.descriptorSource = descriptorSource; - this.documentStore = documentStore; - this.now = time.currentTimeMillis(); - 
this.registerDescriptorListeners(); - this.registerFingerprintListeners(); - } - - private void registerDescriptorListeners() { - this.descriptorSource.registerDescriptorListener(this, - DescriptorType.RELAY_CONSENSUSES); - this.descriptorSource.registerDescriptorListener(this, - DescriptorType.BRIDGE_STATUSES); - } - - public void registerFingerprintListeners() { - this.descriptorSource.registerFingerprintListener(this, - DescriptorType.RELAY_CONSENSUSES); - this.descriptorSource.registerFingerprintListener(this, - DescriptorType.BRIDGE_STATUSES); - } - - public void processDescriptor(Descriptor descriptor, boolean relay) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - this.processRelayNetworkStatusConsensus( - (RelayNetworkStatusConsensus) descriptor); - } else if (descriptor instanceof BridgeNetworkStatus) { - this.processBridgeNetworkStatus( - (BridgeNetworkStatus) descriptor); - } - } - - private SortedSet<Long> newRelayStatuses = new TreeSet<Long>(), - newBridgeStatuses = new TreeSet<Long>(); - private SortedMap<String, SortedSet<Long>> - newRunningRelays = new TreeMap<String, SortedSet<Long>>(), - newRunningBridges = new TreeMap<String, SortedSet<Long>>(); - - private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L; - - private void processRelayNetworkStatusConsensus( - RelayNetworkStatusConsensus consensus) { - SortedSet<String> fingerprints = new TreeSet<String>(); - for (NetworkStatusEntry entry : - consensus.getStatusEntries().values()) { - if (entry.getFlags().contains("Running")) { - fingerprints.add(entry.getFingerprint()); - } - } - if (!fingerprints.isEmpty()) { - long dateHourMillis = (consensus.getValidAfterMillis() - / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS; - for (String fingerprint : fingerprints) { - if (!this.newRunningRelays.containsKey(fingerprint)) { - this.newRunningRelays.put(fingerprint, new TreeSet<Long>()); - } - this.newRunningRelays.get(fingerprint).add(dateHourMillis); - } - 
this.newRelayStatuses.add(dateHourMillis); - } - } - - private void processBridgeNetworkStatus(BridgeNetworkStatus status) { - SortedSet<String> fingerprints = new TreeSet<String>(); - for (NetworkStatusEntry entry : - status.getStatusEntries().values()) { - if (entry.getFlags().contains("Running")) { - fingerprints.add(entry.getFingerprint()); - } - } - if (!fingerprints.isEmpty()) { - long dateHourMillis = (status.getPublishedMillis() - / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS; - for (String fingerprint : fingerprints) { - if (!this.newRunningBridges.containsKey(fingerprint)) { - this.newRunningBridges.put(fingerprint, new TreeSet<Long>()); - } - this.newRunningBridges.get(fingerprint).add(dateHourMillis); - } - this.newBridgeStatuses.add(dateHourMillis); - } - } - - public void updateStatuses() { - for (Map.Entry<String, SortedSet<Long>> e : - this.newRunningRelays.entrySet()) { - this.updateStatus(true, e.getKey(), e.getValue()); - } - this.updateStatus(true, null, this.newRelayStatuses); - for (Map.Entry<String, SortedSet<Long>> e : - this.newRunningBridges.entrySet()) { - this.updateStatus(false, e.getKey(), e.getValue()); - } - this.updateStatus(false, null, this.newBridgeStatuses); - Logger.printStatusTime("Updated uptime status files"); - } - - private void updateStatus(boolean relay, String fingerprint, - SortedSet<Long> newUptimeHours) { - UptimeStatus uptimeStatus = this.readHistory(fingerprint); - if (uptimeStatus == null) { - uptimeStatus = new UptimeStatus(); - } - this.addToHistory(uptimeStatus, relay, newUptimeHours); - this.compressHistory(uptimeStatus); - this.writeHistory(fingerprint, uptimeStatus); - } - - private UptimeStatus readHistory(String fingerprint) { - return fingerprint == null ? 
- documentStore.retrieve(UptimeStatus.class, true) : - documentStore.retrieve(UptimeStatus.class, true, fingerprint); - } - - private void addToHistory(UptimeStatus uptimeStatus, boolean relay, - SortedSet<Long> newIntervals) { - SortedSet<UptimeHistory> history = uptimeStatus.history; - for (long startMillis : newIntervals) { - UptimeHistory interval = new UptimeHistory(relay, startMillis, 1); - if (!history.headSet(interval).isEmpty()) { - UptimeHistory prev = history.headSet(interval).last(); - if (prev.relay == interval.relay && - prev.startMillis + ONE_HOUR_MILLIS * prev.uptimeHours > - interval.startMillis) { - continue; - } - } - if (!history.tailSet(interval).isEmpty()) { - UptimeHistory next = history.tailSet(interval).first(); - if (next.relay == interval.relay && - next.startMillis < interval.startMillis + ONE_HOUR_MILLIS) { - continue; - } - } - history.add(interval); - } - } - - private void compressHistory(UptimeStatus uptimeStatus) { - SortedSet<UptimeHistory> history = uptimeStatus.history; - SortedSet<UptimeHistory> compressedHistory = - new TreeSet<UptimeHistory>(); - UptimeHistory lastInterval = null; - for (UptimeHistory interval : history) { - if (lastInterval != null && - lastInterval.startMillis + ONE_HOUR_MILLIS - * lastInterval.uptimeHours == interval.startMillis && - lastInterval.relay == interval.relay) { - lastInterval.addUptime(interval); - } else { - if (lastInterval != null) { - compressedHistory.add(lastInterval); - } - lastInterval = interval; - } - } - if (lastInterval != null) { - compressedHistory.add(lastInterval); - } - uptimeStatus.history = compressedHistory; - } - - private void writeHistory(String fingerprint, - UptimeStatus uptimeStatus) { - if (fingerprint == null) { - this.documentStore.store(uptimeStatus); - } else { - this.documentStore.store(uptimeStatus, fingerprint); - } - } - - private SortedSet<String> newRelayFingerprints = new TreeSet<String>(), - newBridgeFingerprints = new TreeSet<String>(); - - public void 
processFingerprints(SortedSet<String> fingerprints, - boolean relay) { - if (relay) { - this.newRelayFingerprints.addAll(fingerprints); - } else { - this.newBridgeFingerprints.addAll(fingerprints); - } - } - - public void writeDocuments() { - SortedSet<UptimeHistory> - knownRelayStatuses = new TreeSet<UptimeHistory>(), - knownBridgeStatuses = new TreeSet<UptimeHistory>(); - UptimeStatus uptimeStatus = this.documentStore.retrieve( - UptimeStatus.class, true); - if (uptimeStatus == null) { - return; - } - SortedSet<UptimeHistory> knownStatuses = uptimeStatus.history; - for (UptimeHistory status : knownStatuses) { - if (status.relay) { - knownRelayStatuses.add(status); - } else { - knownBridgeStatuses.add(status); - } - } - for (String fingerprint : this.newRelayFingerprints) { - this.updateDocument(true, fingerprint, knownRelayStatuses); - } - for (String fingerprint : this.newBridgeFingerprints) { - this.updateDocument(false, fingerprint, knownBridgeStatuses); - } - Logger.printStatusTime("Wrote uptime document files"); - } - - private void updateDocument(boolean relay, String fingerprint, - SortedSet<UptimeHistory> knownStatuses) { - UptimeStatus uptimeStatus = this.documentStore.retrieve( - UptimeStatus.class, true, fingerprint); - if (uptimeStatus != null) { - SortedSet<UptimeHistory> history = uptimeStatus.history; - UptimeDocument uptimeDocument = new UptimeDocument(); - uptimeDocument.documentString = this.formatHistoryString(relay, - fingerprint, history, knownStatuses); - this.documentStore.store(uptimeDocument, fingerprint); - } - } - - private String[] graphNames = new String[] { - "1_week", - "1_month", - "3_months", - "1_year", - "5_years" }; - - private long[] graphIntervals = new long[] { - 7L * 24L * 60L * 60L * 1000L, - 31L * 24L * 60L * 60L * 1000L, - 92L * 24L * 60L * 60L * 1000L, - 366L * 24L * 60L * 60L * 1000L, - 5L * 366L * 24L * 60L * 60L * 1000L }; - - private long[] dataPointIntervals = new long[] { - 60L * 60L * 1000L, - 4L * 60L * 60L * 
1000L, - 12L * 60L * 60L * 1000L, - 2L * 24L * 60L * 60L * 1000L, - 10L * 24L * 60L * 60L * 1000L }; - - private String formatHistoryString(boolean relay, String fingerprint, - SortedSet<UptimeHistory> history, - SortedSet<UptimeHistory> knownStatuses) { - StringBuilder sb = new StringBuilder(); - sb.append("{"fingerprint":"" + fingerprint + """); - sb.append(",\n"uptime":{"); - int graphIntervalsWritten = 0; - for (int graphIntervalIndex = 0; graphIntervalIndex < - this.graphIntervals.length; graphIntervalIndex++) { - String timeline = this.formatTimeline(graphIntervalIndex, relay, - history, knownStatuses); - if (timeline != null) { - sb.append((graphIntervalsWritten++ > 0 ? "," : "") + "\n" - + timeline); - } - } - sb.append("}"); - sb.append("\n}\n"); - return sb.toString(); - } - - private String formatTimeline(int graphIntervalIndex, boolean relay, - SortedSet<UptimeHistory> history, - SortedSet<UptimeHistory> knownStatuses) { - String graphName = this.graphNames[graphIntervalIndex]; - long graphInterval = this.graphIntervals[graphIntervalIndex]; - long dataPointInterval = - this.dataPointIntervals[graphIntervalIndex]; - int dataPointIntervalHours = (int) (dataPointInterval - / ONE_HOUR_MILLIS); - List<Integer> statusDataPoints = new ArrayList<Integer>(); - long intervalStartMillis = ((this.now - graphInterval) - / dataPointInterval) * dataPointInterval; - int statusHours = 0; - for (UptimeHistory hist : knownStatuses) { - if (hist.relay != relay) { - continue; - } - long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS - * hist.uptimeHours; - if (histEndMillis < intervalStartMillis) { - continue; - } - while (hist.startMillis >= intervalStartMillis - + dataPointInterval) { - statusDataPoints.add(statusHours * 5 > dataPointIntervalHours - ? 
statusHours : -1); - statusHours = 0; - intervalStartMillis += dataPointInterval; - } - while (histEndMillis >= intervalStartMillis + dataPointInterval) { - statusHours += (int) ((intervalStartMillis + dataPointInterval - - Math.max(hist.startMillis, intervalStartMillis)) - / ONE_HOUR_MILLIS); - statusDataPoints.add(statusHours * 5 > dataPointIntervalHours - ? statusHours : -1); - statusHours = 0; - intervalStartMillis += dataPointInterval; - } - statusHours += (int) ((histEndMillis - Math.max(hist.startMillis, - intervalStartMillis)) / ONE_HOUR_MILLIS); - } - statusDataPoints.add(statusHours * 5 > dataPointIntervalHours - ? statusHours : -1); - List<Integer> uptimeDataPoints = new ArrayList<Integer>(); - intervalStartMillis = ((this.now - graphInterval) - / dataPointInterval) * dataPointInterval; - int uptimeHours = 0; - long firstStatusStartMillis = -1L; - for (UptimeHistory hist : history) { - if (hist.relay != relay) { - continue; - } - if (firstStatusStartMillis < 0L) { - firstStatusStartMillis = hist.startMillis; - } - long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS - * hist.uptimeHours; - if (histEndMillis < intervalStartMillis) { - continue; - } - while (hist.startMillis >= intervalStartMillis - + dataPointInterval) { - if (firstStatusStartMillis < intervalStartMillis - + dataPointInterval) { - uptimeDataPoints.add(uptimeHours); - } else { - uptimeDataPoints.add(-1); - } - uptimeHours = 0; - intervalStartMillis += dataPointInterval; - } - while (histEndMillis >= intervalStartMillis + dataPointInterval) { - uptimeHours += (int) ((intervalStartMillis + dataPointInterval - - Math.max(hist.startMillis, intervalStartMillis)) - / ONE_HOUR_MILLIS); - uptimeDataPoints.add(uptimeHours); - uptimeHours = 0; - intervalStartMillis += dataPointInterval; - } - uptimeHours += (int) ((histEndMillis - Math.max(hist.startMillis, - intervalStartMillis)) / ONE_HOUR_MILLIS); - } - uptimeDataPoints.add(uptimeHours); - List<Double> dataPoints = new ArrayList<Double>(); - 
for (int dataPointIndex = 0; dataPointIndex < statusDataPoints.size(); - dataPointIndex++) { - if (dataPointIndex >= uptimeDataPoints.size()) { - dataPoints.add(0.0); - } else if (uptimeDataPoints.get(dataPointIndex) >= 0 && - statusDataPoints.get(dataPointIndex) > 0) { - dataPoints.add(((double) uptimeDataPoints.get(dataPointIndex)) - / ((double) statusDataPoints.get(dataPointIndex))); - } else { - dataPoints.add(-1.0); - } - } - int firstNonNullIndex = -1, lastNonNullIndex = -1; - for (int dataPointIndex = 0; dataPointIndex < dataPoints.size(); - dataPointIndex++) { - double dataPoint = dataPoints.get(dataPointIndex); - if (dataPoint >= 0.0) { - if (firstNonNullIndex < 0) { - firstNonNullIndex = dataPointIndex; - } - lastNonNullIndex = dataPointIndex; - } - } - if (firstNonNullIndex < 0) { - return null; - } - long firstDataPointMillis = (((this.now - graphInterval) - / dataPointInterval) + firstNonNullIndex) - * dataPointInterval + dataPointInterval / 2L; - if (graphIntervalIndex > 0 && firstDataPointMillis >= - this.now - graphIntervals[graphIntervalIndex - 1]) { - /* Skip uptime history object, because it doesn't contain - * anything new that wasn't already contained in the last - * uptime history object(s). 
*/ - return null; - } - long lastDataPointMillis = firstDataPointMillis - + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval; - double factor = 1.0 / 999.0; - int count = lastNonNullIndex - firstNonNullIndex + 1; - StringBuilder sb = new StringBuilder(); - SimpleDateFormat dateTimeFormat = new SimpleDateFormat( - "yyyy-MM-dd HH:mm:ss"); - dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - sb.append(""" + graphName + "":{" - + ""first":"" + dateTimeFormat.format(firstDataPointMillis) - + "","last":"" + dateTimeFormat.format(lastDataPointMillis) - + "","interval":" + String.valueOf(dataPointInterval / 1000L) - + ","factor":" + String.format(Locale.US, "%.9f", factor) - + ","count":" + String.valueOf(count) + ","values":["); - int dataPointsWritten = 0, previousNonNullIndex = -2; - boolean foundTwoAdjacentDataPoints = false; - for (int dataPointIndex = firstNonNullIndex; dataPointIndex <= - lastNonNullIndex; dataPointIndex++) { - double dataPoint = dataPoints.get(dataPointIndex); - if (dataPoint >= 0.0) { - if (dataPointIndex - previousNonNullIndex == 1) { - foundTwoAdjacentDataPoints = true; - } - previousNonNullIndex = dataPointIndex; - } - sb.append((dataPointsWritten++ > 0 ? "," : "") - + (dataPoint < -0.5 ? 
"null" : - String.valueOf((long) (dataPoint * 999.0)))); - } - sb.append("]}"); - if (foundTwoAdjacentDataPoints) { - return sb.toString(); - } else { - return null; - } - } - - public String getStatsString() { - StringBuilder sb = new StringBuilder(); - sb.append(" " + Logger.formatDecimalNumber( - this.newRelayStatuses.size()) + " hours of relay uptimes " - + "processed\n"); - sb.append(" " + Logger.formatDecimalNumber( - this.newBridgeStatuses.size()) + " hours of bridge uptimes " - + "processed\n"); - sb.append(" " + Logger.formatDecimalNumber( - this.newRunningRelays.size() + this.newRunningBridges.size()) - + " uptime status files updated\n"); - sb.append(" " + Logger.formatDecimalNumber( - this.newRunningRelays.size() + this.newRunningBridges.size()) - + " uptime document files updated\n"); - return sb.toString(); - } -} - diff --git a/src/org/torproject/onionoo/UptimeDocumentWriter.java b/src/org/torproject/onionoo/UptimeDocumentWriter.java new file mode 100644 index 0000000..5b03153 --- /dev/null +++ b/src/org/torproject/onionoo/UptimeDocumentWriter.java @@ -0,0 +1,299 @@ +/* Copyright 2014 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.onionoo; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.SortedSet; +import java.util.TimeZone; +import java.util.TreeSet; + +public class UptimeDocumentWriter implements FingerprintListener, + DocumentWriter { + + private DescriptorSource descriptorSource; + + private DocumentStore documentStore; + + private long now; + + public UptimeDocumentWriter(DescriptorSource descriptorSource, + DocumentStore documentStore, Time time) { + this.descriptorSource = descriptorSource; + this.documentStore = documentStore; + this.now = time.currentTimeMillis(); + this.registerFingerprintListeners(); + } + + public void registerFingerprintListeners() { + this.descriptorSource.registerFingerprintListener(this, + 
DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.BRIDGE_STATUSES); + } + + private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L; + + private SortedSet<String> newRelayFingerprints = new TreeSet<String>(), + newBridgeFingerprints = new TreeSet<String>(); + + public void processFingerprints(SortedSet<String> fingerprints, + boolean relay) { + if (relay) { + this.newRelayFingerprints.addAll(fingerprints); + } else { + this.newBridgeFingerprints.addAll(fingerprints); + } + } + + public void writeDocuments() { + SortedSet<UptimeHistory> + knownRelayStatuses = new TreeSet<UptimeHistory>(), + knownBridgeStatuses = new TreeSet<UptimeHistory>(); + UptimeStatus uptimeStatus = this.documentStore.retrieve( + UptimeStatus.class, true); + if (uptimeStatus == null) { + return; + } + SortedSet<UptimeHistory> knownStatuses = uptimeStatus.history; + for (UptimeHistory status : knownStatuses) { + if (status.relay) { + knownRelayStatuses.add(status); + } else { + knownBridgeStatuses.add(status); + } + } + for (String fingerprint : this.newRelayFingerprints) { + this.updateDocument(true, fingerprint, knownRelayStatuses); + } + for (String fingerprint : this.newBridgeFingerprints) { + this.updateDocument(false, fingerprint, knownBridgeStatuses); + } + Logger.printStatusTime("Wrote uptime document files"); + } + + private int writtenDocuments = 0; + + private void updateDocument(boolean relay, String fingerprint, + SortedSet<UptimeHistory> knownStatuses) { + UptimeStatus uptimeStatus = this.documentStore.retrieve( + UptimeStatus.class, true, fingerprint); + if (uptimeStatus != null) { + SortedSet<UptimeHistory> history = uptimeStatus.history; + UptimeDocument uptimeDocument = new UptimeDocument(); + uptimeDocument.documentString = this.formatHistoryString(relay, + fingerprint, history, knownStatuses); + this.documentStore.store(uptimeDocument, fingerprint); + this.writtenDocuments++; + } + } + + private String[] 
graphNames = new String[] { + "1_week", + "1_month", + "3_months", + "1_year", + "5_years" }; + + private long[] graphIntervals = new long[] { + 7L * 24L * 60L * 60L * 1000L, + 31L * 24L * 60L * 60L * 1000L, + 92L * 24L * 60L * 60L * 1000L, + 366L * 24L * 60L * 60L * 1000L, + 5L * 366L * 24L * 60L * 60L * 1000L }; + + private long[] dataPointIntervals = new long[] { + 60L * 60L * 1000L, + 4L * 60L * 60L * 1000L, + 12L * 60L * 60L * 1000L, + 2L * 24L * 60L * 60L * 1000L, + 10L * 24L * 60L * 60L * 1000L }; + + private String formatHistoryString(boolean relay, String fingerprint, + SortedSet<UptimeHistory> history, + SortedSet<UptimeHistory> knownStatuses) { + StringBuilder sb = new StringBuilder(); + sb.append("{"fingerprint":"" + fingerprint + """); + sb.append(",\n"uptime":{"); + int graphIntervalsWritten = 0; + for (int graphIntervalIndex = 0; graphIntervalIndex < + this.graphIntervals.length; graphIntervalIndex++) { + String timeline = this.formatTimeline(graphIntervalIndex, relay, + history, knownStatuses); + if (timeline != null) { + sb.append((graphIntervalsWritten++ > 0 ? 
"," : "") + "\n" + + timeline); + } + } + sb.append("}"); + sb.append("\n}\n"); + return sb.toString(); + } + + private String formatTimeline(int graphIntervalIndex, boolean relay, + SortedSet<UptimeHistory> history, + SortedSet<UptimeHistory> knownStatuses) { + String graphName = this.graphNames[graphIntervalIndex]; + long graphInterval = this.graphIntervals[graphIntervalIndex]; + long dataPointInterval = + this.dataPointIntervals[graphIntervalIndex]; + int dataPointIntervalHours = (int) (dataPointInterval + / ONE_HOUR_MILLIS); + List<Integer> statusDataPoints = new ArrayList<Integer>(); + long intervalStartMillis = ((this.now - graphInterval) + / dataPointInterval) * dataPointInterval; + int statusHours = 0; + for (UptimeHistory hist : knownStatuses) { + if (hist.relay != relay) { + continue; + } + long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS + * hist.uptimeHours; + if (histEndMillis < intervalStartMillis) { + continue; + } + while (hist.startMillis >= intervalStartMillis + + dataPointInterval) { + statusDataPoints.add(statusHours * 5 > dataPointIntervalHours + ? statusHours : -1); + statusHours = 0; + intervalStartMillis += dataPointInterval; + } + while (histEndMillis >= intervalStartMillis + dataPointInterval) { + statusHours += (int) ((intervalStartMillis + dataPointInterval + - Math.max(hist.startMillis, intervalStartMillis)) + / ONE_HOUR_MILLIS); + statusDataPoints.add(statusHours * 5 > dataPointIntervalHours + ? statusHours : -1); + statusHours = 0; + intervalStartMillis += dataPointInterval; + } + statusHours += (int) ((histEndMillis - Math.max(hist.startMillis, + intervalStartMillis)) / ONE_HOUR_MILLIS); + } + statusDataPoints.add(statusHours * 5 > dataPointIntervalHours + ? 
statusHours : -1); + List<Integer> uptimeDataPoints = new ArrayList<Integer>(); + intervalStartMillis = ((this.now - graphInterval) + / dataPointInterval) * dataPointInterval; + int uptimeHours = 0; + long firstStatusStartMillis = -1L; + for (UptimeHistory hist : history) { + if (hist.relay != relay) { + continue; + } + if (firstStatusStartMillis < 0L) { + firstStatusStartMillis = hist.startMillis; + } + long histEndMillis = hist.startMillis + ONE_HOUR_MILLIS + * hist.uptimeHours; + if (histEndMillis < intervalStartMillis) { + continue; + } + while (hist.startMillis >= intervalStartMillis + + dataPointInterval) { + if (firstStatusStartMillis < intervalStartMillis + + dataPointInterval) { + uptimeDataPoints.add(uptimeHours); + } else { + uptimeDataPoints.add(-1); + } + uptimeHours = 0; + intervalStartMillis += dataPointInterval; + } + while (histEndMillis >= intervalStartMillis + dataPointInterval) { + uptimeHours += (int) ((intervalStartMillis + dataPointInterval + - Math.max(hist.startMillis, intervalStartMillis)) + / ONE_HOUR_MILLIS); + uptimeDataPoints.add(uptimeHours); + uptimeHours = 0; + intervalStartMillis += dataPointInterval; + } + uptimeHours += (int) ((histEndMillis - Math.max(hist.startMillis, + intervalStartMillis)) / ONE_HOUR_MILLIS); + } + uptimeDataPoints.add(uptimeHours); + List<Double> dataPoints = new ArrayList<Double>(); + for (int dataPointIndex = 0; dataPointIndex < statusDataPoints.size(); + dataPointIndex++) { + if (dataPointIndex >= uptimeDataPoints.size()) { + dataPoints.add(0.0); + } else if (uptimeDataPoints.get(dataPointIndex) >= 0 && + statusDataPoints.get(dataPointIndex) > 0) { + dataPoints.add(((double) uptimeDataPoints.get(dataPointIndex)) + / ((double) statusDataPoints.get(dataPointIndex))); + } else { + dataPoints.add(-1.0); + } + } + int firstNonNullIndex = -1, lastNonNullIndex = -1; + for (int dataPointIndex = 0; dataPointIndex < dataPoints.size(); + dataPointIndex++) { + double dataPoint = dataPoints.get(dataPointIndex); + if 
(dataPoint >= 0.0) { + if (firstNonNullIndex < 0) { + firstNonNullIndex = dataPointIndex; + } + lastNonNullIndex = dataPointIndex; + } + } + if (firstNonNullIndex < 0) { + return null; + } + long firstDataPointMillis = (((this.now - graphInterval) + / dataPointInterval) + firstNonNullIndex) + * dataPointInterval + dataPointInterval / 2L; + if (graphIntervalIndex > 0 && firstDataPointMillis >= + this.now - graphIntervals[graphIntervalIndex - 1]) { + /* Skip uptime history object, because it doesn't contain + * anything new that wasn't already contained in the last + * uptime history object(s). */ + return null; + } + long lastDataPointMillis = firstDataPointMillis + + (lastNonNullIndex - firstNonNullIndex) * dataPointInterval; + double factor = 1.0 / 999.0; + int count = lastNonNullIndex - firstNonNullIndex + 1; + StringBuilder sb = new StringBuilder(); + SimpleDateFormat dateTimeFormat = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss"); + dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + sb.append(""" + graphName + "":{" + + ""first":"" + dateTimeFormat.format(firstDataPointMillis) + + "","last":"" + dateTimeFormat.format(lastDataPointMillis) + + "","interval":" + String.valueOf(dataPointInterval / 1000L) + + ","factor":" + String.format(Locale.US, "%.9f", factor) + + ","count":" + String.valueOf(count) + ","values":["); + int dataPointsWritten = 0, previousNonNullIndex = -2; + boolean foundTwoAdjacentDataPoints = false; + for (int dataPointIndex = firstNonNullIndex; dataPointIndex <= + lastNonNullIndex; dataPointIndex++) { + double dataPoint = dataPoints.get(dataPointIndex); + if (dataPoint >= 0.0) { + if (dataPointIndex - previousNonNullIndex == 1) { + foundTwoAdjacentDataPoints = true; + } + previousNonNullIndex = dataPointIndex; + } + sb.append((dataPointsWritten++ > 0 ? "," : "") + + (dataPoint < -0.5 ? 
"null" : + String.valueOf((long) (dataPoint * 999.0)))); + } + sb.append("]}"); + if (foundTwoAdjacentDataPoints) { + return sb.toString(); + } else { + return null; + } + } + + public String getStatsString() { + StringBuilder sb = new StringBuilder(); + sb.append(" " + Logger.formatDecimalNumber(this.writtenDocuments) + + " uptime document files written\n"); + return sb.toString(); + } +} + diff --git a/src/org/torproject/onionoo/UptimeStatusUpdater.java b/src/org/torproject/onionoo/UptimeStatusUpdater.java new file mode 100644 index 0000000..30ab703 --- /dev/null +++ b/src/org/torproject/onionoo/UptimeStatusUpdater.java @@ -0,0 +1,200 @@ +/* Copyright 2014 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.onionoo; + +import java.util.Map; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.torproject.descriptor.BridgeNetworkStatus; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.NetworkStatusEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; + +public class UptimeStatusUpdater implements DescriptorListener, + StatusUpdater { + + private DescriptorSource descriptorSource; + + private DocumentStore documentStore; + + public UptimeStatusUpdater(DescriptorSource descriptorSource, + DocumentStore documentStore) { + this.descriptorSource = descriptorSource; + this.documentStore = documentStore; + this.registerDescriptorListeners(); + } + + private void registerDescriptorListeners() { + this.descriptorSource.registerDescriptorListener(this, + DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerDescriptorListener(this, + DescriptorType.BRIDGE_STATUSES); + } + + public void processDescriptor(Descriptor descriptor, boolean relay) { + if (descriptor instanceof RelayNetworkStatusConsensus) { + this.processRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } else if (descriptor 
instanceof BridgeNetworkStatus) { + this.processBridgeNetworkStatus( + (BridgeNetworkStatus) descriptor); + } + } + + private SortedSet<Long> newRelayStatuses = new TreeSet<Long>(), + newBridgeStatuses = new TreeSet<Long>(); + private SortedMap<String, SortedSet<Long>> + newRunningRelays = new TreeMap<String, SortedSet<Long>>(), + newRunningBridges = new TreeMap<String, SortedSet<Long>>(); + + private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L; + + private void processRelayNetworkStatusConsensus( + RelayNetworkStatusConsensus consensus) { + SortedSet<String> fingerprints = new TreeSet<String>(); + for (NetworkStatusEntry entry : + consensus.getStatusEntries().values()) { + if (entry.getFlags().contains("Running")) { + fingerprints.add(entry.getFingerprint()); + } + } + if (!fingerprints.isEmpty()) { + long dateHourMillis = (consensus.getValidAfterMillis() + / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS; + for (String fingerprint : fingerprints) { + if (!this.newRunningRelays.containsKey(fingerprint)) { + this.newRunningRelays.put(fingerprint, new TreeSet<Long>()); + } + this.newRunningRelays.get(fingerprint).add(dateHourMillis); + } + this.newRelayStatuses.add(dateHourMillis); + } + } + + private void processBridgeNetworkStatus(BridgeNetworkStatus status) { + SortedSet<String> fingerprints = new TreeSet<String>(); + for (NetworkStatusEntry entry : + status.getStatusEntries().values()) { + if (entry.getFlags().contains("Running")) { + fingerprints.add(entry.getFingerprint()); + } + } + if (!fingerprints.isEmpty()) { + long dateHourMillis = (status.getPublishedMillis() + / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS; + for (String fingerprint : fingerprints) { + if (!this.newRunningBridges.containsKey(fingerprint)) { + this.newRunningBridges.put(fingerprint, new TreeSet<Long>()); + } + this.newRunningBridges.get(fingerprint).add(dateHourMillis); + } + this.newBridgeStatuses.add(dateHourMillis); + } + } + + public void updateStatuses() { + for (Map.Entry<String, 
SortedSet<Long>> e : + this.newRunningRelays.entrySet()) { + this.updateStatus(true, e.getKey(), e.getValue()); + } + this.updateStatus(true, null, this.newRelayStatuses); + for (Map.Entry<String, SortedSet<Long>> e : + this.newRunningBridges.entrySet()) { + this.updateStatus(false, e.getKey(), e.getValue()); + } + this.updateStatus(false, null, this.newBridgeStatuses); + Logger.printStatusTime("Updated uptime status files"); + } + + private void updateStatus(boolean relay, String fingerprint, + SortedSet<Long> newUptimeHours) { + UptimeStatus uptimeStatus = this.readHistory(fingerprint); + if (uptimeStatus == null) { + uptimeStatus = new UptimeStatus(); + } + this.addToHistory(uptimeStatus, relay, newUptimeHours); + this.compressHistory(uptimeStatus); + this.writeHistory(fingerprint, uptimeStatus); + } + + private UptimeStatus readHistory(String fingerprint) { + return fingerprint == null ? + documentStore.retrieve(UptimeStatus.class, true) : + documentStore.retrieve(UptimeStatus.class, true, fingerprint); + } + + private void addToHistory(UptimeStatus uptimeStatus, boolean relay, + SortedSet<Long> newIntervals) { + SortedSet<UptimeHistory> history = uptimeStatus.history; + for (long startMillis : newIntervals) { + UptimeHistory interval = new UptimeHistory(relay, startMillis, 1); + if (!history.headSet(interval).isEmpty()) { + UptimeHistory prev = history.headSet(interval).last(); + if (prev.relay == interval.relay && + prev.startMillis + ONE_HOUR_MILLIS * prev.uptimeHours > + interval.startMillis) { + continue; + } + } + if (!history.tailSet(interval).isEmpty()) { + UptimeHistory next = history.tailSet(interval).first(); + if (next.relay == interval.relay && + next.startMillis < interval.startMillis + ONE_HOUR_MILLIS) { + continue; + } + } + history.add(interval); + } + } + + private void compressHistory(UptimeStatus uptimeStatus) { + SortedSet<UptimeHistory> history = uptimeStatus.history; + SortedSet<UptimeHistory> compressedHistory = + new 
TreeSet<UptimeHistory>(); + UptimeHistory lastInterval = null; + for (UptimeHistory interval : history) { + if (lastInterval != null && + lastInterval.startMillis + ONE_HOUR_MILLIS + * lastInterval.uptimeHours == interval.startMillis && + lastInterval.relay == interval.relay) { + lastInterval.addUptime(interval); + } else { + if (lastInterval != null) { + compressedHistory.add(lastInterval); + } + lastInterval = interval; + } + } + if (lastInterval != null) { + compressedHistory.add(lastInterval); + } + uptimeStatus.history = compressedHistory; + } + + private void writeHistory(String fingerprint, + UptimeStatus uptimeStatus) { + if (fingerprint == null) { + this.documentStore.store(uptimeStatus); + } else { + this.documentStore.store(uptimeStatus, fingerprint); + } + } + + public String getStatsString() { + StringBuilder sb = new StringBuilder(); + sb.append(" " + Logger.formatDecimalNumber( + this.newRelayStatuses.size()) + " hours of relay uptimes " + + "processed\n"); + sb.append(" " + Logger.formatDecimalNumber( + this.newBridgeStatuses.size()) + " hours of bridge uptimes " + + "processed\n"); + sb.append(" " + Logger.formatDecimalNumber( + this.newRunningRelays.size() + this.newRunningBridges.size()) + + " uptime status files updated\n"); + return sb.toString(); + } +} +
tor-commits@lists.torproject.org