[tor-commits] [metrics-lib/master] Split relay descriptor downloader into three classes.

karsten at torproject.org karsten at torproject.org
Tue Jan 17 10:26:22 UTC 2012


commit e868be92e4dab9aa96b5b9acc454ff9dfb5ee1b2
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Tue Jan 17 09:48:42 2012 +0100

    Split relay descriptor downloader into three classes.
    
    The three classes are:
     - The "user"-facing RelayDescriptorDownloaderImpl that is used to
       configure what descriptors shall be downloaded.
     - The DownloadCoordinator that memorizes which descriptors still need to
       be downloaded, which downloads are in progress, and which downloads are
       completed or have failed.
     - The DirectoryDownloader that sends the actual requests to one directory
       authority or mirror and reports back to DownloadCoordinator for
       finished downloads.
---
 .../descriptor/impl/DirectoryDownloader.java       |  107 +++++++
 .../descriptor/impl/DownloadCoordinator.java       |  259 +++++++++++++++
 .../impl/RelayDescriptorDownloaderImpl.java        |  332 +-------------------
 3 files changed, 381 insertions(+), 317 deletions(-)

diff --git a/src/org/torproject/descriptor/impl/DirectoryDownloader.java b/src/org/torproject/descriptor/impl/DirectoryDownloader.java
new file mode 100644
index 0000000..d07c5c9
--- /dev/null
+++ b/src/org/torproject/descriptor/impl/DirectoryDownloader.java
@@ -0,0 +1,107 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.descriptor.impl;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.zip.InflaterInputStream;
+
+/* Download descriptors from one directory authority or mirror.  Ask the
+ * DownloadCoordinator for a request, run it over HTTP, and deliver the
+ * response.  Repeat until the coordinator returns null (nothing left to
+ * request) or a request fails with an IOException. */
+public class DirectoryDownloader implements Runnable {
+
+  private String nickname;
+  private String ipPort; /* "ip:dirPort", used to build request URLs */
+  protected DirectoryDownloader(String nickname, String ip, int dirPort) {
+    this.nickname = nickname;
+    this.ipPort = ip + ":" + String.valueOf(dirPort);
+  }
+
+  private DownloadCoordinator downloadCoordinator;
+  protected void setDownloadCoordinator(
+      DownloadCoordinator downloadCoordinator) {
+    this.downloadCoordinator = downloadCoordinator;
+  }
+
+  private long requestTimeout; /* handed to RequestTimeout as millis */
+  protected void setRequestTimeout(long requestTimeout) {
+    this.requestTimeout = requestTimeout;
+  }
+
+  public void run() {
+    boolean keepRunning = true;
+    do {
+      DescriptorRequestImpl request =
+          this.downloadCoordinator.createRequest(this.nickname);
+      if (request != null) {
+        String url = "http://" + this.ipPort
+            + request.getRequestedResource();
+        request.setRequestStart(System.currentTimeMillis());
+        Thread timeoutThread = new Thread(new RequestTimeout(
+            this.requestTimeout));
+        timeoutThread.start();
+        try {
+          URL u = new URL(url);
+          HttpURLConnection huc =
+              (HttpURLConnection) u.openConnection();
+          huc.setRequestMethod("GET");
+          huc.connect();
+          int responseCode = huc.getResponseCode();
+          request.setResponseCode(responseCode);
+          if (responseCode == 200) {
+            BufferedInputStream in = new BufferedInputStream(
+                new InflaterInputStream(huc.getInputStream()));
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            int len;
+            byte[] data = new byte[1024];
+            while ((len = in.read(data, 0, 1024)) >= 0) {
+              baos.write(data, 0, len);
+            }
+            in.close();
+            byte[] responseBytes = baos.toByteArray();
+            request.setResponseBytes(responseBytes);
+            request.setRequestEnd(System.currentTimeMillis());
+          }
+        } catch (IOException e) {
+          /* Stop downloading from this directory if there are any
+           * problems, e.g., refused connections. */
+          keepRunning = false;
+        }
+        /* TODO How do we find out if we were interrupted, and by whom?
+         * Set the request or global timeout flag in the response. */
+        timeoutThread.interrupt(); /* cancel the pending timeout */
+        this.downloadCoordinator.deliverResponse(request);
+      } else {
+        keepRunning = false;
+      }
+    } while (keepRunning);
+  }
+
+  /* Interrupt the constructing thread once the given time has elapsed. */
+  private static class RequestTimeout implements Runnable {
+    private long timeoutMillis;
+    private Thread downloaderThread;
+    private RequestTimeout(long timeoutMillis) {
+      this.downloaderThread = Thread.currentThread();
+      this.timeoutMillis = timeoutMillis;
+    }
+    public void run() {
+      long started = System.currentTimeMillis(), sleep;
+      while ((sleep = started + this.timeoutMillis
+          - System.currentTimeMillis()) > 0L) {
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          return; /* request finished first */
+        }
+      }
+      this.downloaderThread.interrupt();
+    }
+  }
+}
+
diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinator.java b/src/org/torproject/descriptor/impl/DownloadCoordinator.java
new file mode 100644
index 0000000..c706415
--- /dev/null
+++ b/src/org/torproject/descriptor/impl/DownloadCoordinator.java
@@ -0,0 +1,259 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.descriptor.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorRequest;
+import org.torproject.descriptor.DirSourceEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+import org.torproject.descriptor.RelayNetworkStatusVote;
+
+/* TODO This whole download logic is a mess and needs a cleanup. */
+public class DownloadCoordinator {
+
+  private BlockingIteratorImpl<DescriptorRequest> descriptorQueue =
+      new BlockingIteratorImpl<DescriptorRequest>();
+  protected Iterator<DescriptorRequest> getDescriptorQueue() {
+    return this.descriptorQueue;
+  }
+
+  private SortedMap<String, DirectoryDownloader> directoryAuthorities;
+  private SortedMap<String, DirectoryDownloader> directoryMirrors;
+  private boolean downloadConsensusFromAllAuthorities;
+  private boolean includeCurrentReferencedVotes;
+  private long requestTimeoutMillis;
+  private long globalTimeoutMillis;
+
+  protected DownloadCoordinator(
+      SortedMap<String, DirectoryDownloader> directoryAuthorities,
+      SortedMap<String, DirectoryDownloader> directoryMirrors,
+      boolean downloadConsensus,
+      boolean downloadConsensusFromAllAuthorities,
+      Set<String> downloadVotes, boolean includeCurrentReferencedVotes,
+      long requestTimeoutMillis, long globalTimeoutMillis) {
+    this.directoryAuthorities = directoryAuthorities;
+    this.directoryMirrors = directoryMirrors;
+    this.missingConsensus = downloadConsensus;
+    this.downloadConsensusFromAllAuthorities =
+        downloadConsensusFromAllAuthorities;
+    this.missingVotes = downloadVotes; /* caller's set; mutated later */
+    this.includeCurrentReferencedVotes = includeCurrentReferencedVotes;
+    this.requestTimeoutMillis = requestTimeoutMillis;
+    this.globalTimeoutMillis = globalTimeoutMillis;
+    if (this.directoryMirrors.isEmpty() &&
+        this.directoryAuthorities.isEmpty()) {
+      this.descriptorQueue.setOutOfDescriptors();
+      /* TODO Should we say anything if we don't have any directories
+       * configured? */
+    } else {
+      GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis,
+          this);
+      this.globalTimerThread = new Thread(globalTimer);
+      this.globalTimerThread.start();
+      for (DirectoryDownloader directoryMirror :
+          this.directoryMirrors.values()) {
+        directoryMirror.setDownloadCoordinator(this);
+        directoryMirror.setRequestTimeout(this.requestTimeoutMillis);
+        new Thread(directoryMirror).start();
+      }
+      for (DirectoryDownloader directoryAuthority :
+          this.directoryAuthorities.values()) {
+        directoryAuthority.setDownloadCoordinator(this);
+        directoryAuthority.setRequestTimeout(this.requestTimeoutMillis);
+        new Thread(directoryAuthority).start();
+      }
+    }
+  }
+
+  /* Stop handing out new requests once the total download time exceeds
+   * a given time; requests already in flight are not interrupted. */
+  private Thread globalTimerThread;
+  private static class GlobalTimer implements Runnable {
+    private long timeoutMillis;
+    private DownloadCoordinator downloadCoordinator;
+    private GlobalTimer(long timeoutMillis,
+        DownloadCoordinator downloadCoordinator) {
+      this.timeoutMillis = timeoutMillis;
+      this.downloadCoordinator = downloadCoordinator;
+    }
+    public void run() {
+      long started = System.currentTimeMillis(), sleep;
+      while ((sleep = started + this.timeoutMillis
+          - System.currentTimeMillis()) > 0L) {
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          return; /* downloads finished before the timeout */
+        }
+      }
+      this.downloadCoordinator.interruptAllDownloads();
+    }
+  }
+
+  /* Are we missing the consensus, and should the next directory that
+   * hasn't tried downloading it before attempt to download it? */
+  private boolean missingConsensus = false;
+
+  /* Which directories have attempted to download the consensus so far,
+   * including those directories that are currently attempting it? */
+  private Set<String> requestedConsensuses = new HashSet<String>();
+
+  /* Which votes are we currently missing? */
+  private Set<String> missingVotes = new HashSet<String>();
+
+  /* Which vote (map value) is a given directory (map key) currently
+   * attempting to download? */
+  private Map<String, String> requestingVotes =
+      new HashMap<String, String>();
+
+  /* Which votes (map value) has a given directory (map key) attempted or
+   * is currently attempting to download? */
+  private Map<String, Set<String>> requestedVotes =
+      new HashMap<String, Set<String>>();
+
+  private boolean hasFinishedDownloading = false;
+
+  /* Look up what request a directory should make next.  If there is
+   * nothing to do right now, but maybe later, block the caller.  If
+   * we're done downloading, return null to notify the caller. */
+  protected synchronized DescriptorRequestImpl createRequest(
+      String nickname) {
+    while (!this.hasFinishedDownloading) {
+      DescriptorRequestImpl request = new DescriptorRequestImpl();
+      request.setDirectoryNickname(nickname);
+      if ((this.missingConsensus ||
+          (this.downloadConsensusFromAllAuthorities &&
+          this.directoryAuthorities.containsKey(nickname))) &&
+          !this.requestedConsensuses.contains(nickname)) {
+        if (!this.downloadConsensusFromAllAuthorities) {
+          this.missingConsensus = false;
+        }
+        this.requestedConsensuses.add(nickname);
+        request.setRequestedResource(
+            "/tor/status-vote/current/consensus.z");
+        request.setDescriptorType("consensus");
+        return request;
+      }
+      if (!this.missingVotes.isEmpty() &&
+          this.directoryAuthorities.containsKey(nickname)) {
+        String requestingVote = null;
+        for (String missingVote : this.missingVotes) {
+          if (!this.requestedVotes.containsKey(nickname) ||
+              !this.requestedVotes.get(nickname).contains(missingVote)) {
+            requestingVote = missingVote; /* no break: last match wins */
+          }
+        }
+        if (requestingVote != null) {
+          this.requestingVotes.put(nickname, requestingVote);
+          if (!this.requestedVotes.containsKey(nickname)) {
+            this.requestedVotes.put(nickname, new HashSet<String>());
+          }
+          this.requestedVotes.get(nickname).add(requestingVote);
+          this.missingVotes.remove(requestingVote);
+          request.setRequestedResource("/tor/status-vote/current/"
+              + requestingVote + ".z");
+          request.setDescriptorType("vote");
+          return request;
+        }
+      }
+      /* TODO Add server descriptors and extra-info descriptors later. */
+      try {
+        this.wait();
+      } catch (InterruptedException e) {
+        /* TODO What shall we do? */
+      }
+    }
+    return null;
+  }
+
+  /* Deliver a response which may either contain one or more descriptors
+   * or a failure response code.  Update the lists of missing descriptors,
+   * decide if there are more descriptors to download, and wake up any
+   * waiting downloader threads. */
+  protected synchronized void deliverResponse(
+      DescriptorRequestImpl response) {
+    String nickname = response.getDirectoryNickname();
+    if (response.getDescriptorType().equals("consensus")) {
+      if (response.getResponseCode() == 200) {
+        List<RelayNetworkStatusConsensus> parsedConsensuses =
+            RelayNetworkStatusConsensusImpl.parseConsensuses(
+            response.getResponseBytes());
+        List<Descriptor> parsedDescriptors =
+            new ArrayList<Descriptor>(parsedConsensuses);
+        response.setDescriptors(parsedDescriptors);
+        if (this.includeCurrentReferencedVotes) {
+          /* TODO Only add votes if the consensus is not older than one
+           * hour.  Or does that make no sense? */
+          for (RelayNetworkStatusConsensus parsedConsensus :
+              parsedConsensuses) {
+            for (DirSourceEntry dirSource :
+                parsedConsensus.getDirSourceEntries().values()) {
+              String identity = dirSource.getIdentity();
+              if (!this.missingVotes.contains(identity)) {
+                boolean alreadyRequested = false;
+                for (Set<String> requestedBefore :
+                    this.requestedVotes.values()) {
+                  if (requestedBefore.contains(identity)) {
+                    alreadyRequested = true;
+                    break;
+                  }
+                }
+                if (!alreadyRequested) {
+                  this.missingVotes.add(identity);
+                }
+              }
+            }
+          }
+          /* TODO Later, add referenced server descriptors. */
+        }
+      } else {
+        this.missingConsensus = true;
+      }
+    } else if (response.getDescriptorType().equals("vote")) {
+      String requestedVote = requestingVotes.remove(nickname);
+      if (response.getResponseCode() == 200) {
+        List<RelayNetworkStatusVote> parsedVotes =
+            RelayNetworkStatusVoteImpl.parseVotes(
+            response.getResponseBytes());
+        List<Descriptor> parsedDescriptors =
+            new ArrayList<Descriptor>(parsedVotes);
+        response.setDescriptors(parsedDescriptors);
+      } else {
+        this.missingVotes.add(requestedVote);
+      }
+    }
+    if (response.getRequestEnd() != 0L) { /* set only after a 200 */
+      this.descriptorQueue.add(response);
+    }
+    if ((!this.missingConsensus ||
+        (this.downloadConsensusFromAllAuthorities &&
+        this.requestedConsensuses.containsAll(
+        this.directoryAuthorities.keySet()))) &&
+        this.missingVotes.isEmpty() &&
+        this.requestingVotes.isEmpty()) {
+      /* TODO This logic may be somewhat broken.  We don't wait for all
+       * consensus requests to complete or fail, which results in adding
+       * (failed) requests to the queue when we think we're done. */
+      this.hasFinishedDownloading = true;
+      this.globalTimerThread.interrupt();
+      this.descriptorQueue.setOutOfDescriptors();
+    }
+    /* Wake up all waiting downloader threads.  Maybe they can now
+     * download something, or they'll realize we're done downloading. */
+    this.notifyAll();
+  }
+
+  private synchronized void interruptAllDownloads() {
+    this.hasFinishedDownloading = true;
+    this.notifyAll(); /* NOTE(review): no setOutOfDescriptors() here */
+  }
+}
+
diff --git a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
index 6805cc4..fff6b36 100644
--- a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
+++ b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
@@ -2,11 +2,6 @@
  * See LICENSE for licensing information */
 package org.torproject.descriptor.impl;
 
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -16,7 +11,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.zip.InflaterInputStream;
 import org.torproject.descriptor.Descriptor;
 import org.torproject.descriptor.DescriptorRequest;
 import org.torproject.descriptor.DirSourceEntry;
