[tor-commits] [onionoo/master] Change descriptor parsing model from pull to push.

karsten at torproject.org karsten at torproject.org
Sun Jul 7 14:16:48 UTC 2013


commit 3a987142ce843cc65284cf00a5c02ede97d81b67
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Jul 2 09:01:52 2013 +0200

    Change descriptor parsing model from pull to push.
---
 .../torproject/onionoo/BandwidthDataWriter.java    |   32 ++--
 src/org/torproject/onionoo/DescriptorSource.java   |  121 +++++++++----
 src/org/torproject/onionoo/DetailsDataWriter.java  |  135 +++++++--------
 src/org/torproject/onionoo/Main.java               |   31 ++--
 src/org/torproject/onionoo/NodeDataWriter.java     |  182 ++++++++------------
 src/org/torproject/onionoo/WeightsDataWriter.java  |   83 +++++----
 6 files changed, 299 insertions(+), 285 deletions(-)

diff --git a/src/org/torproject/onionoo/BandwidthDataWriter.java b/src/org/torproject/onionoo/BandwidthDataWriter.java
index 7203b6d..2e0b8a7 100644
--- a/src/org/torproject/onionoo/BandwidthDataWriter.java
+++ b/src/org/torproject/onionoo/BandwidthDataWriter.java
@@ -36,7 +36,7 @@ import org.torproject.descriptor.ExtraInfoDescriptor;
  * last 3 days in the bandwidth document may not be equivalent to the last
  * 3 days as of publishing the document, but that's something clients can
  * work around. */
