commit 3a987142ce843cc65284cf00a5c02ede97d81b67
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Tue Jul 2 09:01:52 2013 +0200
Change descriptor parsing model from pull to push.
---
.../torproject/onionoo/BandwidthDataWriter.java | 32 ++--
src/org/torproject/onionoo/DescriptorSource.java | 121 +++++++++----
src/org/torproject/onionoo/DetailsDataWriter.java | 135 +++++++--------
src/org/torproject/onionoo/Main.java | 31 ++--
src/org/torproject/onionoo/NodeDataWriter.java | 182 ++++++++------------
src/org/torproject/onionoo/WeightsDataWriter.java | 83 +++++----
6 files changed, 299 insertions(+), 285 deletions(-)
diff --git a/src/org/torproject/onionoo/BandwidthDataWriter.java b/src/org/torproject/onionoo/BandwidthDataWriter.java
index 7203b6d..2e0b8a7 100644
--- a/src/org/torproject/onionoo/BandwidthDataWriter.java
+++ b/src/org/torproject/onionoo/BandwidthDataWriter.java
@@ -36,7 +36,7 @@ import org.torproject.descriptor.ExtraInfoDescriptor;
* last 3 days in the bandwidth document may not be equivalent to the last
* 3 days as of publishing the document, but that's something clients can
* work around. */
-public class BandwidthDataWriter {
+public class BandwidthDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -48,6 +48,7 @@ public class BandwidthDataWriter {
DocumentStore documentStore) {
this.descriptorSource = descriptorSource;
this.documentStore = documentStore;
+ this.registerDescriptorListeners();
}
private SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
@@ -57,27 +58,24 @@ public class BandwidthDataWriter {
this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
- public void setCurrentNodes(
- SortedMap<String, NodeStatus> currentNodes) {
- this.currentFingerprints.addAll(currentNodes.keySet());
+ private void registerDescriptorListeners() {
+ this.descriptorSource.registerListener(this,
+ DescriptorType.RELAY_EXTRA_INFOS);
+ this.descriptorSource.registerListener(this,
+ DescriptorType.BRIDGE_EXTRA_INFOS);
}
- public void readExtraInfoDescriptors() {
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- new DescriptorType[] { DescriptorType.RELAY_EXTRA_INFOS,
- DescriptorType.BRIDGE_EXTRA_INFOS },
- DescriptorHistory.EXTRAINFO_HISTORY);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof ExtraInfoDescriptor) {
- ExtraInfoDescriptor extraInfoDescriptor =
- (ExtraInfoDescriptor) descriptor;
- this.parseDescriptor(extraInfoDescriptor);
- }
+ public void processDescriptor(Descriptor descriptor, boolean relay) {
+ if (descriptor instanceof ExtraInfoDescriptor) {
+ this.parseDescriptor((ExtraInfoDescriptor) descriptor);
}
}
+ public void setCurrentNodes(
+ SortedMap<String, NodeStatus> currentNodes) {
+ this.currentFingerprints.addAll(currentNodes.keySet());
+ }
+
private void parseDescriptor(ExtraInfoDescriptor descriptor) {
String fingerprint = descriptor.getFingerprint();
boolean updateHistory = false;
diff --git a/src/org/torproject/onionoo/DescriptorSource.java b/src/org/torproject/onionoo/DescriptorSource.java
index 23febc8..5868f0f 100644
--- a/src/org/torproject/onionoo/DescriptorSource.java
+++ b/src/org/torproject/onionoo/DescriptorSource.java
@@ -9,9 +9,12 @@ import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -24,20 +27,24 @@ enum DescriptorType {
RELAY_CONSENSUSES,
RELAY_SERVER_DESCRIPTORS,
RELAY_EXTRA_INFOS,
+ EXIT_LISTS,
BRIDGE_STATUSES,
BRIDGE_SERVER_DESCRIPTORS,
BRIDGE_EXTRA_INFOS,
BRIDGE_POOL_ASSIGNMENTS,
- EXIT_LISTS,
+}
+
+interface DescriptorListener {
+ abstract void processDescriptor(Descriptor descriptor, boolean relay);
}
enum DescriptorHistory {
- EXTRAINFO_HISTORY,
- EXIT_LIST_HISTORY,
- BRIDGE_POOLASSIGN_HISTORY,
- WEIGHTS_RELAY_CONSENSUS_HISTORY,
RELAY_CONSENSUS_HISTORY,
+ RELAY_EXTRAINFO_HISTORY,
+ EXIT_LIST_HISTORY,
BRIDGE_STATUS_HISTORY,
+ BRIDGE_EXTRAINFO_HISTORY,
+ BRIDGE_POOLASSIGN_HISTORY,
}
class DescriptorQueue {
@@ -108,8 +115,11 @@ class DescriptorQueue {
public void readHistoryFile(DescriptorHistory descriptorHistory) {
String historyFileName = null;
switch (descriptorHistory) {
- case EXTRAINFO_HISTORY:
- historyFileName = "extrainfo-history";
+ case RELAY_EXTRAINFO_HISTORY:
+ historyFileName = "relay-extrainfo-history";
+ break;
+ case BRIDGE_EXTRAINFO_HISTORY:
+ historyFileName = "bridge-extrainfo-history";
break;
case EXIT_LIST_HISTORY:
historyFileName = "exit-list-history";
@@ -117,9 +127,6 @@ class DescriptorQueue {
case BRIDGE_POOLASSIGN_HISTORY:
historyFileName = "bridge-poolassign-history";
break;
- case WEIGHTS_RELAY_CONSENSUS_HISTORY:
- historyFileName = "weights-relay-consensus-history";
- break;
case RELAY_CONSENSUS_HISTORY:
historyFileName = "relay-consensus-history";
break;
@@ -230,38 +237,92 @@ public class DescriptorSource {
this.inDir = inDir;
this.statusDir = statusDir;
this.descriptorQueues = new ArrayList<DescriptorQueue>();
+ this.descriptorListeners =
+ new HashMap<DescriptorType, Set<DescriptorListener>>();
}
- public DescriptorQueue getDescriptorQueue(
- DescriptorType descriptorType) {
+ private DescriptorQueue getDescriptorQueue(
+ DescriptorType descriptorType,
+ DescriptorHistory descriptorHistory) {
DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir,
this.statusDir);
descriptorQueue.addDirectory(descriptorType);
+ if (descriptorHistory != null) {
+ descriptorQueue.readHistoryFile(descriptorHistory);
+ }
this.descriptorQueues.add(descriptorQueue);
return descriptorQueue;
}
- public DescriptorQueue getDescriptorQueue(
- DescriptorType[] descriptorTypes,
- DescriptorHistory descriptorHistory) {
- DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir,
- this.statusDir);
- for (DescriptorType descriptorType : descriptorTypes) {
- descriptorQueue.addDirectory(descriptorType);
+ private Map<DescriptorType, Set<DescriptorListener>>
+ descriptorListeners;
+
+ public void registerListener(DescriptorListener listener,
+ DescriptorType descriptorType) {
+ if (!this.descriptorListeners.containsKey(descriptorType)) {
+ this.descriptorListeners.put(descriptorType,
+ new HashSet<DescriptorListener>());
}
- descriptorQueue.readHistoryFile(descriptorHistory);
- this.descriptorQueues.add(descriptorQueue);
- return descriptorQueue;
+ this.descriptorListeners.get(descriptorType).add(listener);
}
- public DescriptorQueue getDescriptorQueue(DescriptorType descriptorType,
- DescriptorHistory descriptorHistory) {
- DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir,
- this.statusDir);
- descriptorQueue.addDirectory(descriptorType);
- descriptorQueue.readHistoryFile(descriptorHistory);
- this.descriptorQueues.add(descriptorQueue);
- return descriptorQueue;
+ public void readRelayNetworkConsensuses() {
+ this.readDescriptors(DescriptorType.RELAY_CONSENSUSES,
+ DescriptorHistory.RELAY_CONSENSUS_HISTORY, true);
+ }
+
+ public void readRelayServerDescriptors() {
+ // TODO Use parse history as soon as all listeners can handle it.
+ this.readDescriptors(DescriptorType.RELAY_SERVER_DESCRIPTORS, null,
+ true);
+ }
+
+ public void readRelayExtraInfos() {
+ this.readDescriptors(DescriptorType.RELAY_EXTRA_INFOS,
+ DescriptorHistory.RELAY_EXTRAINFO_HISTORY, true);
+ }
+
+ public void readExitLists() {
+ this.readDescriptors(DescriptorType.EXIT_LISTS,
+ DescriptorHistory.EXIT_LIST_HISTORY, true);
+ }
+
+ public void readBridgeNetworkStatuses() {
+ this.readDescriptors(DescriptorType.BRIDGE_STATUSES,
+ DescriptorHistory.BRIDGE_STATUS_HISTORY, false);
+ }
+
+ public void readBridgeServerDescriptors() {
+ // TODO Use parse history as soon as all listeners can handle it.
+ this.readDescriptors(DescriptorType.BRIDGE_SERVER_DESCRIPTORS, null,
+ false);
+ }
+
+ public void readBridgeExtraInfos() {
+ this.readDescriptors(DescriptorType.BRIDGE_EXTRA_INFOS,
+ DescriptorHistory.BRIDGE_EXTRAINFO_HISTORY, false);
+ }
+
+ public void readBridgePoolAssignments() {
+ this.readDescriptors(DescriptorType.BRIDGE_POOL_ASSIGNMENTS,
+ DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY, false);
+ }
+
+ private void readDescriptors(DescriptorType descriptorType,
+ DescriptorHistory descriptorHistory, boolean relay) {
+ Set<DescriptorListener> descriptorListeners =
+ this.descriptorListeners.get(descriptorType);
+ if (descriptorListeners == null || descriptorListeners.isEmpty()) {
+ return;
+ }
+ DescriptorQueue descriptorQueue = this.getDescriptorQueue(
+ descriptorType, descriptorHistory);
+ Descriptor descriptor;
+ while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
+ for (DescriptorListener descriptorListener : descriptorListeners) {
+ descriptorListener.processDescriptor(descriptor, relay);
+ }
+ }
}
public void writeHistoryFiles() {
diff --git a/src/org/torproject/onionoo/DetailsDataWriter.java b/src/org/torproject/onionoo/DetailsDataWriter.java
index e4ccb78..acb01d1 100644
--- a/src/org/torproject/onionoo/DetailsDataWriter.java
+++ b/src/org/torproject/onionoo/DetailsDataWriter.java
@@ -32,7 +32,7 @@ import org.torproject.descriptor.ServerDescriptor;
* The parts of details files coming from server descriptors always come
* from the last known descriptor of a relay or bridge, not from the
* descriptor that was last referenced in a network status. */
-public class DetailsDataWriter {
+public class DetailsDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -50,6 +50,30 @@ public class DetailsDataWriter {
this.descriptorSource = descriptorSource;
this.reverseDomainNameResolver = reverseDomainNameResolver;
this.documentStore = documentStore;
+ this.registerDescriptorListeners();
+ }
+
+ private void registerDescriptorListeners() {
+ this.descriptorSource.registerListener(this,
+ DescriptorType.RELAY_SERVER_DESCRIPTORS);
+ this.descriptorSource.registerListener(this,
+ DescriptorType.BRIDGE_SERVER_DESCRIPTORS);
+ this.descriptorSource.registerListener(this,
+ DescriptorType.BRIDGE_POOL_ASSIGNMENTS);
+ this.descriptorSource.registerListener(this,
+ DescriptorType.EXIT_LISTS);
+ }
+
+ public void processDescriptor(Descriptor descriptor, boolean relay) {
+ if (descriptor instanceof ServerDescriptor && relay) {
+ this.processRelayServerDescriptor((ServerDescriptor) descriptor);
+ } else if (descriptor instanceof ServerDescriptor && !relay) {
+ this.processBridgeServerDescriptor((ServerDescriptor) descriptor);
+ } else if (descriptor instanceof BridgePoolAssignment) {
+ this.processBridgePoolAssignment((BridgePoolAssignment) descriptor);
+ } else if (descriptor instanceof ExitList) {
+ this.processExitList((ExitList) descriptor);
+ }
}
public void setCurrentNodes(
@@ -92,28 +116,19 @@ public class DetailsDataWriter {
private Map<String, ServerDescriptor> relayServerDescriptors =
new HashMap<String, ServerDescriptor>();
- public void readRelayServerDescriptors() {
+
+ private void processRelayServerDescriptor(
+ ServerDescriptor serverDescriptor) {
/* Don't remember which server descriptors we already parsed. If we
* parse a server descriptor now and first learn about the relay in a
* later consensus, we'll never write the descriptor content anywhere.
* The result would be details files containing no descriptor parts
* until the relay publishes the next descriptor. */
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.RELAY_SERVER_DESCRIPTORS);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof ServerDescriptor) {
- ServerDescriptor serverDescriptor = (ServerDescriptor) descriptor;
- String fingerprint = serverDescriptor.getFingerprint();
- if (!this.relayServerDescriptors.containsKey(fingerprint) ||
- this.relayServerDescriptors.get(fingerprint).
- getPublishedMillis()
- < serverDescriptor.getPublishedMillis()) {
- this.relayServerDescriptors.put(fingerprint,
- serverDescriptor);
- }
- }
+ String fingerprint = serverDescriptor.getFingerprint();
+ if (!this.relayServerDescriptors.containsKey(fingerprint) ||
+ this.relayServerDescriptors.get(fingerprint).getPublishedMillis()
+ < serverDescriptor.getPublishedMillis()) {
+ this.relayServerDescriptors.put(fingerprint, serverDescriptor);
}
}
@@ -232,79 +247,53 @@ public class DetailsDataWriter {
}
private long now = System.currentTimeMillis();
+
private Map<String, Set<ExitListEntry>> exitListEntries =
new HashMap<String, Set<ExitListEntry>>();
- public void readExitLists() {
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.EXIT_LISTS, DescriptorHistory.EXIT_LIST_HISTORY);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof ExitList) {
- ExitList exitList = (ExitList) descriptor;
- for (ExitListEntry exitListEntry :
- exitList.getExitListEntries()) {
- if (exitListEntry.getScanMillis() <
- this.now - 24L * 60L * 60L * 1000L) {
- continue;
- }
- String fingerprint = exitListEntry.getFingerprint();
- if (!this.exitListEntries.containsKey(fingerprint)) {
- this.exitListEntries.put(fingerprint,
- new HashSet<ExitListEntry>());
- }
- this.exitListEntries.get(fingerprint).add(exitListEntry);
- }
+
+ private void processExitList(ExitList exitList) {
+ for (ExitListEntry exitListEntry : exitList.getExitListEntries()) {
+ if (exitListEntry.getScanMillis() <
+ this.now - 24L * 60L * 60L * 1000L) {
+ continue;
}
+ String fingerprint = exitListEntry.getFingerprint();
+ if (!this.exitListEntries.containsKey(fingerprint)) {
+ this.exitListEntries.put(fingerprint,
+ new HashSet<ExitListEntry>());
+ }
+ this.exitListEntries.get(fingerprint).add(exitListEntry);
}
}
private Map<String, ServerDescriptor> bridgeServerDescriptors =
new HashMap<String, ServerDescriptor>();
- public void readBridgeServerDescriptors() {
+
+ private void processBridgeServerDescriptor(
+ ServerDescriptor serverDescriptor) {
/* Don't remember which server descriptors we already parsed. If we
* parse a server descriptor now and first learn about the bridge in a
* later status, we'll never write the descriptor content anywhere.
* The result would be details files containing no descriptor parts
* until the bridge publishes the next descriptor. */
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.BRIDGE_SERVER_DESCRIPTORS);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof ServerDescriptor) {
- ServerDescriptor serverDescriptor = (ServerDescriptor) descriptor;
- String fingerprint = serverDescriptor.getFingerprint();
- if (!this.bridgeServerDescriptors.containsKey(fingerprint) ||
- this.bridgeServerDescriptors.get(fingerprint).
- getPublishedMillis()
- < serverDescriptor.getPublishedMillis()) {
- this.bridgeServerDescriptors.put(fingerprint,
- serverDescriptor);
- }
- }
+ String fingerprint = serverDescriptor.getFingerprint();
+ if (!this.bridgeServerDescriptors.containsKey(fingerprint) ||
+ this.bridgeServerDescriptors.get(fingerprint).getPublishedMillis()
+ < serverDescriptor.getPublishedMillis()) {
+ this.bridgeServerDescriptors.put(fingerprint, serverDescriptor);
}
}
private Map<String, String> bridgePoolAssignments =
new HashMap<String, String>();
- public void readBridgePoolAssignments() {
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.BRIDGE_POOL_ASSIGNMENTS,
- DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof BridgePoolAssignment) {
- BridgePoolAssignment bridgePoolAssignment =
- (BridgePoolAssignment) descriptor;
- for (Map.Entry<String, String> e :
- bridgePoolAssignment.getEntries().entrySet()) {
- String fingerprint = e.getKey();
- String details = e.getValue();
- this.bridgePoolAssignments.put(fingerprint, details);
- }
- }
+
+ private void processBridgePoolAssignment(
+ BridgePoolAssignment bridgePoolAssignment) {
+ for (Map.Entry<String, String> e :
+ bridgePoolAssignment.getEntries().entrySet()) {
+ String fingerprint = e.getKey();
+ String details = e.getValue();
+ this.bridgePoolAssignments.put(fingerprint, details);
}
}
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java
index 9caac06..9c02319 100644
--- a/src/org/torproject/onionoo/Main.java
+++ b/src/org/torproject/onionoo/Main.java
@@ -30,30 +30,27 @@ public class Main {
WeightsDataWriter wdw = new WeightsDataWriter(dso, ds);
printStatusTime("Initialized weights data writer");
- // TODO Instead of creating nine, partly overlapping descriptor
- // queues, register for descriptor type and let DescriptorSource
- // parse everything just once.
printStatus("Reading descriptors.");
- ndw.readRelayNetworkConsensuses();
- printStatusTime("Read network status consensuses");
- ndw.readBridgeNetworkStatuses();
- printStatusTime("Read bridge network statuses");
- ddw.readRelayServerDescriptors();
+ dso.readRelayNetworkConsensuses();
+ printStatusTime("Read relay network consensuses");
+ dso.readRelayServerDescriptors();
printStatusTime("Read relay server descriptors");
- ddw.readExitLists();
+ dso.readRelayExtraInfos();
+ printStatusTime("Read relay extra-info descriptors");
+ dso.readExitLists();
printStatusTime("Read exit lists");
- ddw.readBridgeServerDescriptors();
+ dso.readBridgeNetworkStatuses();
+ printStatusTime("Read bridge network statuses");
+ dso.readBridgeServerDescriptors();
printStatusTime("Read bridge server descriptors");
- ddw.readBridgePoolAssignments();
+ dso.readBridgeExtraInfos();
+ printStatusTime("Read bridge extra-info descriptors");
+ dso.readBridgePoolAssignments();
printStatusTime("Read bridge-pool assignments");
- bdw.readExtraInfoDescriptors();
- printStatusTime("Read extra-info descriptors");
- wdw.readRelayServerDescriptors();
- printStatusTime("Read relay server descriptors");
- wdw.readRelayNetworkConsensuses();
- printStatusTime("Read relay network consensuses");
printStatus("Updating internal node list.");
+ ndw.readStatusSummary();
+ printStatusTime("Read status summary");
ndw.lookUpCitiesAndASes();
printStatusTime("Looked up cities and ASes");
ndw.setRunningBits();
diff --git a/src/org/torproject/onionoo/NodeDataWriter.java b/src/org/torproject/onionoo/NodeDataWriter.java
index b2d7e32..69ce01b 100644
--- a/src/org/torproject/onionoo/NodeDataWriter.java
+++ b/src/org/torproject/onionoo/NodeDataWriter.java
@@ -16,7 +16,7 @@ import org.torproject.onionoo.LookupService.LookupResult;
/* Store relays and bridges that have been running in the past seven
* days. */
-public class NodeDataWriter {
+public class NodeDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -33,63 +33,29 @@ public class NodeDataWriter {
private SortedMap<String, Integer> lastBandwidthWeights = null;
- private int relaysUpdated = 0, relaysAdded = 0,
- relayConsensusesProcessed = 0, bridgesUpdated = 0,
- bridgesAdded = 0, bridgeStatusesProcessed = 0;
+ private int relayConsensusesProcessed = 0, bridgeStatusesProcessed = 0;
+
public NodeDataWriter(DescriptorSource descriptorSource,
LookupService lookupService, DocumentStore documentStore) {
this.descriptorSource = descriptorSource;
this.lookupService = lookupService;
this.documentStore = documentStore;
- this.readStatusSummary();
+ this.registerDescriptorListeners();
}
- private void readStatusSummary() {
- SortedSet<String> fingerprints = this.documentStore.list(
- NodeStatus.class, true);
- for (String fingerprint : fingerprints) {
- NodeStatus node = this.documentStore.retrieve(NodeStatus.class,
- true, fingerprint);
- if (node.isRelay()) {
- this.relaysLastValidAfterMillis = Math.max(
- this.relaysLastValidAfterMillis, node.getLastSeenMillis());
- } else {
- this.bridgesLastPublishedMillis = Math.max(
- this.bridgesLastPublishedMillis, node.getLastSeenMillis());
- }
- this.knownNodes.put(fingerprint, node);
- }
+ private void registerDescriptorListeners() {
+ this.descriptorSource.registerListener(this,
+ DescriptorType.RELAY_CONSENSUSES);
+ this.descriptorSource.registerListener(this,
+ DescriptorType.BRIDGE_STATUSES);
}
- public void readRelayNetworkConsensuses() {
- if (this.descriptorSource == null) {
- System.err.println("Not configured to read relay network "
- + "consensuses.");
- return;
- }
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.RELAY_CONSENSUSES,
- DescriptorHistory.RELAY_CONSENSUS_HISTORY);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof RelayNetworkStatusConsensus) {
- updateRelayNetworkStatusConsensus(
- (RelayNetworkStatusConsensus) descriptor);
- }
- }
- }
-
- public void setRunningBits() {
- for (NodeStatus node : this.knownNodes.values()) {
- if (node.isRelay() &&
- node.getLastSeenMillis() == this.relaysLastValidAfterMillis) {
- node.setRunning(true);
- }
- if (!node.isRelay() &&
- node.getLastSeenMillis() == this.bridgesLastPublishedMillis) {
- node.setRunning(true);
- }
+ public void processDescriptor(Descriptor descriptor, boolean relay) {
+ if (descriptor instanceof RelayNetworkStatusConsensus) {
+ updateRelayNetworkStatusConsensus(
+ (RelayNetworkStatusConsensus) descriptor);
+ } else if (descriptor instanceof BridgeNetworkStatus) {
+ updateBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
}
}
@@ -119,10 +85,8 @@ public class NodeDataWriter {
validAfterMillis, null);
if (this.knownNodes.containsKey(fingerprint)) {
this.knownNodes.get(fingerprint).update(newNodeStatus);
- this.relaysUpdated++;
} else {
this.knownNodes.put(fingerprint, newNodeStatus);
- this.relaysAdded++;
}
}
this.relayConsensusesProcessed++;
@@ -131,6 +95,67 @@ public class NodeDataWriter {
}
}
+ private void updateBridgeNetworkStatus(BridgeNetworkStatus status) {
+ long publishedMillis = status.getPublishedMillis();
+ if (publishedMillis > this.bridgesLastPublishedMillis) {
+ this.bridgesLastPublishedMillis = publishedMillis;
+ }
+ for (NetworkStatusEntry entry : status.getStatusEntries().values()) {
+ String nickname = entry.getNickname();
+ String fingerprint = entry.getFingerprint();
+ String address = entry.getAddress();
+ SortedSet<String> orAddressesAndPorts = new TreeSet<String>(
+ entry.getOrAddresses());
+ int orPort = entry.getOrPort();
+ int dirPort = entry.getDirPort();
+ SortedSet<String> relayFlags = entry.getFlags();
+ NodeStatus newNodeStatus = new NodeStatus(false, nickname,
+ fingerprint, address, orAddressesAndPorts, null,
+ publishedMillis, orPort, dirPort, relayFlags, -1L, "??", null,
+ -1L, null, null, publishedMillis, -1L, null);
+ if (this.knownNodes.containsKey(fingerprint)) {
+ this.knownNodes.get(fingerprint).update(newNodeStatus);
+ } else {
+ this.knownNodes.put(fingerprint, newNodeStatus);
+ }
+ }
+ this.bridgeStatusesProcessed++;
+ }
+
+ public void readStatusSummary() {
+ SortedSet<String> fingerprints = this.documentStore.list(
+ NodeStatus.class, true);
+ for (String fingerprint : fingerprints) {
+ NodeStatus node = this.documentStore.retrieve(NodeStatus.class,
+ true, fingerprint);
+ if (node.isRelay()) {
+ this.relaysLastValidAfterMillis = Math.max(
+ this.relaysLastValidAfterMillis, node.getLastSeenMillis());
+ } else {
+ this.bridgesLastPublishedMillis = Math.max(
+ this.bridgesLastPublishedMillis, node.getLastSeenMillis());
+ }
+ if (this.knownNodes.containsKey(fingerprint)) {
+ this.knownNodes.get(fingerprint).update(node);
+ } else {
+ this.knownNodes.put(fingerprint, node);
+ }
+ }
+ }
+
+ public void setRunningBits() {
+ for (NodeStatus node : this.knownNodes.values()) {
+ if (node.isRelay() &&
+ node.getLastSeenMillis() == this.relaysLastValidAfterMillis) {
+ node.setRunning(true);
+ }
+ if (!node.isRelay() &&
+ node.getLastSeenMillis() == this.bridgesLastPublishedMillis) {
+ node.setRunning(true);
+ }
+ }
+ }
+
public void lookUpCitiesAndASes() {
SortedSet<String> addressStrings = new TreeSet<String>();
for (NodeStatus node : this.knownNodes.values()) {
@@ -164,53 +189,6 @@ public class NodeDataWriter {
}
}
- public void readBridgeNetworkStatuses() {
- if (this.descriptorSource == null) {
- System.err.println("Not configured to read bridge network "
- + "statuses.");
- return;
- }
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.BRIDGE_STATUSES,
- DescriptorHistory.BRIDGE_STATUS_HISTORY);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof BridgeNetworkStatus) {
- updateBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
- }
- }
- }
-
- private void updateBridgeNetworkStatus(BridgeNetworkStatus status) {
- long publishedMillis = status.getPublishedMillis();
- if (publishedMillis > this.bridgesLastPublishedMillis) {
- this.bridgesLastPublishedMillis = publishedMillis;
- }
- for (NetworkStatusEntry entry : status.getStatusEntries().values()) {
- String nickname = entry.getNickname();
- String fingerprint = entry.getFingerprint();
- String address = entry.getAddress();
- SortedSet<String> orAddressesAndPorts = new TreeSet<String>(
- entry.getOrAddresses());
- int orPort = entry.getOrPort();
- int dirPort = entry.getDirPort();
- SortedSet<String> relayFlags = entry.getFlags();
- NodeStatus newNodeStatus = new NodeStatus(false, nickname,
- fingerprint, address, orAddressesAndPorts, null,
- publishedMillis, orPort, dirPort, relayFlags, -1L, "??", null,
- -1L, null, null, publishedMillis, -1L, null);
- if (this.knownNodes.containsKey(fingerprint)) {
- this.knownNodes.get(fingerprint).update(newNodeStatus);
- this.bridgesUpdated++;
- } else {
- this.knownNodes.put(fingerprint, newNodeStatus);
- this.bridgesAdded++;
- }
- }
- this.bridgeStatusesProcessed++;
- }
-
public void writeStatusSummary() {
this.writeSummary(true);
}
@@ -248,16 +226,8 @@ public class NodeDataWriter {
StringBuilder sb = new StringBuilder();
sb.append(" " + formatDecimalNumber(relayConsensusesProcessed)
+ " relay consensuses processed\n");
- sb.append(" " + formatDecimalNumber(relaysUpdated)
- + " relays updated\n");
- sb.append(" " + formatDecimalNumber(relaysAdded)
- + " relays added\n");
sb.append(" " + formatDecimalNumber(bridgeStatusesProcessed)
+ " bridge statuses processed\n");
- sb.append(" " + formatDecimalNumber(bridgesUpdated)
- + " bridges updated\n");
- sb.append(" " + formatDecimalNumber(bridgesAdded)
- + " bridges added\n");
return sb.toString();
}
diff --git a/src/org/torproject/onionoo/WeightsDataWriter.java b/src/org/torproject/onionoo/WeightsDataWriter.java
index 317b8e2..e388ddd 100644
--- a/src/org/torproject/onionoo/WeightsDataWriter.java
+++ b/src/org/torproject/onionoo/WeightsDataWriter.java
@@ -23,7 +23,7 @@ import org.torproject.descriptor.NetworkStatusEntry;
import org.torproject.descriptor.RelayNetworkStatusConsensus;
import org.torproject.descriptor.ServerDescriptor;
-public class WeightsDataWriter {
+public class WeightsDataWriter implements DescriptorListener {
private DescriptorSource descriptorSource;
@@ -35,6 +35,23 @@ public class WeightsDataWriter {
DocumentStore documentStore) {
this.descriptorSource = descriptorSource;
this.documentStore = documentStore;
+ this.registerDescriptorListeners();
+ }
+
+ private void registerDescriptorListeners() {
+ this.descriptorSource.registerListener(this,
+ DescriptorType.RELAY_CONSENSUSES);
+ this.descriptorSource.registerListener(this,
+ DescriptorType.RELAY_SERVER_DESCRIPTORS);
+ }
+
+ public void processDescriptor(Descriptor descriptor, boolean relay) {
+ if (descriptor instanceof ServerDescriptor) {
+ this.processRelayServerDescriptor((ServerDescriptor) descriptor);
+ } else if (descriptor instanceof RelayNetworkStatusConsensus) {
+ this.processRelayNetworkConsensus(
+ (RelayNetworkStatusConsensus) descriptor);
+ }
}
public void setCurrentNodes(
@@ -42,51 +59,33 @@ public class WeightsDataWriter {
this.currentFingerprints.addAll(currentNodes.keySet());
}
- /* Read advertised bandwidths of all server descriptors in
- * in/relay-descriptors/server-descriptors/ to memory. Ideally, we'd
- * skip descriptors that we read before and obtain their advertised
- * bandwidths from some temp file. This approach should do for now,
- * though. */
private Map<String, Integer> advertisedBandwidths =
new HashMap<String, Integer>();
- public void readRelayServerDescriptors() {
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.RELAY_SERVER_DESCRIPTORS);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof ServerDescriptor) {
- ServerDescriptor serverDescriptor =
- (ServerDescriptor) descriptor;
- String digest = serverDescriptor.getServerDescriptorDigest().
- toUpperCase();
- int advertisedBandwidth = Math.min(Math.min(
- serverDescriptor.getBandwidthBurst(),
- serverDescriptor.getBandwidthObserved()),
- serverDescriptor.getBandwidthRate());
- this.advertisedBandwidths.put(digest, advertisedBandwidth);
- }
- }
+
+ private void processRelayServerDescriptor(
+ ServerDescriptor serverDescriptor) {
+ /* Read advertised bandwidths of all server descriptors in
+ * in/relay-descriptors/server-descriptors/ to memory. Ideally, we'd
+ * skip descriptors that we read before and obtain their advertised
+ * bandwidths from some temp file. This approach should do for now,
+ * though. */
+ String digest = serverDescriptor.getServerDescriptorDigest().
+ toUpperCase();
+ int advertisedBandwidth = Math.min(Math.min(
+ serverDescriptor.getBandwidthBurst(),
+ serverDescriptor.getBandwidthObserved()),
+ serverDescriptor.getBandwidthRate());
+ this.advertisedBandwidths.put(digest, advertisedBandwidth);
}
- public void readRelayNetworkConsensuses() {
- DescriptorQueue descriptorQueue =
- this.descriptorSource.getDescriptorQueue(
- DescriptorType.RELAY_CONSENSUSES,
- DescriptorHistory.WEIGHTS_RELAY_CONSENSUS_HISTORY);
- Descriptor descriptor;
- while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
- if (descriptor instanceof RelayNetworkStatusConsensus) {
- RelayNetworkStatusConsensus consensus =
- (RelayNetworkStatusConsensus) descriptor;
- long validAfterMillis = consensus.getValidAfterMillis(),
- freshUntilMillis = consensus.getFreshUntilMillis();
- SortedMap<String, double[]> pathSelectionWeights =
- this.calculatePathSelectionProbabilities(consensus);
- this.updateWeightsHistory(validAfterMillis, freshUntilMillis,
- pathSelectionWeights);
- }
- }
+ private void processRelayNetworkConsensus(
+ RelayNetworkStatusConsensus consensus) {
+ long validAfterMillis = consensus.getValidAfterMillis(),
+ freshUntilMillis = consensus.getFreshUntilMillis();
+ SortedMap<String, double[]> pathSelectionWeights =
+ this.calculatePathSelectionProbabilities(consensus);
+ this.updateWeightsHistory(validAfterMillis, freshUntilMillis,
+ pathSelectionWeights);
}
private static final int HISTORY_UPDATER_WORKERS_NUM = 4;