commit 64fecd018ad2373635c94fa167523ef476a349ef Author: Karsten Loesing karsten.loesing@gmx.net Date: Wed Mar 12 20:16:34 2014 +0100
Add fingerprint listener to learn which documents need updating.
Our data writers register at the descriptor source as descriptor listeners to be notified when new descriptors are parsed. They use these descriptors to update their internal status files and to know which output documents need to be rewritten.
We now add a second type of listener, the fingerprint listener, which can be used to learn relay or bridge fingerprints from new descriptors. Knowing the fingerprints is sufficient to rewrite output documents, because their content is only based on internal status files.
This will enable us to later split data writers into status updaters and document writers. --- .../torproject/onionoo/BandwidthDataWriter.java | 21 ++++- src/org/torproject/onionoo/ClientsDataWriter.java | 24 +++++- src/org/torproject/onionoo/DescriptorSource.java | 82 +++++++++++++++++++- src/org/torproject/onionoo/NodeDataWriter.java | 37 +++++++-- src/org/torproject/onionoo/UptimeDataWriter.java | 31 ++++++-- src/org/torproject/onionoo/WeightsDataWriter.java | 23 +++++- 6 files changed, 191 insertions(+), 27 deletions(-)
diff --git a/src/org/torproject/onionoo/BandwidthDataWriter.java b/src/org/torproject/onionoo/BandwidthDataWriter.java index 9f2f97e..67dc427 100644 --- a/src/org/torproject/onionoo/BandwidthDataWriter.java +++ b/src/org/torproject/onionoo/BandwidthDataWriter.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Locale; import java.util.Scanner; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TimeZone; import java.util.TreeMap;
@@ -35,7 +36,7 @@ import org.torproject.descriptor.ExtraInfoDescriptor; * 3 days as of publishing the document, but that's something clients can * work around. */ public class BandwidthDataWriter implements DataWriter, - DescriptorListener { + DescriptorListener, FingerprintListener {
private DescriptorSource descriptorSource;
@@ -54,21 +55,35 @@ public class BandwidthDataWriter implements DataWriter, this.dateTimeFormat.setLenient(false); this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); this.registerDescriptorListeners(); + this.registerFingerprintListeners(); }
private void registerDescriptorListeners() { - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.RELAY_EXTRA_INFOS); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.BRIDGE_EXTRA_INFOS); }
+ private void registerFingerprintListeners() { + /* TODO Not used yet. + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.RELAY_EXTRA_INFOS); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.BRIDGE_EXTRA_INFOS);*/ + } + public void processDescriptor(Descriptor descriptor, boolean relay) { if (descriptor instanceof ExtraInfoDescriptor) { this.parseDescriptor((ExtraInfoDescriptor) descriptor); } }
+ public void processFingerprints(SortedSet<String> fingerprints, + boolean relay) { + /* TODO Not used yet. */ + } + public void updateStatuses() { /* Status files are already updated while processing descriptors. */ } diff --git a/src/org/torproject/onionoo/ClientsDataWriter.java b/src/org/torproject/onionoo/ClientsDataWriter.java index b956f59..b045bfa 100644 --- a/src/org/torproject/onionoo/ClientsDataWriter.java +++ b/src/org/torproject/onionoo/ClientsDataWriter.java @@ -51,7 +51,8 @@ import org.torproject.descriptor.ExtraInfoDescriptor; * "transports":{"obfs2":0.4581}, * "versions":{"v4":1.0000}} */ -public class ClientsDataWriter implements DataWriter, DescriptorListener { +public class ClientsDataWriter implements DataWriter, DescriptorListener, + FingerprintListener {
private static class ResponseHistory implements Comparable<ResponseHistory> { @@ -206,10 +207,16 @@ public class ClientsDataWriter implements DataWriter, DescriptorListener { this.documentStore = documentStore; this.now = time.currentTimeMillis(); this.registerDescriptorListeners(); + this.registerFingerprintListeners(); }
private void registerDescriptorListeners() { - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, + DescriptorType.BRIDGE_EXTRA_INFOS); + } + + private void registerFingerprintListeners() { + this.descriptorSource.registerFingerprintListener(this, DescriptorType.BRIDGE_EXTRA_INFOS); }
@@ -402,8 +409,17 @@ public class ClientsDataWriter implements DataWriter, DescriptorListener { this.documentStore.store(clientsStatus, hashedFingerprint); }
+ public void processFingerprints(SortedSet<String> fingerprints, + boolean relay) { + if (!relay) { + this.updateDocuments.addAll(fingerprints); + } + } + + private SortedSet<String> updateDocuments = new TreeSet<String>(); + public void updateDocuments() { - for (String hashedFingerprint : this.newResponses.keySet()) { + for (String hashedFingerprint : this.updateDocuments) { SortedSet<ResponseHistory> history = this.readHistory(hashedFingerprint); ClientsDocument clientsDocument = new ClientsDocument(); @@ -631,7 +647,7 @@ public class ClientsDataWriter implements DataWriter, DescriptorListener { + Logger.formatDecimalNumber(this.newResponses.size()) + " client status files updated\n"); sb.append(" " - + Logger.formatDecimalNumber(this.newResponses.size()) + + Logger.formatDecimalNumber(this.updateDocuments.size()) + " client document files updated\n"); return sb.toString(); } diff --git a/src/org/torproject/onionoo/DescriptorSource.java b/src/org/torproject/onionoo/DescriptorSource.java index bdf9be8..c72e8ce 100644 --- a/src/org/torproject/onionoo/DescriptorSource.java +++ b/src/org/torproject/onionoo/DescriptorSource.java @@ -16,12 +16,21 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet;
+import org.torproject.descriptor.BridgeNetworkStatus; +import org.torproject.descriptor.BridgePoolAssignment; import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.DescriptorFile; import org.torproject.descriptor.DescriptorReader; import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.ExitList; +import org.torproject.descriptor.ExitListEntry; +import org.torproject.descriptor.ExtraInfoDescriptor; +import org.torproject.descriptor.RelayNetworkStatusConsensus; +import org.torproject.descriptor.ServerDescriptor;
enum DescriptorType { RELAY_CONSENSUSES, @@ -38,6 +47,11 @@ interface DescriptorListener { abstract void processDescriptor(Descriptor descriptor, boolean relay); }
+interface FingerprintListener { + abstract void processFingerprints(SortedSet<String> fingerprints, + boolean relay); +} + enum DescriptorHistory { RELAY_CONSENSUS_HISTORY, RELAY_SERVER_HISTORY, @@ -253,6 +267,8 @@ public class DescriptorSource { this.descriptorQueues = new ArrayList<DescriptorQueue>(); this.descriptorListeners = new HashMap<DescriptorType, Set<DescriptorListener>>(); + this.fingerprintListeners = + new HashMap<DescriptorType, Set<FingerprintListener>>(); }
private DescriptorQueue getDescriptorQueue( @@ -271,7 +287,10 @@ public class DescriptorSource { private Map<DescriptorType, Set<DescriptorListener>> descriptorListeners;
- public void registerListener(DescriptorListener listener, + private Map<DescriptorType, Set<FingerprintListener>> + fingerprintListeners; + + public void registerDescriptorListener(DescriptorListener listener, DescriptorType descriptorType) { if (!this.descriptorListeners.containsKey(descriptorType)) { this.descriptorListeners.put(descriptorType, @@ -280,6 +299,15 @@ public class DescriptorSource { this.descriptorListeners.get(descriptorType).add(listener); }
+ public void registerFingerprintListener(FingerprintListener listener, + DescriptorType descriptorType) { + if (!this.fingerprintListeners.containsKey(descriptorType)) { + this.fingerprintListeners.put(descriptorType, + new HashSet<FingerprintListener>()); + } + this.fingerprintListeners.get(descriptorType).add(listener); + } + public void readRelayNetworkConsensuses() { this.readDescriptors(DescriptorType.RELAY_CONSENSUSES, DescriptorHistory.RELAY_CONSENSUS_HISTORY, true); @@ -322,11 +350,14 @@ public class DescriptorSource {
private void readDescriptors(DescriptorType descriptorType, DescriptorHistory descriptorHistory, boolean relay) { - Set<DescriptorListener> descriptorListeners = - this.descriptorListeners.get(descriptorType); - if (descriptorListeners == null || descriptorListeners.isEmpty()) { + if (!this.descriptorListeners.containsKey(descriptorType) && + !this.fingerprintListeners.containsKey(descriptorType)) { return; } + Set<DescriptorListener> descriptorListeners = + this.descriptorListeners.get(descriptorType); + Set<FingerprintListener> fingerprintListeners = + this.fingerprintListeners.get(descriptorType); DescriptorQueue descriptorQueue = this.getDescriptorQueue( descriptorType, descriptorHistory); Descriptor descriptor; @@ -334,6 +365,49 @@ public class DescriptorSource { for (DescriptorListener descriptorListener : descriptorListeners) { descriptorListener.processDescriptor(descriptor, relay); } + SortedSet<String> fingerprints = new TreeSet<String>(); + if (descriptorType == DescriptorType.RELAY_CONSENSUSES && + descriptor instanceof RelayNetworkStatusConsensus) { + fingerprints.addAll(((RelayNetworkStatusConsensus) descriptor). + getStatusEntries().keySet()); + } else if (descriptorType + == DescriptorType.RELAY_SERVER_DESCRIPTORS && + descriptor instanceof ServerDescriptor) { + fingerprints.add(((ServerDescriptor) descriptor). + getFingerprint()); + } else if (descriptorType == DescriptorType.RELAY_EXTRA_INFOS && + descriptor instanceof ExtraInfoDescriptor) { + fingerprints.add(((ExtraInfoDescriptor) descriptor). + getFingerprint()); + } else if (descriptorType == DescriptorType.EXIT_LISTS && + descriptor instanceof ExitList) { + for (ExitListEntry entry : + ((ExitList) descriptor).getExitListEntries()) { + fingerprints.add(entry.getFingerprint()); + } + } else if (descriptorType == DescriptorType.BRIDGE_STATUSES && + descriptor instanceof BridgeNetworkStatus) { + fingerprints.addAll(((BridgeNetworkStatus) descriptor). + getStatusEntries().keySet()); + } else if (descriptorType == + DescriptorType.BRIDGE_SERVER_DESCRIPTORS && + descriptor instanceof ServerDescriptor) { + fingerprints.add(((ServerDescriptor) descriptor). + getFingerprint()); + } else if (descriptorType == DescriptorType.BRIDGE_EXTRA_INFOS && + descriptor instanceof ExtraInfoDescriptor) { + fingerprints.add(((ExtraInfoDescriptor) descriptor). + getFingerprint()); + } else if (descriptorType == + DescriptorType.BRIDGE_POOL_ASSIGNMENTS && + descriptor instanceof BridgePoolAssignment) { + fingerprints.addAll(((BridgePoolAssignment) descriptor). + getEntries().keySet()); + } + for (FingerprintListener fingerprintListener : + fingerprintListeners) { + fingerprintListener.processFingerprints(fingerprints, relay); + } } }
diff --git a/src/org/torproject/onionoo/NodeDataWriter.java b/src/org/torproject/onionoo/NodeDataWriter.java index cc9addb..b59500a 100644 --- a/src/org/torproject/onionoo/NodeDataWriter.java +++ b/src/org/torproject/onionoo/NodeDataWriter.java @@ -33,7 +33,8 @@ import org.torproject.onionoo.LookupService.LookupResult; * The parts of details files coming from server descriptors always come * from the last known descriptor of a relay or bridge, not from the * descriptor that was last referenced in a network status. */ -public class NodeDataWriter implements DataWriter, DescriptorListener { +public class NodeDataWriter implements DataWriter, DescriptorListener, + FingerprintListener {
private DescriptorSource descriptorSource;
@@ -70,23 +71,40 @@ public class NodeDataWriter implements DataWriter, DescriptorListener { this.documentStore = documentStore; this.now = time.currentTimeMillis(); this.registerDescriptorListeners(); + this.registerFingerprintListeners(); }
private void registerDescriptorListeners() { - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.RELAY_CONSENSUSES); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.RELAY_SERVER_DESCRIPTORS); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.BRIDGE_STATUSES); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.BRIDGE_SERVER_DESCRIPTORS); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.BRIDGE_POOL_ASSIGNMENTS); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.EXIT_LISTS); }
+ private void registerFingerprintListeners() { + /* TODO Not used yet. + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.RELAY_SERVER_DESCRIPTORS); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.BRIDGE_STATUSES); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.BRIDGE_SERVER_DESCRIPTORS); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.BRIDGE_POOL_ASSIGNMENTS); + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.EXIT_LISTS);*/ + } + public void processDescriptor(Descriptor descriptor, boolean relay) { if (descriptor instanceof RelayNetworkStatusConsensus) { this.updateRelayNetworkStatusConsensus( @@ -104,6 +122,11 @@ public class NodeDataWriter implements DataWriter, DescriptorListener { } }
+ public void processFingerprints(SortedSet<String> fingerprints, + boolean relay) { + /* TODO Not used yet. */ + } + public void updateStatuses() { this.readStatusSummary(); Logger.printStatusTime("Read status summary"); diff --git a/src/org/torproject/onionoo/UptimeDataWriter.java b/src/org/torproject/onionoo/UptimeDataWriter.java index 7a8b3af..e2b1bee 100644 --- a/src/org/torproject/onionoo/UptimeDataWriter.java +++ b/src/org/torproject/onionoo/UptimeDataWriter.java @@ -20,7 +20,8 @@ import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.NetworkStatusEntry; import org.torproject.descriptor.RelayNetworkStatusConsensus;
-public class UptimeDataWriter implements DataWriter, DescriptorListener { +public class UptimeDataWriter implements DataWriter, DescriptorListener, + FingerprintListener {
private DescriptorSource descriptorSource;
@@ -34,12 +35,20 @@ public class UptimeDataWriter implements DataWriter, DescriptorListener { this.documentStore = documentStore; this.now = time.currentTimeMillis(); this.registerDescriptorListeners(); + this.registerFingerprintListeners(); }
private void registerDescriptorListeners() { - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.RELAY_CONSENSUSES); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, + DescriptorType.BRIDGE_STATUSES); + } + + public void registerFingerprintListeners() { + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerFingerprintListener(this, DescriptorType.BRIDGE_STATUSES); }
@@ -284,6 +293,18 @@ public class UptimeDataWriter implements DataWriter, DescriptorListener { } }
+ private SortedSet<String> newRelayFingerprints = new TreeSet<String>(), + newBridgeFingerprints = new TreeSet<String>(); + + public void processFingerprints(SortedSet<String> fingerprints, + boolean relay) { + if (relay) { + this.newRelayFingerprints.addAll(fingerprints); + } else { + this.newBridgeFingerprints.addAll(fingerprints); + } + } + public void updateDocuments() { SortedSet<UptimeHistory> knownRelayStatuses = new TreeSet<UptimeHistory>(), @@ -296,10 +317,10 @@ public class UptimeDataWriter implements DataWriter, DescriptorListener { knownBridgeStatuses.add(status); } } - for (String fingerprint : this.newRunningRelays.keySet()) { + for (String fingerprint : this.newRelayFingerprints) { this.updateDocument(true, fingerprint, knownRelayStatuses); } - for (String fingerprint : this.newRunningBridges.keySet()) { + for (String fingerprint : this.newBridgeFingerprints) { this.updateDocument(false, fingerprint, knownBridgeStatuses); } Logger.printStatusTime("Wrote uptime document files"); diff --git a/src/org/torproject/onionoo/WeightsDataWriter.java b/src/org/torproject/onionoo/WeightsDataWriter.java index 855e3e9..21c3272 100644 --- a/src/org/torproject/onionoo/WeightsDataWriter.java +++ b/src/org/torproject/onionoo/WeightsDataWriter.java @@ -25,7 +25,8 @@ import org.torproject.descriptor.NetworkStatusEntry; import org.torproject.descriptor.RelayNetworkStatusConsensus; import org.torproject.descriptor.ServerDescriptor;
-public class WeightsDataWriter implements DataWriter, DescriptorListener { +public class WeightsDataWriter implements DataWriter, DescriptorListener, + FingerprintListener {
private DescriptorSource descriptorSource;
@@ -39,12 +40,20 @@ public class WeightsDataWriter implements DataWriter, DescriptorListener { this.documentStore = documentStore; this.now = time.currentTimeMillis(); this.registerDescriptorListeners(); + this.registerFingerprintListeners(); }
private void registerDescriptorListeners() { - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, DescriptorType.RELAY_CONSENSUSES); - this.descriptorSource.registerListener(this, + this.descriptorSource.registerDescriptorListener(this, + DescriptorType.RELAY_SERVER_DESCRIPTORS); + } + + private void registerFingerprintListeners() { + this.descriptorSource.registerFingerprintListener(this, + DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerFingerprintListener(this, DescriptorType.RELAY_SERVER_DESCRIPTORS); }
@@ -289,7 +298,6 @@ public class WeightsDataWriter implements DataWriter, DescriptorListener { history.put(interval, weights); history = this.compressHistory(history); this.writeHistoryToDisk(fingerprint, history); - this.updateWeightsDocuments.add(fingerprint); this.updateWeightsStatuses.remove(fingerprint); } } @@ -456,6 +464,13 @@ public class WeightsDataWriter implements DataWriter, DescriptorListener { this.documentStore.store(weightsStatus, fingerprint); }
+ public void processFingerprints(SortedSet<String> fingerprints, + boolean relay) { + if (relay) { + this.updateWeightsDocuments.addAll(fingerprints); + } + } + private void writeWeightsDataFiles() { for (String fingerprint : this.updateWeightsDocuments) { SortedMap<long[], double[]> history =