@@ -24,21 +18,10 @@ import org.torproject.descriptor.RelayDescriptorDownloader;
 import org.torproject.descriptor.RelayNetworkStatusConsensus;
 import org.torproject.descriptor.RelayNetworkStatusVote;
 
-/* TODO Should this class be split up and be moved to its own subpackage?
- * It's huge, and it's not going to get smaller in the future. */
 public class RelayDescriptorDownloaderImpl
     implements RelayDescriptorDownloader {
 
-  /* TODO Move part of the factory class to the impl package and make this
-   * constructor protected. */
-  public RelayDescriptorDownloaderImpl() {
-  }
-
   private boolean hasStartedDownloading = false;
-  private boolean hasFinishedDownloading = false;
-
-  private BlockingIteratorImpl<DescriptorRequest> descriptorQueue =
-      new BlockingIteratorImpl<DescriptorRequest>();
 
   private SortedMap<String, DirectoryDownloader> directoryAuthorities =
       new TreeMap<String, DirectoryDownloader>();
@@ -50,7 +33,7 @@ public class RelayDescriptorDownloaderImpl
     }
     this.checkDirectoryParameters(nickname, ip, dirPort);
     DirectoryDownloader directoryAuthority = new DirectoryDownloader(
-        nickname, ip, dirPort, this);
+        nickname, ip, dirPort);
     this.directoryAuthorities.put(nickname, directoryAuthority);
   }
 