-public class BandwidthDataWriter {
+public class BandwidthDataWriter implements DescriptorListener {
 
   private DescriptorSource descriptorSource;
 
@@ -48,6 +48,7 @@ public class BandwidthDataWriter {
       DocumentStore documentStore) {
     this.descriptorSource = descriptorSource;
     this.documentStore = documentStore;
+    this.registerDescriptorListeners();
   }
 
   private SimpleDateFormat dateTimeFormat = new SimpleDateFormat(
@@ -57,27 +58,24 @@ public class BandwidthDataWriter {
     this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
 
-  public void setCurrentNodes(
-      SortedMap<String, NodeStatus> currentNodes) {
-    this.currentFingerprints.addAll(currentNodes.keySet());
+  private void registerDescriptorListeners() {
+    this.descriptorSource.registerListener(this,
+        DescriptorType.RELAY_EXTRA_INFOS);
+    this.descriptorSource.registerListener(this,
+        DescriptorType.BRIDGE_EXTRA_INFOS);
   }
 
-  public void readExtraInfoDescriptors() {
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        new DescriptorType[] { DescriptorType.RELAY_EXTRA_INFOS,
-        DescriptorType.BRIDGE_EXTRA_INFOS },
-        DescriptorHistory.EXTRAINFO_HISTORY);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof ExtraInfoDescriptor) {
-        ExtraInfoDescriptor extraInfoDescriptor =
-            (ExtraInfoDescriptor) descriptor;
-        this.parseDescriptor(extraInfoDescriptor);
-      }
+  public void processDescriptor(Descriptor descriptor, boolean relay) {
+    if (descriptor instanceof ExtraInfoDescriptor) {
+      this.parseDescriptor((ExtraInfoDescriptor) descriptor);
     }
   }
 
+  public void setCurrentNodes(
+      SortedMap<String, NodeStatus> currentNodes) {
+    this.currentFingerprints.addAll(currentNodes.keySet());
+  }
+
   private void parseDescriptor(ExtraInfoDescriptor descriptor) {
     String fingerprint = descriptor.getFingerprint();
     boolean updateHistory = false;
diff --git a/src/org/torproject/onionoo/DescriptorSource.java b/src/org/torproject/onionoo/DescriptorSource.java
index 23febc8..5868f0f 100644
--- a/src/org/torproject/onionoo/DescriptorSource.java
+++ b/src/org/torproject/onionoo/DescriptorSource.java
@@ -9,9 +9,12 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -24,20 +27,24 @@ enum DescriptorType {
   RELAY_CONSENSUSES,
   RELAY_SERVER_DESCRIPTORS,
   RELAY_EXTRA_INFOS,
+  EXIT_LISTS,
   BRIDGE_STATUSES,
   BRIDGE_SERVER_DESCRIPTORS,
   BRIDGE_EXTRA_INFOS,
   BRIDGE_POOL_ASSIGNMENTS,
-  EXIT_LISTS,
+}
+
+interface DescriptorListener {
+  abstract void processDescriptor(Descriptor descriptor, boolean relay);
 }
 
 enum DescriptorHistory {
-  EXTRAINFO_HISTORY,
-  EXIT_LIST_HISTORY,
-  BRIDGE_POOLASSIGN_HISTORY,
-  WEIGHTS_RELAY_CONSENSUS_HISTORY,
   RELAY_CONSENSUS_HISTORY,
+  RELAY_EXTRAINFO_HISTORY,
+  EXIT_LIST_HISTORY,
   BRIDGE_STATUS_HISTORY,
+  BRIDGE_EXTRAINFO_HISTORY,
+  BRIDGE_POOLASSIGN_HISTORY,
 }
 
 class DescriptorQueue {
@@ -108,8 +115,11 @@ class DescriptorQueue {
   public void readHistoryFile(DescriptorHistory descriptorHistory) {
     String historyFileName = null;
     switch (descriptorHistory) {
-    case EXTRAINFO_HISTORY:
-      historyFileName = "extrainfo-history";
+    case RELAY_EXTRAINFO_HISTORY:
+      historyFileName = "relay-extrainfo-history";
+      break;
+    case BRIDGE_EXTRAINFO_HISTORY:
+      historyFileName = "bridge-extrainfo-history";
       break;
     case EXIT_LIST_HISTORY:
       historyFileName = "exit-list-history";
@@ -117,9 +127,6 @@ class DescriptorQueue {
     case BRIDGE_POOLASSIGN_HISTORY:
       historyFileName = "bridge-poolassign-history";
       break;
-    case WEIGHTS_RELAY_CONSENSUS_HISTORY:
-      historyFileName = "weights-relay-consensus-history";
-      break;
     case RELAY_CONSENSUS_HISTORY:
       historyFileName = "relay-consensus-history";
       break;
@@ -230,38 +237,92 @@ public class DescriptorSource {
     this.inDir = inDir;
     this.statusDir = statusDir;
     this.descriptorQueues = new ArrayList<DescriptorQueue>();
+    this.descriptorListeners =
+        new HashMap<DescriptorType, Set<DescriptorListener>>();
   }
 
-  public DescriptorQueue getDescriptorQueue(
-      DescriptorType descriptorType) {
+  private DescriptorQueue getDescriptorQueue(
+      DescriptorType descriptorType,
+      DescriptorHistory descriptorHistory) {
     DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir,
         this.statusDir);
     descriptorQueue.addDirectory(descriptorType);
+    if (descriptorHistory != null) {
+      descriptorQueue.readHistoryFile(descriptorHistory);
+    }
     this.descriptorQueues.add(descriptorQueue);
     return descriptorQueue;
   }
 
-  public DescriptorQueue getDescriptorQueue(
-      DescriptorType[] descriptorTypes,
-      DescriptorHistory descriptorHistory) {
-    DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir,
-        this.statusDir);
-    for (DescriptorType descriptorType : descriptorTypes) {
-      descriptorQueue.addDirectory(descriptorType);
+  private Map<DescriptorType, Set<DescriptorListener>>
+      descriptorListeners;
+
+  public void registerListener(DescriptorListener listener,
+      DescriptorType descriptorType) {
+    if (!this.descriptorListeners.containsKey(descriptorType)) {
+      this.descriptorListeners.put(descriptorType,
+          new HashSet<DescriptorListener>());
     }
-    descriptorQueue.readHistoryFile(descriptorHistory);
-    this.descriptorQueues.add(descriptorQueue);
-    return descriptorQueue;
+    this.descriptorListeners.get(descriptorType).add(listener);
   }
 
-  public DescriptorQueue getDescriptorQueue(DescriptorType descriptorType,
-      DescriptorHistory descriptorHistory) {
-    DescriptorQueue descriptorQueue = new DescriptorQueue(this.inDir,
-        this.statusDir);
-    descriptorQueue.addDirectory(descriptorType);
-    descriptorQueue.readHistoryFile(descriptorHistory);
-    this.descriptorQueues.add(descriptorQueue);
-    return descriptorQueue;
+  public void readRelayNetworkConsensuses() {
+    this.readDescriptors(DescriptorType.RELAY_CONSENSUSES,
+        DescriptorHistory.RELAY_CONSENSUS_HISTORY, true);
+  }
+
+  public void readRelayServerDescriptors() {
+    // TODO Use parse history as soon as all listeners can handle it.
+    this.readDescriptors(DescriptorType.RELAY_SERVER_DESCRIPTORS, null,
+        true);
+  }
+
+  public void readRelayExtraInfos() {
+    this.readDescriptors(DescriptorType.RELAY_EXTRA_INFOS,
+        DescriptorHistory.RELAY_EXTRAINFO_HISTORY, true);
+  }
+
+  public void readExitLists() {
+    this.readDescriptors(DescriptorType.EXIT_LISTS,
+        DescriptorHistory.EXIT_LIST_HISTORY, true);
+  }
+
+  public void readBridgeNetworkStatuses() {
+    this.readDescriptors(DescriptorType.BRIDGE_STATUSES,
+        DescriptorHistory.BRIDGE_STATUS_HISTORY, false);
+  }
+
+  public void readBridgeServerDescriptors() {
+    // TODO Use parse history as soon as all listeners can handle it.
+    this.readDescriptors(DescriptorType.BRIDGE_SERVER_DESCRIPTORS, null,
+        false);
+  }
+
+  public void readBridgeExtraInfos() {
+    this.readDescriptors(DescriptorType.BRIDGE_EXTRA_INFOS,
+        DescriptorHistory.BRIDGE_EXTRAINFO_HISTORY, false);
+  }
+
+  public void readBridgePoolAssignments() {
+    this.readDescriptors(DescriptorType.BRIDGE_POOL_ASSIGNMENTS,
+        DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY, false);
+  }
+
+  private void readDescriptors(DescriptorType descriptorType,
+      DescriptorHistory descriptorHistory, boolean relay) {
+    Set<DescriptorListener> descriptorListeners =
+        this.descriptorListeners.get(descriptorType);
+    if (descriptorListeners == null || descriptorListeners.isEmpty()) {
+      return;
+    }
+    DescriptorQueue descriptorQueue = this.getDescriptorQueue(
+        descriptorType, descriptorHistory);
+    Descriptor descriptor;
+    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
+      for (DescriptorListener descriptorListener : descriptorListeners) {
+        descriptorListener.processDescriptor(descriptor, relay);
+      }
+    }
   }
 
   public void writeHistoryFiles() {
diff --git a/src/org/torproject/onionoo/DetailsDataWriter.java b/src/org/torproject/onionoo/DetailsDataWriter.java
index e4ccb78..acb01d1 100644
--- a/src/org/torproject/onionoo/DetailsDataWriter.java
+++ b/src/org/torproject/onionoo/DetailsDataWriter.java
@@ -32,7 +32,7 @@ import org.torproject.descriptor.ServerDescriptor;
  * The parts of details files coming from server descriptors always come
  * from the last known descriptor of a relay or bridge, not from the
  * descriptor that was last referenced in a network status. */
-public class DetailsDataWriter {
+public class DetailsDataWriter implements DescriptorListener {
 
   private DescriptorSource descriptorSource;
 
@@ -50,6 +50,30 @@ public class DetailsDataWriter {
     this.descriptorSource = descriptorSource;
     this.reverseDomainNameResolver = reverseDomainNameResolver;
     this.documentStore = documentStore;
+    this.registerDescriptorListeners();
+  }
+
+  private void registerDescriptorListeners() {
+    this.descriptorSource.registerListener(this,
+        DescriptorType.RELAY_SERVER_DESCRIPTORS);
+    this.descriptorSource.registerListener(this,
+        DescriptorType.BRIDGE_SERVER_DESCRIPTORS);
+    this.descriptorSource.registerListener(this,
+        DescriptorType.BRIDGE_POOL_ASSIGNMENTS);
+    this.descriptorSource.registerListener(this,
+        DescriptorType.EXIT_LISTS);
+  }
+
+  public void processDescriptor(Descriptor descriptor, boolean relay) {
+    if (descriptor instanceof ServerDescriptor && relay) {
+      this.processRelayServerDescriptor((ServerDescriptor) descriptor);
+    } else if (descriptor instanceof ServerDescriptor && !relay) {
+      this.processBridgeServerDescriptor((ServerDescriptor) descriptor);
+    } else if (descriptor instanceof BridgePoolAssignment) {
+      this.processBridgePoolAssignment((BridgePoolAssignment) descriptor);
+    } else if (descriptor instanceof ExitList) {
+      this.processExitList((ExitList) descriptor);
+    }
   }
 
   public void setCurrentNodes(
@@ -92,28 +116,19 @@ public class DetailsDataWriter {
 
   private Map<String, ServerDescriptor> relayServerDescriptors =
       new HashMap<String, ServerDescriptor>();
-  public void readRelayServerDescriptors() {
+
+  private void processRelayServerDescriptor(
+      ServerDescriptor serverDescriptor) {
     /* Don't remember which server descriptors we already parsed.  If we
      * parse a server descriptor now and first learn about the relay in a
      * later consensus, we'll never write the descriptor content anywhere.
      * The result would be details files containing no descriptor parts
      * until the relay publishes the next descriptor. */
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.RELAY_SERVER_DESCRIPTORS);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof ServerDescriptor) {
-        ServerDescriptor serverDescriptor = (ServerDescriptor) descriptor;
-        String fingerprint = serverDescriptor.getFingerprint();
-        if (!this.relayServerDescriptors.containsKey(fingerprint) ||
-            this.relayServerDescriptors.get(fingerprint).
-            getPublishedMillis()
-            < serverDescriptor.getPublishedMillis()) {
-          this.relayServerDescriptors.put(fingerprint,
-              serverDescriptor);
-        }
-      }
+    String fingerprint = serverDescriptor.getFingerprint();
+    if (!this.relayServerDescriptors.containsKey(fingerprint) ||
+        this.relayServerDescriptors.get(fingerprint).getPublishedMillis()
+        < serverDescriptor.getPublishedMillis()) {
+      this.relayServerDescriptors.put(fingerprint, serverDescriptor);
     }
   }
 
@@ -232,79 +247,53 @@ public class DetailsDataWriter {
   }
 
   private long now = System.currentTimeMillis();
+
   private Map<String, Set<ExitListEntry>> exitListEntries =
       new HashMap<String, Set<ExitListEntry>>();
-  public void readExitLists() {
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.EXIT_LISTS, DescriptorHistory.EXIT_LIST_HISTORY);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof ExitList) {
-        ExitList exitList = (ExitList) descriptor;
-        for (ExitListEntry exitListEntry :
-            exitList.getExitListEntries()) {
-          if (exitListEntry.getScanMillis() <
-              this.now - 24L * 60L * 60L * 1000L) {
-            continue;
-          }
-          String fingerprint = exitListEntry.getFingerprint();
-          if (!this.exitListEntries.containsKey(fingerprint)) {
-            this.exitListEntries.put(fingerprint,
-                new HashSet<ExitListEntry>());
-          }
-          this.exitListEntries.get(fingerprint).add(exitListEntry);
-        }
+
+  private void processExitList(ExitList exitList) {
+    for (ExitListEntry exitListEntry : exitList.getExitListEntries()) {
+      if (exitListEntry.getScanMillis() <
+          this.now - 24L * 60L * 60L * 1000L) {
+        continue;
       }
+      String fingerprint = exitListEntry.getFingerprint();
+      if (!this.exitListEntries.containsKey(fingerprint)) {
+        this.exitListEntries.put(fingerprint,
+            new HashSet<ExitListEntry>());
+      }
+      this.exitListEntries.get(fingerprint).add(exitListEntry);
     }
   }
 
   private Map<String, ServerDescriptor> bridgeServerDescriptors =
       new HashMap<String, ServerDescriptor>();
-  public void readBridgeServerDescriptors() {
+
+  private void processBridgeServerDescriptor(
+      ServerDescriptor serverDescriptor) {
     /* Don't remember which server descriptors we already parsed.  If we
      * parse a server descriptor now and first learn about the relay in a
      * later status, we'll never write the descriptor content anywhere.
      * The result would be details files containing no descriptor parts
      * until the bridge publishes the next descriptor. */
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.BRIDGE_SERVER_DESCRIPTORS);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof ServerDescriptor) {
-        ServerDescriptor serverDescriptor = (ServerDescriptor) descriptor;
-        String fingerprint = serverDescriptor.getFingerprint();
-        if (!this.bridgeServerDescriptors.containsKey(fingerprint) ||
-            this.bridgeServerDescriptors.get(fingerprint).
-            getPublishedMillis()
-            < serverDescriptor.getPublishedMillis()) {
-          this.bridgeServerDescriptors.put(fingerprint,
-              serverDescriptor);
-        }
-      }
+    String fingerprint = serverDescriptor.getFingerprint();
+    if (!this.bridgeServerDescriptors.containsKey(fingerprint) ||
+        this.bridgeServerDescriptors.get(fingerprint).getPublishedMillis()
+        < serverDescriptor.getPublishedMillis()) {
+      this.bridgeServerDescriptors.put(fingerprint, serverDescriptor);
     }
   }
 
   private Map<String, String> bridgePoolAssignments =
       new HashMap<String, String>();
-  public void readBridgePoolAssignments() {
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.BRIDGE_POOL_ASSIGNMENTS,
-        DescriptorHistory.BRIDGE_POOLASSIGN_HISTORY);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof BridgePoolAssignment) {
-        BridgePoolAssignment bridgePoolAssignment =
-            (BridgePoolAssignment) descriptor;
-        for (Map.Entry<String, String> e :
-            bridgePoolAssignment.getEntries().entrySet()) {
-          String fingerprint = e.getKey();
-          String details = e.getValue();
-          this.bridgePoolAssignments.put(fingerprint, details);
-        }
-      }
+
+  private void processBridgePoolAssignment(
+      BridgePoolAssignment bridgePoolAssignment) {
+    for (Map.Entry<String, String> e :
+        bridgePoolAssignment.getEntries().entrySet()) {
+      String fingerprint = e.getKey();
+      String details = e.getValue();
+      this.bridgePoolAssignments.put(fingerprint, details);
     }
   }
 
diff --git a/src/org/torproject/onionoo/Main.java b/src/org/torproject/onionoo/Main.java
index 9caac06..9c02319 100644
--- a/src/org/torproject/onionoo/Main.java
+++ b/src/org/torproject/onionoo/Main.java
@@ -30,30 +30,27 @@ public class Main {
     WeightsDataWriter wdw = new WeightsDataWriter(dso, ds);
     printStatusTime("Initialized weights data writer");
 
-    // TODO Instead of creating nine, partly overlapping descriptor
-    // queues, register for descriptor type and let DescriptorSource
-    // parse everything just once.
     printStatus("Reading descriptors.");
-    ndw.readRelayNetworkConsensuses();
-    printStatusTime("Read network status consensuses");
-    ndw.readBridgeNetworkStatuses();
-    printStatusTime("Read bridge network statuses");
-    ddw.readRelayServerDescriptors();
+    dso.readRelayNetworkConsensuses();
+    printStatusTime("Read relay network consensuses");
+    dso.readRelayServerDescriptors();
     printStatusTime("Read relay server descriptors");
-    ddw.readExitLists();
+    dso.readRelayExtraInfos();
+    printStatusTime("Read relay extra-info descriptors");
+    dso.readExitLists();
     printStatusTime("Read exit lists");
-    ddw.readBridgeServerDescriptors();
+    dso.readBridgeNetworkStatuses();
+    printStatusTime("Read bridge network statuses");
+    dso.readBridgeServerDescriptors();
     printStatusTime("Read bridge server descriptors");
-    ddw.readBridgePoolAssignments();
+    dso.readBridgeExtraInfos();
+    printStatusTime("Read bridge extra-info descriptors");
+    dso.readBridgePoolAssignments();
     printStatusTime("Read bridge-pool assignments");
-    bdw.readExtraInfoDescriptors();
-    printStatusTime("Read extra-info descriptors");
-    wdw.readRelayServerDescriptors();
-    printStatusTime("Read relay server descriptors");
-    wdw.readRelayNetworkConsensuses();
-    printStatusTime("Read relay network consensuses");
 
     printStatus("Updating internal node list.");
+    ndw.readStatusSummary();
+    printStatusTime("Read status summary");
     ndw.lookUpCitiesAndASes();
     printStatusTime("Looked up cities and ASes");
     ndw.setRunningBits();
diff --git a/src/org/torproject/onionoo/NodeDataWriter.java b/src/org/torproject/onionoo/NodeDataWriter.java
index b2d7e32..69ce01b 100644
--- a/src/org/torproject/onionoo/NodeDataWriter.java
+++ b/src/org/torproject/onionoo/NodeDataWriter.java
@@ -16,7 +16,7 @@ import org.torproject.onionoo.LookupService.LookupResult;
 
 /* Store relays and bridges that have been running in the past seven
  * days. */
-public class NodeDataWriter {
+public class NodeDataWriter implements DescriptorListener {
 
   private DescriptorSource descriptorSource;
 
@@ -33,63 +33,29 @@ public class NodeDataWriter {
 
   private SortedMap<String, Integer> lastBandwidthWeights = null;
 
-  private int relaysUpdated = 0, relaysAdded = 0,
-      relayConsensusesProcessed = 0, bridgesUpdated = 0,
-      bridgesAdded = 0, bridgeStatusesProcessed = 0;
+  private int relayConsensusesProcessed = 0, bridgeStatusesProcessed = 0;
+
   public NodeDataWriter(DescriptorSource descriptorSource,
       LookupService lookupService, DocumentStore documentStore) {
     this.descriptorSource = descriptorSource;
     this.lookupService = lookupService;
     this.documentStore = documentStore;
-    this.readStatusSummary();
+    this.registerDescriptorListeners();
   }
 
-  private void readStatusSummary() {
-    SortedSet<String> fingerprints = this.documentStore.list(
-        NodeStatus.class, true);
-    for (String fingerprint : fingerprints) {
-      NodeStatus node = this.documentStore.retrieve(NodeStatus.class,
-          true, fingerprint);
-      if (node.isRelay()) {
-        this.relaysLastValidAfterMillis = Math.max(
-            this.relaysLastValidAfterMillis, node.getLastSeenMillis());
-      } else {
-        this.bridgesLastPublishedMillis = Math.max(
-            this.bridgesLastPublishedMillis, node.getLastSeenMillis());
-      }
-      this.knownNodes.put(fingerprint, node);
-    }
+  private void registerDescriptorListeners() {
+    this.descriptorSource.registerListener(this,
+        DescriptorType.RELAY_CONSENSUSES);
+    this.descriptorSource.registerListener(this,
+        DescriptorType.BRIDGE_STATUSES);
   }
 
-  public void readRelayNetworkConsensuses() {
-    if (this.descriptorSource == null) {
-      System.err.println("Not configured to read relay network "
-          + "consensuses.");
-      return;
-    }
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.RELAY_CONSENSUSES,
-        DescriptorHistory.RELAY_CONSENSUS_HISTORY);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof RelayNetworkStatusConsensus) {
-        updateRelayNetworkStatusConsensus(
-            (RelayNetworkStatusConsensus) descriptor);
-      }
-    }
-  }
-
-  public void setRunningBits() {
-    for (NodeStatus node : this.knownNodes.values()) {
-      if (node.isRelay() &&
-          node.getLastSeenMillis() == this.relaysLastValidAfterMillis) {
-        node.setRunning(true);
-      }
-      if (!node.isRelay() &&
-          node.getLastSeenMillis() == this.bridgesLastPublishedMillis) {
-        node.setRunning(true);
-      }
+  public void processDescriptor(Descriptor descriptor, boolean relay) {
+    if (descriptor instanceof RelayNetworkStatusConsensus) {
+      updateRelayNetworkStatusConsensus(
+          (RelayNetworkStatusConsensus) descriptor);
+    } else if (descriptor instanceof BridgeNetworkStatus) {
+      updateBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
     }
   }
 
@@ -119,10 +85,8 @@ public class NodeDataWriter {
           validAfterMillis, null);
       if (this.knownNodes.containsKey(fingerprint)) {
         this.knownNodes.get(fingerprint).update(newNodeStatus);
-        this.relaysUpdated++;
       } else {
         this.knownNodes.put(fingerprint, newNodeStatus);
-        this.relaysAdded++;
       }
     }
     this.relayConsensusesProcessed++;
@@ -131,6 +95,67 @@ public class NodeDataWriter {
     }
   }
 
+  private void updateBridgeNetworkStatus(BridgeNetworkStatus status) {
+    long publishedMillis = status.getPublishedMillis();
+    if (publishedMillis > this.bridgesLastPublishedMillis) {
+      this.bridgesLastPublishedMillis = publishedMillis;
+    }
+    for (NetworkStatusEntry entry : status.getStatusEntries().values()) {
+      String nickname = entry.getNickname();
+      String fingerprint = entry.getFingerprint();
+      String address = entry.getAddress();
+      SortedSet<String> orAddressesAndPorts = new TreeSet<String>(
+          entry.getOrAddresses());
+      int orPort = entry.getOrPort();
+      int dirPort = entry.getDirPort();
+      SortedSet<String> relayFlags = entry.getFlags();
+      NodeStatus newNodeStatus = new NodeStatus(false, nickname,
+          fingerprint, address, orAddressesAndPorts, null,
+          publishedMillis, orPort, dirPort, relayFlags, -1L, "??", null,
+          -1L, null, null, publishedMillis, -1L, null);
+      if (this.knownNodes.containsKey(fingerprint)) {
+        this.knownNodes.get(fingerprint).update(newNodeStatus);
+      } else {
+        this.knownNodes.put(fingerprint, newNodeStatus);
+      }
+    }
+    this.bridgeStatusesProcessed++;
+  }
+
+  public void readStatusSummary() {
+    SortedSet<String> fingerprints = this.documentStore.list(
+        NodeStatus.class, true);
+    for (String fingerprint : fingerprints) {
+      NodeStatus node = this.documentStore.retrieve(NodeStatus.class,
+          true, fingerprint);
+      if (node.isRelay()) {
+        this.relaysLastValidAfterMillis = Math.max(
+            this.relaysLastValidAfterMillis, node.getLastSeenMillis());
+      } else {
+        this.bridgesLastPublishedMillis = Math.max(
+            this.bridgesLastPublishedMillis, node.getLastSeenMillis());
+      }
+      if (this.knownNodes.containsKey(fingerprint)) {
+        this.knownNodes.get(fingerprint).update(node);
+      } else {
+        this.knownNodes.put(fingerprint, node);
+      }
+    }
+  }
+
+  public void setRunningBits() {
+    for (NodeStatus node : this.knownNodes.values()) {
+      if (node.isRelay() &&
+          node.getLastSeenMillis() == this.relaysLastValidAfterMillis) {
+        node.setRunning(true);
+      }
+      if (!node.isRelay() &&
+          node.getLastSeenMillis() == this.bridgesLastPublishedMillis) {
+        node.setRunning(true);
+      }
+    }
+  }
+
   public void lookUpCitiesAndASes() {
     SortedSet<String> addressStrings = new TreeSet<String>();
     for (NodeStatus node : this.knownNodes.values()) {
@@ -164,53 +189,6 @@ public class NodeDataWriter {
     }
   }
 
-  public void readBridgeNetworkStatuses() {
-    if (this.descriptorSource == null) {
-      System.err.println("Not configured to read bridge network "
-          + "statuses.");
-      return;
-    }
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.BRIDGE_STATUSES,
-        DescriptorHistory.BRIDGE_STATUS_HISTORY);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof BridgeNetworkStatus) {
-        updateBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
-      }
-    }
-  }
-
-  private void updateBridgeNetworkStatus(BridgeNetworkStatus status) {
-    long publishedMillis = status.getPublishedMillis();
-    if (publishedMillis > this.bridgesLastPublishedMillis) {
-      this.bridgesLastPublishedMillis = publishedMillis;
-    }
-    for (NetworkStatusEntry entry : status.getStatusEntries().values()) {
-      String nickname = entry.getNickname();
-      String fingerprint = entry.getFingerprint();
-      String address = entry.getAddress();
-      SortedSet<String> orAddressesAndPorts = new TreeSet<String>(
-          entry.getOrAddresses());
-      int orPort = entry.getOrPort();
-      int dirPort = entry.getDirPort();
-      SortedSet<String> relayFlags = entry.getFlags();
-      NodeStatus newNodeStatus = new NodeStatus(false, nickname,
-          fingerprint, address, orAddressesAndPorts, null,
-          publishedMillis, orPort, dirPort, relayFlags, -1L, "??", null,
-          -1L, null, null, publishedMillis, -1L, null);
-      if (this.knownNodes.containsKey(fingerprint)) {
-        this.knownNodes.get(fingerprint).update(newNodeStatus);
-        this.bridgesUpdated++;
-      } else {
-        this.knownNodes.put(fingerprint, newNodeStatus);
-        this.bridgesAdded++;
-      }
-    }
-    this.bridgeStatusesProcessed++;
-  }
-
   public void writeStatusSummary() {
     this.writeSummary(true);
   }
@@ -248,16 +226,8 @@ public class NodeDataWriter {
     StringBuilder sb = new StringBuilder();
     sb.append("    " + formatDecimalNumber(relayConsensusesProcessed)
         + " relay consensuses processed\n");
-    sb.append("    " + formatDecimalNumber(relaysUpdated)
-        + " relays updated\n");
-    sb.append("    " + formatDecimalNumber(relaysAdded)
-        + " relays added\n");
     sb.append("    " + formatDecimalNumber(bridgeStatusesProcessed)
         + " bridge statuses processed\n");
-    sb.append("    " + formatDecimalNumber(bridgesUpdated)
-        + " bridges updated\n");
-    sb.append("    " + formatDecimalNumber(bridgesAdded)
-        + " bridges added\n");
     return sb.toString();
   }
 
diff --git a/src/org/torproject/onionoo/WeightsDataWriter.java b/src/org/torproject/onionoo/WeightsDataWriter.java
index 317b8e2..e388ddd 100644
--- a/src/org/torproject/onionoo/WeightsDataWriter.java
+++ b/src/org/torproject/onionoo/WeightsDataWriter.java
@@ -23,7 +23,7 @@ import org.torproject.descriptor.NetworkStatusEntry;
 import org.torproject.descriptor.RelayNetworkStatusConsensus;
 import org.torproject.descriptor.ServerDescriptor;
 
-public class WeightsDataWriter {
+public class WeightsDataWriter implements DescriptorListener {
 
   private DescriptorSource descriptorSource;
 
@@ -35,6 +35,23 @@ public class WeightsDataWriter {
       DocumentStore documentStore) {
     this.descriptorSource = descriptorSource;
     this.documentStore = documentStore;
+    this.registerDescriptorListeners();
+  }
+
+  private void registerDescriptorListeners() {
+    this.descriptorSource.registerListener(this,
+        DescriptorType.RELAY_CONSENSUSES);
+    this.descriptorSource.registerListener(this,
+        DescriptorType.RELAY_SERVER_DESCRIPTORS);
+  }
+
+  public void processDescriptor(Descriptor descriptor, boolean relay) {
+    if (descriptor instanceof ServerDescriptor) {
+      this.processRelayServerDescriptor((ServerDescriptor) descriptor);
+    } else if (descriptor instanceof RelayNetworkStatusConsensus) {
+      this.processRelayNetworkConsensus(
+          (RelayNetworkStatusConsensus) descriptor);
+    }
   }
 
   public void setCurrentNodes(
@@ -42,51 +59,33 @@ public class WeightsDataWriter {
     this.currentFingerprints.addAll(currentNodes.keySet());
   }
 
-  /* Read advertised bandwidths of all server descriptors in
-   * in/relay-descriptors/server-descriptors/ to memory.  Ideally, we'd
-   * skip descriptors that we read before and obtain their advertised
-   * bandwidths from some temp file.  This approach should do for now,
-   * though. */
   private Map<String, Integer> advertisedBandwidths =
       new HashMap<String, Integer>();
-  public void readRelayServerDescriptors() {
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.RELAY_SERVER_DESCRIPTORS);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof ServerDescriptor) {
-        ServerDescriptor serverDescriptor =
-            (ServerDescriptor) descriptor;
-        String digest = serverDescriptor.getServerDescriptorDigest().
-            toUpperCase();
-        int advertisedBandwidth = Math.min(Math.min(
-            serverDescriptor.getBandwidthBurst(),
-            serverDescriptor.getBandwidthObserved()),
-            serverDescriptor.getBandwidthRate());
-        this.advertisedBandwidths.put(digest, advertisedBandwidth);
-      }
-    }
+
+  private void processRelayServerDescriptor(
+      ServerDescriptor serverDescriptor) {
+    /* Keep the advertised bandwidth of this server descriptor, read from
+     * in/relay-descriptors/server-descriptors/, in memory.  Ideally, we'd
+     * skip descriptors that we read before and obtain their advertised
+     * bandwidths from some temp file.  This approach should do for now,
+     * though. */
+    String digest = serverDescriptor.getServerDescriptorDigest().
+        toUpperCase();
+    int advertisedBandwidth = Math.min(Math.min(
+        serverDescriptor.getBandwidthBurst(),
+        serverDescriptor.getBandwidthObserved()),
+        serverDescriptor.getBandwidthRate());
+    this.advertisedBandwidths.put(digest, advertisedBandwidth);
   }
 
-  public void readRelayNetworkConsensuses() {
-    DescriptorQueue descriptorQueue =
-        this.descriptorSource.getDescriptorQueue(
-        DescriptorType.RELAY_CONSENSUSES,
-        DescriptorHistory.WEIGHTS_RELAY_CONSENSUS_HISTORY);
-    Descriptor descriptor;
-    while ((descriptor = descriptorQueue.nextDescriptor()) != null) {
-      if (descriptor instanceof RelayNetworkStatusConsensus) {
-        RelayNetworkStatusConsensus consensus =
-            (RelayNetworkStatusConsensus) descriptor;
-        long validAfterMillis = consensus.getValidAfterMillis(),
-            freshUntilMillis = consensus.getFreshUntilMillis();
-        SortedMap<String, double[]> pathSelectionWeights =
-            this.calculatePathSelectionProbabilities(consensus);
-        this.updateWeightsHistory(validAfterMillis, freshUntilMillis,
-            pathSelectionWeights);
-      }
-    }
+  private void processRelayNetworkConsensus(
+      RelayNetworkStatusConsensus consensus) {
+    long validAfterMillis = consensus.getValidAfterMillis(),
+        freshUntilMillis = consensus.getFreshUntilMillis();
+    SortedMap<String, double[]> pathSelectionWeights =
+        this.calculatePathSelectionProbabilities(consensus);
+    this.updateWeightsHistory(validAfterMillis, freshUntilMillis,
+        pathSelectionWeights);
   }
 
   private static final int HISTORY_UPDATER_WORKERS_NUM = 4;





More information about the tor-commits mailing list