commit ebff7354229b8b71a8a74aa6276de23fe09a4e99
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Wed May 28 14:36:50 2014 +0200
Avoid storing full consensuses in memory.
We stored full consensuses in memory, so that we could compute weights
when all descriptors have been parsed. Obviously, this does not scale
with more than a few days of consensuses.
The new approach is to parse server descriptors *before* consensuses and
store advertised bandwidths in weights status files when doing so. Later,
when we parse consensuses, we are sure that if there's a server descriptor
for a relay, it'll be contained in the weights status file.
---
src/org/torproject/onionoo/DescriptorSource.java | 10 +-
.../torproject/onionoo/WeightsStatusUpdater.java | 117 +++++---------------
2 files changed, 33 insertions(+), 94 deletions(-)
diff --git a/src/org/torproject/onionoo/DescriptorSource.java b/src/org/torproject/onionoo/DescriptorSource.java
index 0692854..be17fc4 100644
--- a/src/org/torproject/onionoo/DescriptorSource.java
+++ b/src/org/torproject/onionoo/DescriptorSource.java
@@ -328,22 +328,24 @@ public class DescriptorSource {
}
public void readDescriptors() {
- this.readDescriptors(DescriptorType.RELAY_CONSENSUSES,
- DescriptorHistory.RELAY_CONSENSUS_HISTORY, true);
+ /* Careful when changing the order of parsing descriptor types! The
+ * various status updaters may base assumptions on this order. */
this.readDescriptors(DescriptorType.RELAY_SERVER_DESCRIPTORS,
DescriptorHistory.RELAY_SERVER_HISTORY, true);
this.readDescriptors(DescriptorType.RELAY_EXTRA_INFOS,
DescriptorHistory.RELAY_EXTRAINFO_HISTORY, true);
this.readDescriptors(DescriptorType.EXIT_LISTS,
DescriptorHistory.EXIT_LIST_HISTORY, true);
- this.readDescriptors(DescriptorType.BRIDGE_STATUSES,
- DescriptorHistory.BRIDGE_STATUS_HISTORY, false);
+ this.readDescriptors(DescriptorType.RELAY_CONSENSUSES,
+ DescriptorHistory.RELAY_CONSENSUS_HISTORY, true);
this.readDescriptors(DescriptorType.BRIDGE_SERVER_DESCRIPTORS,
DescriptorHistory.BRIDGE_SERVER_HISTORY, false);
this.readDescriptors(DescriptorType.BRIDGE_EXTRA_INFOS,
DescriptorHistory.BRIDGE_EXTRAINFO_HISTORY, false);
this.readDescriptors(DescriptorType.BRIDGE_POOL_ASSIGNMENTS,
DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY, false);
+ this.readDescriptors(DescriptorType.BRIDGE_STATUSES,
+ DescriptorHistory.BRIDGE_STATUS_HISTORY, false);
}
private void readDescriptors(DescriptorType descriptorType,
diff --git a/src/org/torproject/onionoo/WeightsStatusUpdater.java b/src/org/torproject/onionoo/WeightsStatusUpdater.java
index d3ddff1..9b45f95 100644
--- a/src/org/torproject/onionoo/WeightsStatusUpdater.java
+++ b/src/org/torproject/onionoo/WeightsStatusUpdater.java
@@ -3,10 +3,7 @@
package org.torproject.onionoo;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
@@ -50,29 +47,19 @@ public class WeightsStatusUpdater implements DescriptorListener,
}
public void updateStatuses() {
- this.updateWeightsHistories();
- Logger.printStatusTime("Updated weights histories");
- this.updateWeightsStatuses();
- Logger.printStatusTime("Updated weights status files");
+ /* Nothing to do. */
}
- private Set<RelayNetworkStatusConsensus> consensuses =
- new HashSet<RelayNetworkStatusConsensus>();
-
private void processRelayNetworkConsensus(
RelayNetworkStatusConsensus consensus) {
- // TODO This does not scale for bulk imports.
- this.consensuses.add(consensus);
+ long validAfterMillis = consensus.getValidAfterMillis(),
+ freshUntilMillis = consensus.getFreshUntilMillis();
+ SortedMap<String, double[]> pathSelectionWeights =
+ this.calculatePathSelectionProbabilities(consensus);
+ this.updateWeightsHistory(validAfterMillis, freshUntilMillis,
+ pathSelectionWeights);
}
- private Set<String> updateWeightsStatuses = new HashSet<String>();
-
- private Map<String, Set<String>> descriptorDigestsByFingerprint =
- new HashMap<String, Set<String>>();
-
- private Map<String, Integer> advertisedBandwidths =
- new HashMap<String, Integer>();
-
private void processRelayServerDescriptor(
ServerDescriptor serverDescriptor) {
String digest = serverDescriptor.getServerDescriptorDigest().
@@ -81,27 +68,16 @@ public class WeightsStatusUpdater implements DescriptorListener,
serverDescriptor.getBandwidthBurst(),
serverDescriptor.getBandwidthObserved()),
serverDescriptor.getBandwidthRate());
- this.advertisedBandwidths.put(digest, advertisedBandwidth);
String fingerprint = serverDescriptor.getFingerprint();
- this.updateWeightsStatuses.add(fingerprint);
- if (!this.descriptorDigestsByFingerprint.containsKey(
- fingerprint)) {
- this.descriptorDigestsByFingerprint.put(fingerprint,
- new HashSet<String>());
- }
- this.descriptorDigestsByFingerprint.get(fingerprint).add(digest);
- }
-
- private void updateWeightsHistories() {
- for (RelayNetworkStatusConsensus consensus : this.consensuses) {
- long validAfterMillis = consensus.getValidAfterMillis(),
- freshUntilMillis = consensus.getFreshUntilMillis();
- SortedMap<String, double[]> pathSelectionWeights =
- this.calculatePathSelectionProbabilities(consensus);
- this.updateWeightsHistory(validAfterMillis, freshUntilMillis,
- pathSelectionWeights);
+ WeightsStatus weightsStatus = this.documentStore.retrieve(
+ WeightsStatus.class, true, fingerprint);
+ if (weightsStatus == null) {
+ weightsStatus = new WeightsStatus();
}
- }
+ weightsStatus.getAdvertisedBandwidths().put(digest,
+ advertisedBandwidth);
+ this.documentStore.store(weightsStatus, fingerprint);
+}
private void updateWeightsHistory(long validAfterMillis,
long freshUntilMillis,
@@ -164,29 +140,19 @@ public class WeightsStatusUpdater implements DescriptorListener,
boolean isExit = relay.getFlags().contains("Exit") &&
!relay.getFlags().contains("BadExit");
boolean isGuard = relay.getFlags().contains("Guard");
- String serverDescriptorDigest = relay.getDescriptor().
- toUpperCase();
+ String digest = relay.getDescriptor().toUpperCase();
double advertisedBandwidth = 0.0;
- if (!this.advertisedBandwidths.containsKey(
- serverDescriptorDigest)) {
- WeightsStatus weightsStatus = this.documentStore.retrieve(
- WeightsStatus.class, true, fingerprint);
- if (weightsStatus != null) {
- if (!this.descriptorDigestsByFingerprint.containsKey(
- fingerprint)) {
- this.descriptorDigestsByFingerprint.put(fingerprint,
- new HashSet<String>());
- }
- this.descriptorDigestsByFingerprint.get(fingerprint).addAll(
- weightsStatus.getAdvertisedBandwidths().keySet());
- this.advertisedBandwidths.putAll(
- weightsStatus.getAdvertisedBandwidths());
- }
- }
- if (this.advertisedBandwidths.containsKey(
- serverDescriptorDigest)) {
- advertisedBandwidth = (double) this.advertisedBandwidths.get(
- serverDescriptorDigest);
+ WeightsStatus weightsStatus = this.documentStore.retrieve(
+ WeightsStatus.class, true, fingerprint);
+ if (weightsStatus != null &&
+ weightsStatus.getAdvertisedBandwidths() != null &&
+ weightsStatus.getAdvertisedBandwidths().containsKey(digest)) {
+ /* Read advertised bandwidth from weights status file. Server
+ * descriptors are parsed before consensuses, so we're sure that
+ * if there's a server descriptor for this relay, it'll be
+ * contained in the weights status file by now. */
+ advertisedBandwidth =
+ (double) weightsStatus.getAdvertisedBandwidths().get(digest);
}
double consensusWeight = (double) relay.getBandwidth();
double guardWeight = (double) relay.getBandwidth();
@@ -252,24 +218,7 @@ public class WeightsStatusUpdater implements DescriptorListener,
history.tailMap(interval).firstKey()[0] >= freshUntilMillis)) {
history.put(interval, weights);
this.compressHistory(weightsStatus);
- this.addAdvertisedBandwidths(weightsStatus, fingerprint);
this.documentStore.store(weightsStatus, fingerprint);
- this.updateWeightsStatuses.remove(fingerprint);
- }
- }
-
- private void addAdvertisedBandwidths(WeightsStatus weightsStatus,
- String fingerprint) {
- if (this.descriptorDigestsByFingerprint.containsKey(fingerprint)) {
- for (String descriptorDigest :
- this.descriptorDigestsByFingerprint.get(fingerprint)) {
- if (this.advertisedBandwidths.containsKey(descriptorDigest)) {
- int advertisedBandwidth =
- this.advertisedBandwidths.get(descriptorDigest);
- weightsStatus.getAdvertisedBandwidths().put(descriptorDigest,
- advertisedBandwidth);
- }
- }
}
}
@@ -334,18 +283,6 @@ public class WeightsStatusUpdater implements DescriptorListener,
weightsStatus.setHistory(compressedHistory);
}
- private void updateWeightsStatuses() {
- for (String fingerprint : this.updateWeightsStatuses) {
- WeightsStatus weightsStatus = this.documentStore.retrieve(
- WeightsStatus.class, true, fingerprint);
- if (weightsStatus == null) {
- weightsStatus = new WeightsStatus();
- }
- this.addAdvertisedBandwidths(weightsStatus, fingerprint);
- this.documentStore.store(weightsStatus, fingerprint);
- }
- }
-
public String getStatsString() {
/* TODO Add statistics string. */
return null;