@@ -64,7 +47,7 @@ public class RelayDescriptorDownloaderImpl
     }
     this.checkDirectoryParameters(nickname, ip, dirPort);
     DirectoryDownloader directoryMirror = new DirectoryDownloader(
-        nickname, ip, dirPort, this);
+        nickname, ip, dirPort);
     this.directoryMirrors.put(nickname, directoryMirror);
     /* TODO Implement prioritizing mirrors for non-vote downloads. */
     throw new UnsupportedOperationException("Prioritizing directory "
@@ -96,12 +79,13 @@ public class RelayDescriptorDownloaderImpl
     }
   }
 
+  private boolean downloadConsensus = false;
   public void setIncludeCurrentConsensus() {
     if (this.hasStartedDownloading) {
       throw new IllegalStateException("Reconfiguration is not permitted "
           + "after starting to download.");
     }
-    this.missingConsensus = true;
+    this.downloadConsensus = true;
   }
 
   private boolean downloadConsensusFromAllAuthorities = false;
@@ -122,13 +106,14 @@ public class RelayDescriptorDownloaderImpl
     this.includeCurrentReferencedVotes = true;
   }
 
+  private Set<String> downloadVotes = new HashSet<String>();
   public void setIncludeCurrentVote(String fingerprint) {
     if (this.hasStartedDownloading) {
       throw new IllegalStateException("Reconfiguration is not permitted "
           + "after starting to download.");
     }
     this.checkVoteFingerprint(fingerprint);
-    this.missingVotes.add(fingerprint);
+    this.downloadVotes.add(fingerprint);
   }
 
   public void setIncludeCurrentVotes(Set<String> fingerprints) {
@@ -248,302 +233,15 @@ public class RelayDescriptorDownloaderImpl
       throw new IllegalStateException("Initiating downloads is only "
           + "permitted once.");
     }
