commit 3a987142ce843cc65284cf00a5c02ede97d81b67 Author: Karsten Loesing <karsten.loesing@gmx.net> Date: Tue Jul 2 09:01:52 2013 +0200
Change descriptor parsing model from pull to push. --- .../torproject/onionoo/BandwidthDataWriter.java | 32 ++-- src/org/torproject/onionoo/DescriptorSource.java | 121 +++++++++---- src/org/torproject/onionoo/DetailsDataWriter.java | 135 +++++++-------- src/org/torproject/onionoo/Main.java | 31 ++-- src/org/torproject/onionoo/NodeDataWriter.java | 182 ++++++++------------ src/org/torproject/onionoo/WeightsDataWriter.java | 83 +++++---- 6 files changed, 299 insertions(+), 285 deletions(-)
diff --git a/src/org/torproject/onionoo/BandwidthDataWriter.java b/src/org/torproject/onionoo/BandwidthDataWriter.java index 7203b6d..2e0b8a7 100644 --- a/src/org/torproject/onionoo/BandwidthDataWriter.java +++ b/src/org/torproject/onionoo/BandwidthDataWriter.java @@ -36,7 +36,7 @@ import org.torproject.descriptor.ExtraInfoDescriptor; * last 3 days in the bandwidth document may not be equivalent to the last * 3 days as of publishing the document, but that's something clients can * work around. */ -public class BandwidthDataWriter { +public class BandwidthDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -48,6 +48,7 @@ public class BandwidthDataWriter { DocumentStore documentStore) { this.descriptorSource = descriptorSource; this.documentStore = documentStore; + this.registerDescriptorListeners(); }
private SimpleDateFormat dateTimeFormat = new SimpleDateFormat( @@ -57,27 +58,24 @@ public class BandwidthDataWriter { this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); }
- public void setCurrentNodes( - SortedMap<String, NodeStatus> currentNodes) { - this.currentFingerprints.addAll(currentNodes.keySet()); + private void registerDescriptorListeners() { + this.descriptorSource.registerListener(this, + DescriptorType.RELAY_EXTRA_INFOS); + this.descriptorSource.registerListener(this, + DescriptorType.BRIDGE_EXTRA_INFOS); }
- public void readExtraInfoDescriptors() { - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - new DescriptorType[] { DescriptorType.RELAY_EXTRA_INFOS, - DescriptorType.BRIDGE_EXTRA_INFOS }, - DescriptorHistory.EXTRAINFO_HISTORY); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof ExtraInfoDescriptor) { - ExtraInfoDescriptor extraInfoDescriptor = - (ExtraInfoDescriptor) descriptor; - this.parseDescriptor(extraInfoDescriptor); - } + public void processDescriptor(Descriptor descriptor, boolean relay) { + if (descriptor instanceof ExtraInfoDescriptor) { + this.parseDescriptor((ExtraInfoDescriptor) descriptor); } }
+ public void setCurrentNodes( + SortedMap<String, NodeStatus> currentNodes) { + this.currentFingerprints.addAll(currentNodes.keySet()); + } + private void parseDescriptor(ExtraInfoDescriptor descriptor) { String fingerprint = descriptor.getFingerprint(); boolean updateHistory = false; diff --git a/src/org/torproject/onionoo/DescriptorSource.java b/src/org/torproject/onionoo/DescriptorSource.java index 23febc8..5868f0f 100644 --- a/src/org/torproject/onionoo/DescriptorSource.java +++ b/src/org/torproject/onionoo/DescriptorSource.java @@ -9,9 +9,12 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap;
@@ -24,20 +27,24 @@ enum DescriptorType { RELAY_CONSENSUSES, RELAY_SERVER_DESCRIPTORS, RELAY_EXTRA_INFOS, + EXIT_LISTS, BRIDGE_STATUSES, BRIDGE_SERVER_DESCRIPTORS, BRIDGE_EXTRA_INFOS, BRIDGE_POOL_ASSIGNMENTS, - EXIT_LISTS, +} + +interface DescriptorListener { + abstract void processDescriptor(Descriptor descriptor, boolean relay); }
enum DescriptorHistory { - EXTRAINFO_HISTORY, - EXIT_LIST_HISTORY, - BRIDGE_POOLASSIGN_HISTORY, - WEIGHTS_RELAY_CONSENSUS_HISTORY, RELAY_CONSENSUS_HISTORY, + RELAY_EXTRAINFO_HISTORY, + EXIT_LIST_HISTORY, BRIDGE_STATUS_HISTORY, + BRIDGE_EXTRAINFO_HISTORY, + BRIDGE_POOLASSIGN_HISTORY, }
class DescriptorQueue { @@ -108,8 +115,11 @@ class DescriptorQueue { public void readHistoryFile(DescriptorHistory descriptorHistory) { String historyFileName = null; switch (descriptorHistory) { - case EXTRAINFO_HISTORY: - historyFileName = "extrainfo-history"; + case RELAY_EXTRAINFO_HISTORY: + historyFileName = "relay-extrainfo-history"; + break; + case BRIDGE_EXTRAINFO_HISTORY: + historyFileName = "bridge-extrainfo-history"; break; case EXIT_LIST_HISTORY: historyFileName = "exit-list-history"; @@ -117,9 +127,6 @@ class DescriptorQueue { case BRIDGE_POOLASSIGN_HISTORY: historyFileName = "bridge-poolassign-history"; break; - case WEIGHTS_RELAY_CONSENSUS_HISTORY: - historyFileName = "weights-relay-consensus-history"; - break; case RELAY_CONSENSUS_HISTORY: historyFileName = "relay-consensus-history"; break; @@ -230,38 +237,92 @@ public class DescriptorSource { this.inDir = inDir; this.statusDir = statusDir; this.descriptorQueues = new ArrayList<DescriptorQueue>(); + this.descriptorListeners = + new HashMap<DescriptorType, Set<DescriptorListener>>(); }
- public DescriptorQueue getDescriptorQueue( - DescriptorType descriptorType) { + private DescriptorQueue getDescriptorQueue( + DescriptorType descriptorType, + DescriptorHistory descriptorHistory) { DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir, this.statusDir); descriptorQueue.addDirectory(descriptorType); + if (descriptorHistory != null) { + descriptorQueue.readHistoryFile(descriptorHistory); + } this.descriptorQueues.add(descriptorQueue); return descriptorQueue; }
- public DescriptorQueue getDescriptorQueue( - DescriptorType[] descriptorTypes, - DescriptorHistory descriptorHistory) { - DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir, - this.statusDir); - for (DescriptorType descriptorType : descriptorTypes) { - descriptorQueue.addDirectory(descriptorType); + private Map<DescriptorType, Set<DescriptorListener>> + descriptorListeners; + + public void registerListener(DescriptorListener listener, + DescriptorType descriptorType) { + if (!this.descriptorListeners.containsKey(descriptorType)) { + this.descriptorListeners.put(descriptorType, + new HashSet<DescriptorListener>()); } - descriptorQueue.readHistoryFile(descriptorHistory); - this.descriptorQueues.add(descriptorQueue); - return descriptorQueue; + this.descriptorListeners.get(descriptorType).add(listener); }
- public DescriptorQueue getDescriptorQueue(DescriptorType descriptorType, - DescriptorHistory descriptorHistory) { - DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir, - this.statusDir); - descriptorQueue.addDirectory(descriptorType); - descriptorQueue.readHistoryFile(descriptorHistory); - this.descriptorQueues.add(descriptorQueue); - return descriptorQueue; + public void readRelayNetworkConsensuses() { + this.readDescriptors(DescriptorType.RELAY_CONSENSUSES, + DescriptorHistory.RELAY_CONSENSUS_HISTORY, true); + } + + public void readRelayServerDescriptors() { + // TODO Use parse history as soon as all listeners can handle it. + this.readDescriptors(DescriptorType.RELAY_SERVER_DESCRIPTORS, null, + true); + } + + public void readRelayExtraInfos() { + this.readDescriptors(DescriptorType.RELAY_EXTRA_INFOS, + DescriptorHistory.RELAY_EXTRAINFO_HISTORY, true); + } + + public void readExitLists() { + this.readDescriptors(DescriptorType.EXIT_LISTS, + DescriptorHistory.EXIT_LIST_HISTORY, true); + } + + public void readBridgeNetworkStatuses() { + this.readDescriptors(DescriptorType.BRIDGE_STATUSES, + DescriptorHistory.BRIDGE_STATUS_HISTORY, false); + } + + public void readBridgeServerDescriptors() { + // TODO Use parse history as soon as all listeners can handle it. 
+ this.readDescriptors(DescriptorType.BRIDGE_SERVER_DESCRIPTORS, null, + false); + } + + public void readBridgeExtraInfos() { + this.readDescriptors(DescriptorType.BRIDGE_EXTRA_INFOS, + DescriptorHistory.BRIDGE_EXTRAINFO_HISTORY, false); + } + + public void readBridgePoolAssignments() { + this.readDescriptors(DescriptorType.BRIDGE_POOL_ASSIGNMENTS, + DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY, false); + } + + private void readDescriptors(DescriptorType descriptorType, + DescriptorHistory descriptorHistory, boolean relay) { + Set<DescriptorListener> descriptorListeners = + this.descriptorListeners.get(descriptorType); + if (descriptorListeners == null || descriptorListeners.isEmpty()) { + return; + } + DescriptorQueue descriptorQueue = this.getDescriptorQueue( + descriptorType, descriptorHistory); + Descriptor descriptor; + while ((descriptor = descriptorQueue.nextDescriptor()) != null) { + for (DescriptorListener descriptorListener : descriptorListeners) { + descriptorListener.processDescriptor(descriptor, relay); + } + } }
public void writeHistoryFiles() { diff --git a/src/org/torproject/onionoo/DetailsDataWriter.java b/src/org/torproject/onionoo/DetailsDataWriter.java index e4ccb78..acb01d1 100644 --- a/src/org/torproject/onionoo/DetailsDataWriter.java +++ b/src/org/torproject/onionoo/DetailsDataWriter.java @@ -32,7 +32,7 @@ import org.torproject.descriptor.ServerDescriptor; * The parts of details files coming from server descriptors always come * from the last known descriptor of a relay or bridge, not from the * descriptor that was last referenced in a network status. */ -public class DetailsDataWriter { +public class DetailsDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -50,6 +50,30 @@ public class DetailsDataWriter { this.descriptorSource = descriptorSource; this.reverseDomainNameResolver = reverseDomainNameResolver; this.documentStore = documentStore; + this.registerDescriptorListeners(); + } + + private void registerDescriptorListeners() { + this.descriptorSource.registerListener(this, + DescriptorType.RELAY_SERVER_DESCRIPTORS); + this.descriptorSource.registerListener(this, + DescriptorType.BRIDGE_SERVER_DESCRIPTORS); + this.descriptorSource.registerListener(this, + DescriptorType.BRIDGE_POOL_ASSIGNMENTS); + this.descriptorSource.registerListener(this, + DescriptorType.EXIT_LISTS); + } + + public void processDescriptor(Descriptor descriptor, boolean relay) { + if (descriptor instanceof ServerDescriptor && relay) { + this.processRelayServerDescriptor((ServerDescriptor) descriptor); + } else if (descriptor instanceof ServerDescriptor && !relay) { + this.processBridgeServerDescriptor((ServerDescriptor) descriptor); + } else if (descriptor instanceof BridgePoolAssignment) { + this.processBridgePoolAssignment((BridgePoolAssignment) descriptor); + } else if (descriptor instanceof ExitList) { + this.processExitList((ExitList) descriptor); + } }
public void setCurrentNodes( @@ -92,28 +116,19 @@ public class DetailsDataWriter {
private Map<String, ServerDescriptor> relayServerDescriptors = new HashMap<String, ServerDescriptor>(); - public void readRelayServerDescriptors() { + + private void processRelayServerDescriptor( + ServerDescriptor serverDescriptor) { /* Don't remember which server descriptors we already parsed. If we * parse a server descriptor now and first learn about the relay in a * later consensus, we'll never write the descriptor content anywhere. * The result would be details files containing no descriptor parts * until the relay publishes the next descriptor. */ - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.RELAY_SERVER_DESCRIPTORS); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof ServerDescriptor) { - ServerDescriptor serverDescriptor = (ServerDescriptor) descriptor; - String fingerprint = serverDescriptor.getFingerprint(); - if (!this.relayServerDescriptors.containsKey(fingerprint) || - this.relayServerDescriptors.get(fingerprint). - getPublishedMillis() - < serverDescriptor.getPublishedMillis()) { - this.relayServerDescriptors.put(fingerprint, - serverDescriptor); - } - } + String fingerprint = serverDescriptor.getFingerprint(); + if (!this.relayServerDescriptors.containsKey(fingerprint) || + this.relayServerDescriptors.get(fingerprint).getPublishedMillis() + < serverDescriptor.getPublishedMillis()) { + this.relayServerDescriptors.put(fingerprint, serverDescriptor); } }
@@ -232,79 +247,53 @@ public class DetailsDataWriter { }
private long now = System.currentTimeMillis(); + private Map<String, Set<ExitListEntry>> exitListEntries = new HashMap<String, Set<ExitListEntry>>(); - public void readExitLists() { - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.EXIT_LISTS, DescriptorHistory.EXIT_LIST_HISTORY); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof ExitList) { - ExitList exitList = (ExitList) descriptor; - for (ExitListEntry exitListEntry : - exitList.getExitListEntries()) { - if (exitListEntry.getScanMillis() < - this.now - 24L * 60L * 60L * 1000L) { - continue; - } - String fingerprint = exitListEntry.getFingerprint(); - if (!this.exitListEntries.containsKey(fingerprint)) { - this.exitListEntries.put(fingerprint, - new HashSet<ExitListEntry>()); - } - this.exitListEntries.get(fingerprint).add(exitListEntry); - } + + private void processExitList(ExitList exitList) { + for (ExitListEntry exitListEntry : exitList.getExitListEntries()) { + if (exitListEntry.getScanMillis() < + this.now - 24L * 60L * 60L * 1000L) { + continue; } + String fingerprint = exitListEntry.getFingerprint(); + if (!this.exitListEntries.containsKey(fingerprint)) { + this.exitListEntries.put(fingerprint, + new HashSet<ExitListEntry>()); + } + this.exitListEntries.get(fingerprint).add(exitListEntry); } }
private Map<String, ServerDescriptor> bridgeServerDescriptors = new HashMap<String, ServerDescriptor>(); - public void readBridgeServerDescriptors() { + + private void processBridgeServerDescriptor( + ServerDescriptor serverDescriptor) { /* Don't remember which server descriptors we already parsed. If we * parse a server descriptor now and first learn about the relay in a * later status, we'll never write the descriptor content anywhere. * The result would be details files containing no descriptor parts * until the bridge publishes the next descriptor. */ - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.BRIDGE_SERVER_DESCRIPTORS); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof ServerDescriptor) { - ServerDescriptor serverDescriptor = (ServerDescriptor) descriptor; - String fingerprint = serverDescriptor.getFingerprint(); - if (!this.bridgeServerDescriptors.containsKey(fingerprint) || - this.bridgeServerDescriptors.get(fingerprint). - getPublishedMillis() - < serverDescriptor.getPublishedMillis()) { - this.bridgeServerDescriptors.put(fingerprint, - serverDescriptor); - } - } + String fingerprint = serverDescriptor.getFingerprint(); + if (!this.bridgeServerDescriptors.containsKey(fingerprint) || + this.bridgeServerDescriptors.get(fingerprint).getPublishedMillis() + < serverDescriptor.getPublishedMillis()) { + this.bridgeServerDescriptors.put(fingerprint, serverDescriptor); } }
private Map<String, String> bridgePoolAssignments = new HashMap<String, String>(); - public void readBridgePoolAssignments() { - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.BRIDGE_POOL_ASSIGNMENTS, - DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof BridgePoolAssignment) { - BridgePoolAssignment bridgePoolAssignment = - (BridgePoolAssignment) descriptor; - for (Map.Entry<String, String> e : - bridgePoolAssignment.getEntries().entrySet()) { - String fingerprint = e.getKey(); - String details = e.getValue(); - this.bridgePoolAssignments.put(fingerprint, details); - } - } + + private void processBridgePoolAssignment( + BridgePoolAssignment bridgePoolAssignment) { + for (Map.Entry<String, String> e : + bridgePoolAssignment.getEntries().entrySet()) { + String fingerprint = e.getKey(); + String details = e.getValue(); + this.bridgePoolAssignments.put(fingerprint, details); } }
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java index 9caac06..9c02319 100644 --- a/src/org/torproject/onionoo/Main.java +++ b/src/org/torproject/onionoo/Main.java @@ -30,30 +30,27 @@ public class Main { WeightsDataWriter wdw = new WeightsDataWriter(dso, ds); printStatusTime("Initialized weights data writer");
- // TODO Instead of creating nine, partly overlapping descriptor - // queues, register for descriptor type and let DescriptorSource - // parse everything just once. printStatus("Reading descriptors."); - ndw.readRelayNetworkConsensuses(); - printStatusTime("Read network status consensuses"); - ndw.readBridgeNetworkStatuses(); - printStatusTime("Read bridge network statuses"); - ddw.readRelayServerDescriptors(); + dso.readRelayNetworkConsensuses(); + printStatusTime("Read relay network consensuses"); + dso.readRelayServerDescriptors(); printStatusTime("Read relay server descriptors"); - ddw.readExitLists(); + dso.readRelayExtraInfos(); + printStatusTime("Read relay extra-info descriptors"); + dso.readExitLists(); printStatusTime("Read exit lists"); - ddw.readBridgeServerDescriptors(); + dso.readBridgeNetworkStatuses(); + printStatusTime("Read bridge network statuses"); + dso.readBridgeServerDescriptors(); printStatusTime("Read bridge server descriptors"); - ddw.readBridgePoolAssignments(); + dso.readBridgeExtraInfos(); + printStatusTime("Read bridge extra-info descriptors"); + dso.readBridgePoolAssignments(); printStatusTime("Read bridge-pool assignments"); - bdw.readExtraInfoDescriptors(); - printStatusTime("Read extra-info descriptors"); - wdw.readRelayServerDescriptors(); - printStatusTime("Read relay server descriptors"); - wdw.readRelayNetworkConsensuses(); - printStatusTime("Read relay network consensuses");
printStatus("Updating internal node list."); + ndw.readStatusSummary(); + printStatusTime("Read status summary"); ndw.lookUpCitiesAndASes(); printStatusTime("Looked up cities and ASes"); ndw.setRunningBits(); diff --git a/src/org/torproject/onionoo/NodeDataWriter.java b/src/org/torproject/onionoo/NodeDataWriter.java index b2d7e32..69ce01b 100644 --- a/src/org/torproject/onionoo/NodeDataWriter.java +++ b/src/org/torproject/onionoo/NodeDataWriter.java @@ -16,7 +16,7 @@ import org.torproject.onionoo.LookupService.LookupResult;
/* Store relays and bridges that have been running in the past seven * days. */ -public class NodeDataWriter { +public class NodeDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -33,63 +33,29 @@ public class NodeDataWriter {
private SortedMap<String, Integer> lastBandwidthWeights = null;
- private int relaysUpdated = 0, relaysAdded = 0, - relayConsensusesProcessed = 0, bridgesUpdated = 0, - bridgesAdded = 0, bridgeStatusesProcessed = 0; + private int relayConsensusesProcessed = 0, bridgeStatusesProcessed = 0; + public NodeDataWriter(DescriptorSource descriptorSource, LookupService lookupService, DocumentStore documentStore) { this.descriptorSource = descriptorSource; this.lookupService = lookupService; this.documentStore = documentStore; - this.readStatusSummary(); + this.registerDescriptorListeners(); }
- private void readStatusSummary() { - SortedSet<String> fingerprints = this.documentStore.list( - NodeStatus.class, true); - for (String fingerprint : fingerprints) { - NodeStatus node = this.documentStore.retrieve(NodeStatus.class, - true, fingerprint); - if (node.isRelay()) { - this.relaysLastValidAfterMillis = Math.max( - this.relaysLastValidAfterMillis, node.getLastSeenMillis()); - } else { - this.bridgesLastPublishedMillis = Math.max( - this.bridgesLastPublishedMillis, node.getLastSeenMillis()); - } - this.knownNodes.put(fingerprint, node); - } + private void registerDescriptorListeners() { + this.descriptorSource.registerListener(this, + DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerListener(this, + DescriptorType.BRIDGE_STATUSES); }
- public void readRelayNetworkConsensuses() { - if (this.descriptorSource == null) { - System.err.println("Not configured to read relay network " - + "consensuses."); - return; - } - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.RELAY_CONSENSUSES, - DescriptorHistory.RELAY_CONSENSUS_HISTORY); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - updateRelayNetworkStatusConsensus( - (RelayNetworkStatusConsensus) descriptor); - } - } - } - - public void setRunningBits() { - for (NodeStatus node : this.knownNodes.values()) { - if (node.isRelay() && - node.getLastSeenMillis() == this.relaysLastValidAfterMillis) { - node.setRunning(true); - } - if (!node.isRelay() && - node.getLastSeenMillis() == this.bridgesLastPublishedMillis) { - node.setRunning(true); - } + public void processDescriptor(Descriptor descriptor, boolean relay) { + if (descriptor instanceof RelayNetworkStatusConsensus) { + updateRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } else if (descriptor instanceof BridgeNetworkStatus) { + updateBridgeNetworkStatus((BridgeNetworkStatus) descriptor); } }
@@ -119,10 +85,8 @@ public class NodeDataWriter { validAfterMillis, null); if (this.knownNodes.containsKey(fingerprint)) { this.knownNodes.get(fingerprint).update(newNodeStatus); - this.relaysUpdated++; } else { this.knownNodes.put(fingerprint, newNodeStatus); - this.relaysAdded++; } } this.relayConsensusesProcessed++; @@ -131,6 +95,67 @@ public class NodeDataWriter { } }
+ private void updateBridgeNetworkStatus(BridgeNetworkStatus status) { + long publishedMillis = status.getPublishedMillis(); + if (publishedMillis > this.bridgesLastPublishedMillis) { + this.bridgesLastPublishedMillis = publishedMillis; + } + for (NetworkStatusEntry entry : status.getStatusEntries().values()) { + String nickname = entry.getNickname(); + String fingerprint = entry.getFingerprint(); + String address = entry.getAddress(); + SortedSet<String> orAddressesAndPorts = new TreeSet<String>( + entry.getOrAddresses()); + int orPort = entry.getOrPort(); + int dirPort = entry.getDirPort(); + SortedSet<String> relayFlags = entry.getFlags(); + NodeStatus newNodeStatus = new NodeStatus(false, nickname, + fingerprint, address, orAddressesAndPorts, null, + publishedMillis, orPort, dirPort, relayFlags, -1L, "??", null, + -1L, null, null, publishedMillis, -1L, null); + if (this.knownNodes.containsKey(fingerprint)) { + this.knownNodes.get(fingerprint).update(newNodeStatus); + } else { + this.knownNodes.put(fingerprint, newNodeStatus); + } + } + this.bridgeStatusesProcessed++; + } + + public void readStatusSummary() { + SortedSet<String> fingerprints = this.documentStore.list( + NodeStatus.class, true); + for (String fingerprint : fingerprints) { + NodeStatus node = this.documentStore.retrieve(NodeStatus.class, + true, fingerprint); + if (node.isRelay()) { + this.relaysLastValidAfterMillis = Math.max( + this.relaysLastValidAfterMillis, node.getLastSeenMillis()); + } else { + this.bridgesLastPublishedMillis = Math.max( + this.bridgesLastPublishedMillis, node.getLastSeenMillis()); + } + if (this.knownNodes.containsKey(fingerprint)) { + this.knownNodes.get(fingerprint).update(node); + } else { + this.knownNodes.put(fingerprint, node); + } + } + } + + public void setRunningBits() { + for (NodeStatus node : this.knownNodes.values()) { + if (node.isRelay() && + node.getLastSeenMillis() == this.relaysLastValidAfterMillis) { + node.setRunning(true); + } + if (!node.isRelay() && 
+ node.getLastSeenMillis() == this.bridgesLastPublishedMillis) { + node.setRunning(true); + } + } + } + public void lookUpCitiesAndASes() { SortedSet<String> addressStrings = new TreeSet<String>(); for (NodeStatus node : this.knownNodes.values()) { @@ -164,53 +189,6 @@ public class NodeDataWriter { } }
- public void readBridgeNetworkStatuses() { - if (this.descriptorSource == null) { - System.err.println("Not configured to read bridge network " - + "statuses."); - return; - } - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.BRIDGE_STATUSES, - DescriptorHistory.BRIDGE_STATUS_HISTORY); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof BridgeNetworkStatus) { - updateBridgeNetworkStatus((BridgeNetworkStatus) descriptor); - } - } - } - - private void updateBridgeNetworkStatus(BridgeNetworkStatus status) { - long publishedMillis = status.getPublishedMillis(); - if (publishedMillis > this.bridgesLastPublishedMillis) { - this.bridgesLastPublishedMillis = publishedMillis; - } - for (NetworkStatusEntry entry : status.getStatusEntries().values()) { - String nickname = entry.getNickname(); - String fingerprint = entry.getFingerprint(); - String address = entry.getAddress(); - SortedSet<String> orAddressesAndPorts = new TreeSet<String>( - entry.getOrAddresses()); - int orPort = entry.getOrPort(); - int dirPort = entry.getDirPort(); - SortedSet<String> relayFlags = entry.getFlags(); - NodeStatus newNodeStatus = new NodeStatus(false, nickname, - fingerprint, address, orAddressesAndPorts, null, - publishedMillis, orPort, dirPort, relayFlags, -1L, "??", null, - -1L, null, null, publishedMillis, -1L, null); - if (this.knownNodes.containsKey(fingerprint)) { - this.knownNodes.get(fingerprint).update(newNodeStatus); - this.bridgesUpdated++; - } else { - this.knownNodes.put(fingerprint, newNodeStatus); - this.bridgesAdded++; - } - } - this.bridgeStatusesProcessed++; - } - public void writeStatusSummary() { this.writeSummary(true); } @@ -248,16 +226,8 @@ public class NodeDataWriter { StringBuilder sb = new StringBuilder(); sb.append(" " + formatDecimalNumber(relayConsensusesProcessed) + " relay consensuses processed\n"); - sb.append(" " + 
formatDecimalNumber(relaysUpdated) - + " relays updated\n"); - sb.append(" " + formatDecimalNumber(relaysAdded) - + " relays added\n"); sb.append(" " + formatDecimalNumber(bridgeStatusesProcessed) + " bridge statuses processed\n"); - sb.append(" " + formatDecimalNumber(bridgesUpdated) - + " bridges updated\n"); - sb.append(" " + formatDecimalNumber(bridgesAdded) - + " bridges added\n"); return sb.toString(); }
diff --git a/src/org/torproject/onionoo/WeightsDataWriter.java b/src/org/torproject/onionoo/WeightsDataWriter.java index 317b8e2..e388ddd 100644 --- a/src/org/torproject/onionoo/WeightsDataWriter.java +++ b/src/org/torproject/onionoo/WeightsDataWriter.java @@ -23,7 +23,7 @@ import org.torproject.descriptor.NetworkStatusEntry; import org.torproject.descriptor.RelayNetworkStatusConsensus; import org.torproject.descriptor.ServerDescriptor;
-public class WeightsDataWriter { +public class WeightsDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -35,6 +35,23 @@ public class WeightsDataWriter { DocumentStore documentStore) { this.descriptorSource = descriptorSource; this.documentStore = documentStore; + this.registerDescriptorListeners(); + } + + private void registerDescriptorListeners() { + this.descriptorSource.registerListener(this, + DescriptorType.RELAY_CONSENSUSES); + this.descriptorSource.registerListener(this, + DescriptorType.RELAY_SERVER_DESCRIPTORS); + } + + public void processDescriptor(Descriptor descriptor, boolean relay) { + if (descriptor instanceof ServerDescriptor) { + this.processRelayServerDescriptor((ServerDescriptor) descriptor); + } else if (descriptor instanceof RelayNetworkStatusConsensus) { + this.processRelayNetworkConsensus( + (RelayNetworkStatusConsensus) descriptor); + } }
public void setCurrentNodes( @@ -42,51 +59,33 @@ public class WeightsDataWriter { this.currentFingerprints.addAll(currentNodes.keySet()); }
- /* Read advertised bandwidths of all server descriptors in - * in/relay-descriptors/server-descriptors/ to memory. Ideally, we'd - * skip descriptors that we read before and obtain their advertised - * bandwidths from some temp file. This approach should do for now, - * though. */ private Map<String, Integer> advertisedBandwidths = new HashMap<String, Integer>(); - public void readRelayServerDescriptors() { - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.RELAY_SERVER_DESCRIPTORS); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof ServerDescriptor) { - ServerDescriptor serverDescriptor = - (ServerDescriptor) descriptor; - String digest = serverDescriptor.getServerDescriptorDigest(). - toUpperCase(); - int advertisedBandwidth = Math.min(Math.min( - serverDescriptor.getBandwidthBurst(), - serverDescriptor.getBandwidthObserved()), - serverDescriptor.getBandwidthRate()); - this.advertisedBandwidths.put(digest, advertisedBandwidth); - } - } + + private void processRelayServerDescriptor( + ServerDescriptor serverDescriptor) { + /* Read advertised bandwidths of all server descriptors in + * in/relay-descriptors/server-descriptors/ to memory. Ideally, we'd + * skip descriptors that we read before and obtain their advertised + * bandwidths from some temp file. This approach should do for now, + * though. */ + String digest = serverDescriptor.getServerDescriptorDigest(). + toUpperCase(); + int advertisedBandwidth = Math.min(Math.min( + serverDescriptor.getBandwidthBurst(), + serverDescriptor.getBandwidthObserved()), + serverDescriptor.getBandwidthRate()); + this.advertisedBandwidths.put(digest, advertisedBandwidth); }
- public void readRelayNetworkConsensuses() { - DescriptorQueue descriptorQueue = - this.descriptorSource.getDescriptorQueue( - DescriptorType.RELAY_CONSENSUSES, - DescriptorHistory.WEIGHTS_RELAY_CONSENSUS_HISTORY); - Descriptor descriptor; - while ((descriptor = descriptorQueue.nextDescriptor()) != null) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - RelayNetworkStatusConsensus consensus = - (RelayNetworkStatusConsensus) descriptor; - long validAfterMillis = consensus.getValidAfterMillis(), - freshUntilMillis = consensus.getFreshUntilMillis(); - SortedMap<String, double[]> pathSelectionWeights = - this.calculatePathSelectionProbabilities(consensus); - this.updateWeightsHistory(validAfterMillis, freshUntilMillis, - pathSelectionWeights); - } - } + private void processRelayNetworkConsensus( + RelayNetworkStatusConsensus consensus) { + long validAfterMillis = consensus.getValidAfterMillis(), + freshUntilMillis = consensus.getFreshUntilMillis(); + SortedMap<String, double[]> pathSelectionWeights = + this.calculatePathSelectionProbabilities(consensus); + this.updateWeightsHistory(validAfterMillis, freshUntilMillis, + pathSelectionWeights); }
private static final int HISTORY_UPDATER_WORKERS_NUM = 4;