commit 2f305253463c3feb42aad4e8477f4aac199b026e Author: Karsten Loesing <karsten.loesing@gmx.net> Date: Thu Jul 11 14:18:06 2013 +0200
Avoid parsing server descriptors for weights histories.
Instead, store advertised bandwidths in weights status files. This prepares for using a parse history for server descriptors. --- src/org/torproject/onionoo/Main.java | 4 + src/org/torproject/onionoo/WeightsDataWriter.java | 92 +++++++++++++++++---- 2 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java index 9c02319..0d29ab5 100644 --- a/src/org/torproject/onionoo/Main.java +++ b/src/org/torproject/onionoo/Main.java @@ -92,6 +92,8 @@ public class Main { printStatus("Updating weights data."); wdw.setCurrentNodes(currentNodes); printStatusTime("Set current node fingerprints"); + wdw.updateWeightsHistories(); + printStatusTime("Updated weights histories"); wdw.writeWeightsDataFiles(); printStatusTime("Wrote weights data files"); // TODO Evaluate overhead of not deleting obsolete weights files. An @@ -99,6 +101,8 @@ public class Main { // which allows us to run ndw and wdw in parallel in the future. wdw.deleteObsoleteWeightsDataFiles(); printStatusTime("Deleted obsolete weights files"); + wdw.updateAdvertisedBandwidths(); + printStatusTime("Updated advertised bandwidths");
printStatus("Shutting down."); dso.writeHistoryFiles(); diff --git a/src/org/torproject/onionoo/WeightsDataWriter.java b/src/org/torproject/onionoo/WeightsDataWriter.java index 260cdc8..ba4482b 100644 --- a/src/org/torproject/onionoo/WeightsDataWriter.java +++ b/src/org/torproject/onionoo/WeightsDataWriter.java @@ -8,10 +8,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Scanner; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TimeZone; @@ -54,21 +56,25 @@ public class WeightsDataWriter implements DescriptorListener { } }
- public void setCurrentNodes( - SortedMap<String, NodeStatus> currentNodes) { - this.currentFingerprints.addAll(currentNodes.keySet()); + private Set<RelayNetworkStatusConsensus> consensuses = + new HashSet<RelayNetworkStatusConsensus>(); + + private void processRelayNetworkConsensus( + RelayNetworkStatusConsensus consensus) { + this.consensuses.add(consensus); }
+ private Set<String> updateAdvertisedBandwidths = + new HashSet<String>(); + + private Map<String, Set<String>> descriptorDigestsByFingerprint = + new HashMap<String, Set<String>>(); + private Map<String, Integer> advertisedBandwidths = new HashMap<String, Integer>();
private void processRelayServerDescriptor( ServerDescriptor serverDescriptor) { - /* Read advertised bandwidths of all server descriptors in - * in/relay-descriptors/server-descriptors/ to memory. Ideally, we'd - * skip descriptors that we read before and obtain their advertised - * bandwidths from some temp file. This approach should do for now, - * though. */ String digest = serverDescriptor.getServerDescriptorDigest(). toUpperCase(); int advertisedBandwidth = Math.min(Math.min( @@ -76,19 +82,34 @@ public class WeightsDataWriter implements DescriptorListener { serverDescriptor.getBandwidthObserved()), serverDescriptor.getBandwidthRate()); this.advertisedBandwidths.put(digest, advertisedBandwidth); + String fingerprint = serverDescriptor.getFingerprint(); + this.updateAdvertisedBandwidths.add(fingerprint); + if (!this.descriptorDigestsByFingerprint.containsKey( + fingerprint)) { + this.descriptorDigestsByFingerprint.put(fingerprint, + new HashSet<String>()); + } + this.descriptorDigestsByFingerprint.get(fingerprint).add(digest); }
- private void processRelayNetworkConsensus( - RelayNetworkStatusConsensus consensus) { - long validAfterMillis = consensus.getValidAfterMillis(), - freshUntilMillis = consensus.getFreshUntilMillis(); - SortedMap<String, double[]> pathSelectionWeights = - this.calculatePathSelectionProbabilities(consensus); - this.updateWeightsHistory(validAfterMillis, freshUntilMillis, - pathSelectionWeights); + public void setCurrentNodes( + SortedMap<String, NodeStatus> currentNodes) { + this.currentFingerprints.addAll(currentNodes.keySet()); }
- private static final int HISTORY_UPDATER_WORKERS_NUM = 4; + public void updateWeightsHistories() { + for (RelayNetworkStatusConsensus consensus : this.consensuses) { + long validAfterMillis = consensus.getValidAfterMillis(), + freshUntilMillis = consensus.getFreshUntilMillis(); + SortedMap<String, double[]> pathSelectionWeights = + this.calculatePathSelectionProbabilities(consensus); + this.updateWeightsHistory(validAfterMillis, freshUntilMillis, + pathSelectionWeights); + } + } + + // TODO Use 4 workers once threading problems are solved. + private static final int HISTORY_UPDATER_WORKERS_NUM = 1; private void updateWeightsHistory(long validAfterMillis, long freshUntilMillis, SortedMap<String, double[]> pathSelectionWeights) { @@ -189,6 +210,10 @@ public class WeightsDataWriter implements DescriptorListener { String serverDescriptorDigest = relay.getDescriptor(). toUpperCase(); double advertisedBandwidth = 0.0; + if (!this.advertisedBandwidths.containsKey( + serverDescriptorDigest)) { + this.readHistoryFromDisk(fingerprint); + } if (this.advertisedBandwidths.containsKey( serverDescriptorDigest)) { advertisedBandwidth = (double) this.advertisedBandwidths.get( @@ -279,6 +304,20 @@ public class WeightsDataWriter implements DescriptorListener { while (s.hasNextLine()) { String line = s.nextLine(); String[] parts = line.split(" "); + if (parts.length == 2) { + String descriptorDigest = parts[0]; + int advertisedBandwidth = Integer.parseInt(parts[1]); + if (!this.descriptorDigestsByFingerprint.containsKey( + fingerprint)) { + this.descriptorDigestsByFingerprint.put(fingerprint, + new HashSet<String>()); + } + this.descriptorDigestsByFingerprint.get(fingerprint).add( + descriptorDigest); + this.advertisedBandwidths.put(descriptorDigest, + advertisedBandwidth); + continue; + } if (parts.length != 9) { System.err.println("Illegal line '" + line + "' in weights " + "history for fingerprint '" + fingerprint + "'. 
" @@ -372,6 +411,17 @@ public class WeightsDataWriter implements DescriptorListener { private void writeHistoryToDisk(String fingerprint, SortedMap<long[], double[]> history) { StringBuilder sb = new StringBuilder(); + if (this.descriptorDigestsByFingerprint.containsKey(fingerprint)) { + for (String descriptorDigest : + this.descriptorDigestsByFingerprint.get(fingerprint)) { + if (this.advertisedBandwidths.containsKey(descriptorDigest)) { + int advertisedBandwidth = + this.advertisedBandwidths.get(descriptorDigest); + sb.append(descriptorDigest + " " + + String.valueOf(advertisedBandwidth) + "\n"); + } + } + } SimpleDateFormat dateTimeFormat = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); @@ -568,5 +618,13 @@ public class WeightsDataWriter implements DescriptorListener { this.documentStore.remove(WeightsDocument.class, fingerprint); } } + + public void updateAdvertisedBandwidths() { + for (String fingerprint : this.updateAdvertisedBandwidths) { + SortedMap<long[], double[]> history = + this.readHistoryFromDisk(fingerprint); + this.writeHistoryToDisk(fingerprint, history); + } + } }