-    if (this.directoryMirrors.isEmpty() &&
-        this.directoryAuthorities.isEmpty()) {
-      this.descriptorQueue.setOutOfDescriptors();
-      /* TODO Should we say anything if we don't have any directories
-       * configured? */
-    } else {
-      GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis,
-          this);
-      this.globalTimerThread = new Thread(globalTimer);
-      this.globalTimerThread.start();
-      for (DirectoryDownloader directoryMirror :
-          this.directoryMirrors.values()) {
-        directoryMirror.setRequestTimeout(this.requestTimeoutMillis);
-        new Thread(directoryMirror).start();
-      }
-      for (DirectoryDownloader directoryAuthority :
-          this.directoryAuthorities.values()) {
-        directoryAuthority.setRequestTimeout(this.requestTimeoutMillis);
-        new Thread(directoryAuthority).start();
-      }
-    }
-    return this.descriptorQueue;
-  }
-
-  /* Interrupt all downloads if the total download time exceeds a given
-   * time. */
-  private Thread globalTimerThread;
-  private static class GlobalTimer implements Runnable {
-    private long timeoutMillis;
-    private RelayDescriptorDownloaderImpl downloadCoordinator;
-    private GlobalTimer(long timeoutMillis,
-        RelayDescriptorDownloaderImpl downloadCoordinator) {
-      this.timeoutMillis = timeoutMillis;
-      this.downloadCoordinator = downloadCoordinator;
-    }
-    public void run() {
-      long started = System.currentTimeMillis(), sleep;
-      while ((sleep = started + this.timeoutMillis
-          - System.currentTimeMillis()) > 0L) {
-        try {
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-      this.downloadCoordinator.interruptAllDownloads();
-    }
-  }
-
-  /* TODO This whole download logic is a mess.  It should probably go to
-   * its own class, and it needs a cleanup. */
-
-  /* Are we missing the consensus, and should the next directory that
-   * hasn't tried downloading it before attempt to download it? */
-  private boolean missingConsensus = false;
-
-  /* Which directories have attempted to download the consensus so far,
-   * including those directories that are currently attempting it? */
-  private Set<String> requestedConsensuses = new HashSet<String>();
-
-  /* Which votes are we currently missing? */
-  private Set<String> missingVotes = new HashSet<String>();
-
-  /* Which vote (map value) is a given directory (map key) currently
-   * attempting to download? */
-  private Map<String, String> requestingVotes =
-      new HashMap<String, String>();
-
-  /* Which votes (map value) has a given directory (map key) attempted or
-   * is currently attempting to download? */
-  private Map<String, Set<String>> requestedVotes =
-      new HashMap<String, Set<String>>();
-
-  /* Look up what request a directory should make next.  If there is
-   * nothing to do right now, but maybe later, block the caller.  If
-   * we're done downloading, return null to notify the caller. */
-  private synchronized DescriptorRequestImpl createRequest(
-      String nickname) {
-    while (!this.hasFinishedDownloading) {
-      DescriptorRequestImpl request = new DescriptorRequestImpl();
-      request.setDirectoryNickname(nickname);
-      if ((this.missingConsensus ||
-          (this.downloadConsensusFromAllAuthorities &&
-          this.directoryAuthorities.containsKey(nickname))) &&
-          !this.requestedConsensuses.contains(nickname)) {
-        if (!this.downloadConsensusFromAllAuthorities) {
-          this.missingConsensus = false;
-        }
-        this.requestedConsensuses.add(nickname);
-        request.setRequestedResource(
-            "/tor/status-vote/current/consensus.z");
-        request.setDescriptorType("consensus");
-        return request;
-      }
-      if (!this.missingVotes.isEmpty() &&
-          this.directoryAuthorities.containsKey(nickname)) {
-        String requestingVote = null;
-        for (String missingVote : this.missingVotes) {
-          if (!this.requestedVotes.containsKey(nickname) ||
-              !this.requestedVotes.get(nickname).contains(missingVote)) {
-            requestingVote = missingVote;
-          }
-        }
-        if (requestingVote != null) {
-          this.requestingVotes.put(nickname, requestingVote);
-          if (!this.requestedVotes.containsKey(nickname)) {
-            this.requestedVotes.put(nickname, new HashSet<String>());
-          }
-          this.requestedVotes.get(nickname).add(requestingVote);
-          this.missingVotes.remove(requestingVote);
-          request.setRequestedResource("/tor/status-vote/current/"
-              + requestingVote + ".z");
-          request.setDescriptorType("vote");
-          return request;
-        }
-      }
-      /* TODO Add server descriptors and extra-info descriptors later. */
-      try {
-        this.wait();
-      } catch (InterruptedException e) {
-        /* TODO What shall we do? */
-      }
-    }
-    return null;
-  }
-
-  /* Deliver a response which may either contain one or more descriptors
-   * or a failure response code.  Update the lists of missing descriptors,
-   * decide if there are more descriptors to download, and wake up any
-   * waiting downloader threads. */
-  private synchronized void deliverResponse(
-      DescriptorRequestImpl response) {
-    String nickname = response.getDirectoryNickname();
-    if (response.getDescriptorType().equals("consensus")) {
-      if (response.getResponseCode() == 200) {
-        List<RelayNetworkStatusConsensus> parsedConsensuses =
-            RelayNetworkStatusConsensusImpl.parseConsensuses(
-            response.getResponseBytes());
-        List<Descriptor> parsedDescriptors =
-            new ArrayList<Descriptor>(parsedConsensuses);
-        response.setDescriptors(parsedDescriptors);
-        if (this.includeCurrentReferencedVotes) {
-          /* TODO Only add votes if the consensus is not older than one
-           * hour.  Or does that make no sense? */
-          for (RelayNetworkStatusConsensus parsedConsensus :
-              parsedConsensuses) {
-            for (DirSourceEntry dirSource :
-                parsedConsensus.getDirSourceEntries().values()) {
-              String identity = dirSource.getIdentity();
-              if (!this.missingVotes.contains(identity)) {
-                boolean alreadyRequested = false;
-                for (Set<String> requestedBefore :
-                    this.requestedVotes.values()) {
-                  if (requestedBefore.contains(identity)) {
-                    alreadyRequested = true;
-                    break;
-                  }
-                }
-                if (!alreadyRequested) {
-                  this.missingVotes.add(identity);
-                }
-              }
-            }
-          }
-          /* TODO Later, add referenced server descriptors. */
-        }
-      } else {
-        this.missingConsensus = true;
-      }
-    } else if (response.getDescriptorType().equals("vote")) {
-      String requestedVote = requestingVotes.remove(nickname);
-      if (response.getResponseCode() == 200) {
-        List<RelayNetworkStatusVote> parsedVotes =
-            RelayNetworkStatusVoteImpl.parseVotes(
-            response.getResponseBytes());
-        List<Descriptor> parsedDescriptors =
-            new ArrayList<Descriptor>(parsedVotes);
-        response.setDescriptors(parsedDescriptors);
-      } else {
-        this.missingVotes.add(requestedVote);
-      }
-    }
-    if (response.getRequestEnd() != 0L) {
-      this.descriptorQueue.add(response);
-    }
-    if ((!this.missingConsensus ||
-        (this.downloadConsensusFromAllAuthorities &&
-        this.requestedConsensuses.containsAll(
-        this.directoryAuthorities.keySet()))) &&
-        this.missingVotes.isEmpty() &&
-        this.requestingVotes.isEmpty()) {
-      /* TODO This logic may be somewhat broken.  We don't wait for all
-       * consensus requests to complete or fail, which results in adding
-       * (failed) requests to the queue when we think we're done. */
-      this.hasFinishedDownloading = true;
-      this.globalTimerThread.interrupt();
-      this.descriptorQueue.setOutOfDescriptors();
-    }
-    /* Wake up all waiting downloader threads.  Maybe they can now
-     * download something, or they'll realize we're done downloading. */
-    this.notifyAll();
-  }
-
-  private synchronized void interruptAllDownloads() {
-    this.hasFinishedDownloading = true;
-    this.notifyAll();
-  }
-
-  /* Download descriptors from one directory authority or mirror.  First,
-   * ask the coordinator thread to create a request, run it, and deliver
-   * the response.  Repeat until the coordinator thread says there are no
-   * further requests to make. */
-  private static class DirectoryDownloader implements Runnable {
-    private String nickname;
-    private String ipPort;
-    private RelayDescriptorDownloaderImpl downloadCoordinator;
-    private DirectoryDownloader(String nickname, String ip, int dirPort,
-        RelayDescriptorDownloaderImpl downloadCoordinator) {
-      this.nickname = nickname;
-      this.ipPort = ip + ":" + String.valueOf(dirPort);
-      this.downloadCoordinator = downloadCoordinator;
-    }
-    private long requestTimeout;
-    private void setRequestTimeout(long requestTimeout) {
-      this.requestTimeout = requestTimeout;
-    }
-    public void run() {
-      boolean keepRunning = true;
-      do {
-        DescriptorRequestImpl request =
-            this.downloadCoordinator.createRequest(this.nickname);
-        if (request != null) {
-          String url = "http://" + this.ipPort
-              + request.getRequestedResource();
-          request.setRequestStart(System.currentTimeMillis());
-          Thread timeoutThread = new Thread(new RequestTimeout(
-              this.requestTimeout));
-          timeoutThread.start();
-          try {
-            URL u = new URL(url);
-            HttpURLConnection huc =
-                (HttpURLConnection) u.openConnection();
-            huc.setRequestMethod("GET");
-            huc.connect();
-            int responseCode = huc.getResponseCode();
-            request.setResponseCode(responseCode);
-            if (responseCode == 200) {
-              BufferedInputStream in = new BufferedInputStream(
-                  new InflaterInputStream(huc.getInputStream()));
-              ByteArrayOutputStream baos = new ByteArrayOutputStream();
-              int len;
-              byte[] data = new byte[1024];
-              while ((len = in.read(data, 0, 1024)) >= 0) {
-                baos.write(data, 0, len);
-              }
-              in.close();
-              byte[] responseBytes = baos.toByteArray();
-              request.setResponseBytes(responseBytes);
-              request.setRequestEnd(System.currentTimeMillis());
-            }
-          } catch (IOException e) {
-            /* Stop downloading from this directory if there are any
-             * problems, e.g., refused connections. */
-            keepRunning = false;
-          }
-          /* TODO How do we find out if we were interrupted, and by who?
-           * Set the request or global timeout flag in the response. */
-          timeoutThread.interrupt();
-          this.downloadCoordinator.deliverResponse(request);
-        } else {
-          keepRunning = false;
-        }
-      } while (keepRunning);
-    }
-  }
-
-  /* Interrupt a download request if it takes longer than a given time. */
-  private static class RequestTimeout implements Runnable {
-    private long timeoutMillis;
-    private Thread downloaderThread;
-    private RequestTimeout(long timeoutMillis) {
-      this.downloaderThread = Thread.currentThread();
-      this.timeoutMillis = timeoutMillis;
-    }
-    public void run() {
-      long started = System.currentTimeMillis(), sleep;
-      while ((sleep = started + this.timeoutMillis
-          - System.currentTimeMillis()) > 0L) {
-        try {
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-      this.downloaderThread.interrupt();
-    }
+    this.hasStartedDownloading = true;
+    DownloadCoordinator downloadCoordinator = new DownloadCoordinator(
+        this.directoryAuthorities, this.directoryMirrors,
+        this.downloadConsensus, this.downloadConsensusFromAllAuthorities,
+        this.downloadVotes, this.includeCurrentReferencedVotes,
+        this.requestTimeoutMillis, this.globalTimeoutMillis);
+    Iterator<DescriptorRequest> descriptorQueue = downloadCoordinator.
+        getDescriptorQueue();
+    return descriptorQueue;
   }
 }
 





More information about the tor-commits mailing list