commit e868be92e4dab9aa96b5b9acc454ff9dfb5ee1b2
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Tue Jan 17 09:48:42 2012 +0100
Split relay descriptor downloader into three classes.
The three classes are:
- The "user"-facing RelayDescriptorDownloaderImpl that is used to
configure what descriptors shall be downloaded.
- The DownloadCoordinator that memorizes which descriptors still need to
be downloaded, which downloads are in progress, and which downloads are
completed or have failed.
- The DirectoryDownloader that sends the actual requests to one directory
authority or mirror and reports back to DownloadCoordinator for
finished downloads.
---
.../descriptor/impl/DirectoryDownloader.java | 107 +++++++
.../descriptor/impl/DownloadCoordinator.java | 259 +++++++++++++++
.../impl/RelayDescriptorDownloaderImpl.java | 332 +-------------------
3 files changed, 381 insertions(+), 317 deletions(-)
diff --git a/src/org/torproject/descriptor/impl/DirectoryDownloader.java b/src/org/torproject/descriptor/impl/DirectoryDownloader.java
new file mode 100644
index 0000000..d07c5c9
--- /dev/null
+++ b/src/org/torproject/descriptor/impl/DirectoryDownloader.java
@@ -0,0 +1,107 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.descriptor.impl;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.zip.InflaterInputStream;
+
+/* Download descriptors from one directory authority or mirror. First,
+ * ask the coordinator thread to create a request, run it, and deliver
+ * the response. Repeat until the coordinator thread says there are no
+ * further requests to make. */
+public class DirectoryDownloader implements Runnable {
+
+ private String nickname;
+ private String ipPort;
+ protected DirectoryDownloader(String nickname, String ip, int dirPort) {
+ this.nickname = nickname;
+ this.ipPort = ip + ":" + String.valueOf(dirPort);
+ }
+
+ private DownloadCoordinator downloadCoordinator;
+ protected void setDownloadCoordinator(
+ DownloadCoordinator downloadCoordinator) {
+ this.downloadCoordinator = downloadCoordinator;
+ }
+
+ private long requestTimeout;
+ protected void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+  /* Keep asking the coordinator for requests and execute them until the
+   * coordinator returns null (nothing left to download) or a request
+   * fails with an I/O problem.  Every request obtained from the
+   * coordinator is handed back via deliverResponse(), whether it
+   * succeeded or not. */
+  public void run() {
+    boolean keepRunning = true;
+    do {
+      DescriptorRequestImpl request =
+          this.downloadCoordinator.createRequest(this.nickname);
+      if (request != null) {
+        String url = "http://" + this.ipPort
+            + request.getRequestedResource();
+        request.setRequestStart(System.currentTimeMillis());
+        Thread timeoutThread = new Thread(new RequestTimeout(
+            this.requestTimeout));
+        timeoutThread.start();
+        try {
+          URL u = new URL(url);
+          HttpURLConnection huc =
+              (HttpURLConnection) u.openConnection();
+          huc.setRequestMethod("GET");
+          /* Thread.interrupt() only sets the interrupt status of a
+           * thread that is blocked in java.net stream I/O, so the
+           * watchdog thread alone cannot abort a stalled connection.
+           * Let the connection enforce the timeout itself.  The read
+           * timeout bounds every single blocking read, not the request
+           * as a whole.  A timeout surfaces as an IOException and is
+           * handled like any other failure below. */
+          if (this.requestTimeout > 0L) {
+            int timeout = (int) Math.min(this.requestTimeout,
+                (long) Integer.MAX_VALUE);
+            huc.setConnectTimeout(timeout);
+            huc.setReadTimeout(timeout);
+          }
+          huc.connect();
+          int responseCode = huc.getResponseCode();
+          request.setResponseCode(responseCode);
+          if (responseCode == 200) {
+            BufferedInputStream in = new BufferedInputStream(
+                new InflaterInputStream(huc.getInputStream()));
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            try {
+              int len;
+              byte[] data = new byte[1024];
+              while ((len = in.read(data, 0, 1024)) >= 0) {
+                baos.write(data, 0, len);
+              }
+            } finally {
+              /* Release the stream even if reading was cut short. */
+              in.close();
+            }
+            byte[] responseBytes = baos.toByteArray();
+            request.setResponseBytes(responseBytes);
+            /* The end time is only set for completely read responses;
+             * the coordinator relies on that to tell them apart from
+             * failed requests. */
+            request.setRequestEnd(System.currentTimeMillis());
+          }
+        } catch (IOException e) {
+          /* Stop downloading from this directory if there are any
+           * problems, e.g., refused connections or timeouts. */
+          keepRunning = false;
+        }
+        /* TODO How do we find out if we were interrupted, and by whom?
+         * Set the request or global timeout flag in the response. */
+        timeoutThread.interrupt();
+        this.downloadCoordinator.deliverResponse(request);
+      } else {
+        keepRunning = false;
+      }
+    } while (keepRunning);
+  }
+
+  /* Watchdog for a single download request: interrupts the thread that
+   * created it unless the watchdog itself is interrupted before the
+   * timeout has elapsed. */
+  private static class RequestTimeout implements Runnable {
+    private final Thread downloaderThread;
+    private final long timeoutMillis;
+    private RequestTimeout(long timeoutMillis) {
+      this.timeoutMillis = timeoutMillis;
+      /* The constructor runs in the downloader thread, which is the
+       * thread to interrupt later. */
+      this.downloaderThread = Thread.currentThread();
+    }
+    public void run() {
+      long deadline = System.currentTimeMillis() + this.timeoutMillis;
+      long remaining = deadline - System.currentTimeMillis();
+      while (remaining > 0L) {
+        try {
+          Thread.sleep(remaining);
+        } catch (InterruptedException e) {
+          /* The request finished in time; nothing to interrupt. */
+          return;
+        }
+        remaining = deadline - System.currentTimeMillis();
+      }
+      this.downloaderThread.interrupt();
+    }
+  }
+}
+
diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinator.java b/src/org/torproject/descriptor/impl/DownloadCoordinator.java
new file mode 100644
index 0000000..c706415
--- /dev/null
+++ b/src/org/torproject/descriptor/impl/DownloadCoordinator.java
@@ -0,0 +1,259 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.descriptor.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorRequest;
+import org.torproject.descriptor.DirSourceEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+import org.torproject.descriptor.RelayNetworkStatusVote;
+
+/* TODO This whole download logic is a mess and needs a cleanup. */
+public class DownloadCoordinator {
+
+ private BlockingIteratorImpl<DescriptorRequest> descriptorQueue =
+ new BlockingIteratorImpl<DescriptorRequest>();
+ protected Iterator<DescriptorRequest> getDescriptorQueue() {
+ return this.descriptorQueue;
+ }
+
+  /* Directory authorities and mirrors to download from, keyed by
+   * nickname. */
+  private SortedMap<String, DirectoryDownloader> directoryAuthorities;
+  private SortedMap<String, DirectoryDownloader> directoryMirrors;
+
+  /* Download options as passed to the constructor. */
+  private boolean downloadConsensusFromAllAuthorities;
+  private boolean includeCurrentReferencedVotes;
+  private long requestTimeoutMillis;
+  private long globalTimeoutMillis;
+
+  /* Take over the download configuration and start downloading right
+   * away.  If no directories are configured, the descriptor queue is
+   * marked as out of descriptors immediately.  Otherwise the global
+   * timer and one downloader thread per mirror and per authority are
+   * started. */
+  protected DownloadCoordinator(
+      SortedMap<String, DirectoryDownloader> directoryAuthorities,
+      SortedMap<String, DirectoryDownloader> directoryMirrors,
+      boolean downloadConsensus,
+      boolean downloadConsensusFromAllAuthorities,
+      Set<String> downloadVotes, boolean includeCurrentReferencedVotes,
+      long requestTimeoutMillis, long globalTimeoutMillis) {
+    this.directoryAuthorities = directoryAuthorities;
+    this.directoryMirrors = directoryMirrors;
+    this.missingConsensus = downloadConsensus;
+    this.downloadConsensusFromAllAuthorities =
+        downloadConsensusFromAllAuthorities;
+    /* Work on a private copy of the requested votes.  The set of
+     * missing votes is modified while downloads are in progress, and
+     * the caller's set must not change behind its back. */
+    this.missingVotes = downloadVotes == null ? new HashSet<String>()
+        : new HashSet<String>(downloadVotes);
+    this.includeCurrentReferencedVotes = includeCurrentReferencedVotes;
+    this.requestTimeoutMillis = requestTimeoutMillis;
+    this.globalTimeoutMillis = globalTimeoutMillis;
+    if (this.directoryMirrors.isEmpty() &&
+        this.directoryAuthorities.isEmpty()) {
+      this.descriptorQueue.setOutOfDescriptors();
+      /* TODO Should we say anything if we don't have any directories
+       * configured? */
+    } else {
+      GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis,
+          this);
+      this.globalTimerThread = new Thread(globalTimer);
+      this.globalTimerThread.start();
+      for (DirectoryDownloader directoryMirror :
+          this.directoryMirrors.values()) {
+        directoryMirror.setDownloadCoordinator(this);
+        directoryMirror.setRequestTimeout(this.requestTimeoutMillis);
+        new Thread(directoryMirror).start();
+      }
+      for (DirectoryDownloader directoryAuthority :
+          this.directoryAuthorities.values()) {
+        directoryAuthority.setDownloadCoordinator(this);
+        directoryAuthority.setRequestTimeout(this.requestTimeoutMillis);
+        new Thread(directoryAuthority).start();
+      }
+    }
+  }
+
+  /* Thread running the global timer.  It is only created if at least
+   * one directory is configured, and deliverResponse() interrupts it
+   * once downloading has finished. */
+  private Thread globalTimerThread;
+
+  /* Interrupt all downloads if the total download time exceeds a given
+   * time, unless the timer thread is interrupted first. */
+  private static class GlobalTimer implements Runnable {
+    private final DownloadCoordinator downloadCoordinator;
+    private final long timeoutMillis;
+    private GlobalTimer(long timeoutMillis,
+        DownloadCoordinator downloadCoordinator) {
+      this.downloadCoordinator = downloadCoordinator;
+      this.timeoutMillis = timeoutMillis;
+    }
+    public void run() {
+      long deadline = System.currentTimeMillis() + this.timeoutMillis;
+      long remaining = deadline - System.currentTimeMillis();
+      while (remaining > 0L) {
+        try {
+          Thread.sleep(remaining);
+        } catch (InterruptedException e) {
+          /* Downloading finished before the global timeout. */
+          return;
+        }
+        remaining = deadline - System.currentTimeMillis();
+      }
+      this.downloadCoordinator.interruptAllDownloads();
+    }
+  }
+
+  /* True while the consensus still has to be fetched, in which case the
+   * next directory that has not tried before is asked to download it. */
+  private boolean missingConsensus;
+
+  /* Nicknames of all directories that have attempted to download the
+   * consensus so far, including attempts that are still running. */
+  private Set<String> requestedConsensuses = new HashSet<String>();
+
+  /* Votes that are currently missing and not being downloaded. */
+  private Set<String> missingVotes = new HashSet<String>();
+
+  /* Vote (map value) that a given directory (map key) is attempting to
+   * download right now. */
+  private Map<String, String> requestingVotes =
+      new HashMap<String, String>();
+
+  /* Votes (map value) that a given directory (map key) has attempted or
+   * is currently attempting to download. */
+  private Map<String, Set<String>> requestedVotes =
+      new HashMap<String, Set<String>>();
+
+  /* Set once all downloads are complete or the global timeout has
+   * elapsed; makes createRequest() return null. */
+  private boolean hasFinishedDownloading;
+
+ /* Look up what request a directory should make next. If there is
+ * nothing to do right now, but maybe later, block the caller. If
+ * we're done downloading, return null to notify the caller.
+ *
+ * The consensus is offered first: to any directory while
+ * missingConsensus is set, or to every authority if
+ * downloadConsensusFromAllAuthorities is set, but to each directory at
+ * most once (tracked in requestedConsensuses). Unless consensuses are
+ * downloaded from all authorities, handing out the consensus request
+ * clears missingConsensus.
+ *
+ * Votes are only requested from authorities, and never twice from the
+ * same authority (tracked in requestedVotes). If several missing votes
+ * qualify, the last one found while iterating over missingVotes is
+ * chosen. A vote that is handed out is removed from missingVotes and
+ * recorded in requestingVotes; deliverResponse() puts it back into
+ * missingVotes if the download fails.
+ *
+ * The wait at the bottom of the loop releases the monitor; callers are
+ * woken up by notifyAll() in deliverResponse() and
+ * interruptAllDownloads() and then re-evaluate everything above. */
+ protected synchronized DescriptorRequestImpl createRequest(
+ String nickname) {
+ while (!this.hasFinishedDownloading) {
+ DescriptorRequestImpl request = new DescriptorRequestImpl();
+ request.setDirectoryNickname(nickname);
+ if ((this.missingConsensus ||
+ (this.downloadConsensusFromAllAuthorities &&
+ this.directoryAuthorities.containsKey(nickname))) &&
+ !this.requestedConsensuses.contains(nickname)) {
+ if (!this.downloadConsensusFromAllAuthorities) {
+ this.missingConsensus = false;
+ }
+ this.requestedConsensuses.add(nickname);
+ request.setRequestedResource(
+ "/tor/status-vote/current/consensus.z");
+ request.setDescriptorType("consensus");
+ return request;
+ }
+ if (!this.missingVotes.isEmpty() &&
+ this.directoryAuthorities.containsKey(nickname)) {
+ String requestingVote = null;
+ for (String missingVote : this.missingVotes) {
+ if (!this.requestedVotes.containsKey(nickname) ||
+ !this.requestedVotes.get(nickname).contains(missingVote)) {
+ requestingVote = missingVote;
+ }
+ }
+ if (requestingVote != null) {
+ this.requestingVotes.put(nickname, requestingVote);
+ if (!this.requestedVotes.containsKey(nickname)) {
+ this.requestedVotes.put(nickname, new HashSet<String>());
+ }
+ this.requestedVotes.get(nickname).add(requestingVote);
+ this.missingVotes.remove(requestingVote);
+ request.setRequestedResource("/tor/status-vote/current/"
+ + requestingVote + ".z");
+ request.setDescriptorType("vote");
+ return request;
+ }
+ }
+ /* TODO Add server descriptors and extra-info descriptors later. */
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ /* TODO What shall we do? For now the interrupt is ignored and the
+ * loop condition is checked again. */
+ }
+ }
+ return null;
+ }
+
+ /* Deliver a response which may either contain one or more descriptors
+ * or a failure response code. Update the lists of missing descriptors,
+ * decide if there are more descriptors to download, and wake up any
+ * waiting downloader threads.
+ *
+ * Consensus responses: a 200 response is parsed and, if
+ * includeCurrentReferencedVotes is set, the identity of every dir
+ * source entry is added to missingVotes unless it is already missing
+ * or was requested from some directory before. Any other response
+ * code sets missingConsensus again, so that createRequest() can offer
+ * the consensus to a directory that has not tried yet.
+ *
+ * Vote responses: the vote is removed from requestingVotes; a 200
+ * response is parsed, any other response code puts the vote back into
+ * missingVotes.
+ *
+ * Only responses with a non-zero request end time are added to the
+ * descriptor queue; DirectoryDownloader sets that time only after it
+ * has completely read a 200 response.
+ *
+ * Downloading is considered finished when no consensus is missing (or
+ * every authority has been asked for it, if consensuses are downloaded
+ * from all authorities) and no votes are missing or in flight. */
+ protected synchronized void deliverResponse(
+ DescriptorRequestImpl response) {
+ String nickname = response.getDirectoryNickname();
+ if (response.getDescriptorType().equals("consensus")) {
+ if (response.getResponseCode() == 200) {
+ List<RelayNetworkStatusConsensus> parsedConsensuses =
+ RelayNetworkStatusConsensusImpl.parseConsensuses(
+ response.getResponseBytes());
+ List<Descriptor> parsedDescriptors =
+ new ArrayList<Descriptor>(parsedConsensuses);
+ response.setDescriptors(parsedDescriptors);
+ if (this.includeCurrentReferencedVotes) {
+ /* TODO Only add votes if the consensus is not older than one
+ * hour. Or does that make no sense? */
+ for (RelayNetworkStatusConsensus parsedConsensus :
+ parsedConsensuses) {
+ for (DirSourceEntry dirSource :
+ parsedConsensus.getDirSourceEntries().values()) {
+ String identity = dirSource.getIdentity();
+ if (!this.missingVotes.contains(identity)) {
+ boolean alreadyRequested = false;
+ for (Set<String> requestedBefore :
+ this.requestedVotes.values()) {
+ if (requestedBefore.contains(identity)) {
+ alreadyRequested = true;
+ break;
+ }
+ }
+ if (!alreadyRequested) {
+ this.missingVotes.add(identity);
+ }
+ }
+ }
+ }
+ /* TODO Later, add referenced server descriptors. */
+ }
+ } else {
+ this.missingConsensus = true;
+ }
+ } else if (response.getDescriptorType().equals("vote")) {
+ String requestedVote = requestingVotes.remove(nickname);
+ if (response.getResponseCode() == 200) {
+ List<RelayNetworkStatusVote> parsedVotes =
+ RelayNetworkStatusVoteImpl.parseVotes(
+ response.getResponseBytes());
+ List<Descriptor> parsedDescriptors =
+ new ArrayList<Descriptor>(parsedVotes);
+ response.setDescriptors(parsedDescriptors);
+ } else {
+ this.missingVotes.add(requestedVote);
+ }
+ }
+ if (response.getRequestEnd() != 0L) {
+ this.descriptorQueue.add(response);
+ }
+ if ((!this.missingConsensus ||
+ (this.downloadConsensusFromAllAuthorities &&
+ this.requestedConsensuses.containsAll(
+ this.directoryAuthorities.keySet()))) &&
+ this.missingVotes.isEmpty() &&
+ this.requestingVotes.isEmpty()) {
+ /* TODO This logic may be somewhat broken. We don't wait for all
+ * consensus requests to complete or fail, which results in adding
+ * (failed) requests to the queue when we think we're done. */
+ this.hasFinishedDownloading = true;
+ this.globalTimerThread.interrupt();
+ this.descriptorQueue.setOutOfDescriptors();
+ }
+ /* Wake up all waiting downloader threads. Maybe they can now
+ * download something, or they'll realize we're done downloading. */
+ this.notifyAll();
+ }
+
+  /* Called by the global timer once the global timeout has elapsed:
+   * mark downloading as finished and wake up all downloader threads
+   * waiting in createRequest() so that they receive null and stop.
+   * NOTE(review): unlike deliverResponse(), this path does not call
+   * descriptorQueue.setOutOfDescriptors() -- confirm that consumers of
+   * the queue cannot block forever after a global timeout. */
+  private synchronized void interruptAllDownloads() {
+    hasFinishedDownloading = true;
+    notifyAll();
+  }
+}
+
diff --git a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
index 6805cc4..fff6b36 100644
--- a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
+++ b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
@@ -2,11 +2,6 @@
* See LICENSE for licensing information */
package org.torproject.descriptor.impl;
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -16,7 +11,6 @@ import java.util.List;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.zip.InflaterInputStream;
import org.torproject.descriptor.Descriptor;
import org.torproject.descriptor.DescriptorRequest;
import org.torproject.descriptor.DirSourceEntry;
@@ -24,21 +18,10 @@ import org.torproject.descriptor.RelayDescriptorDownloader;
import org.torproject.descriptor.RelayNetworkStatusConsensus;
import org.torproject.descriptor.RelayNetworkStatusVote;
-/* TODO Should this class be split up and be moved to its own subpackage?
- * It's huge, and it's not going to get smaller in the future. */
public class RelayDescriptorDownloaderImpl
implements RelayDescriptorDownloader {
- /* TODO Move part of the factory class to the impl package and make this
- * constructor protected. */
- public RelayDescriptorDownloaderImpl() {
- }
-
private boolean hasStartedDownloading = false;
- private boolean hasFinishedDownloading = false;
-
- private BlockingIteratorImpl<DescriptorRequest> descriptorQueue =
- new BlockingIteratorImpl<DescriptorRequest>();
private SortedMap<String, DirectoryDownloader> directoryAuthorities =
new TreeMap<String, DirectoryDownloader>();
@@ -50,7 +33,7 @@ public class RelayDescriptorDownloaderImpl
}
this.checkDirectoryParameters(nickname, ip, dirPort);
DirectoryDownloader directoryAuthority = new DirectoryDownloader(
- nickname, ip, dirPort, this);
+ nickname, ip, dirPort);
this.directoryAuthorities.put(nickname, directoryAuthority);
}
@@ -64,7 +47,7 @@ public class RelayDescriptorDownloaderImpl
}
this.checkDirectoryParameters(nickname, ip, dirPort);
DirectoryDownloader directoryMirror = new DirectoryDownloader(
- nickname, ip, dirPort, this);
+ nickname, ip, dirPort);
this.directoryMirrors.put(nickname, directoryMirror);
/* TODO Implement prioritizing mirrors for non-vote downloads. */
throw new UnsupportedOperationException("Prioritizing directory "
@@ -96,12 +79,13 @@ public class RelayDescriptorDownloaderImpl
}
}
+ private boolean downloadConsensus = false;
public void setIncludeCurrentConsensus() {
if (this.hasStartedDownloading) {
throw new IllegalStateException("Reconfiguration is not permitted "
+ "after starting to download.");
}
- this.missingConsensus = true;
+ this.downloadConsensus = true;
}
private boolean downloadConsensusFromAllAuthorities = false;
@@ -122,13 +106,14 @@ public class RelayDescriptorDownloaderImpl
this.includeCurrentReferencedVotes = true;
}
+ private Set<String> downloadVotes = new HashSet<String>();
public void setIncludeCurrentVote(String fingerprint) {
if (this.hasStartedDownloading) {
throw new IllegalStateException("Reconfiguration is not permitted "
+ "after starting to download.");
}
this.checkVoteFingerprint(fingerprint);
- this.missingVotes.add(fingerprint);
+ this.downloadVotes.add(fingerprint);
}
public void setIncludeCurrentVotes(Set<String> fingerprints) {
@@ -248,302 +233,15 @@ public class RelayDescriptorDownloaderImpl
throw new IllegalStateException("Initiating downloads is only "
+ "permitted once.");
}
- if (this.directoryMirrors.isEmpty() &&
- this.directoryAuthorities.isEmpty()) {
- this.descriptorQueue.setOutOfDescriptors();
- /* TODO Should we say anything if we don't have any directories
- * configured? */
- } else {
- GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis,
- this);
- this.globalTimerThread = new Thread(globalTimer);
- this.globalTimerThread.start();
- for (DirectoryDownloader directoryMirror :
- this.directoryMirrors.values()) {
- directoryMirror.setRequestTimeout(this.requestTimeoutMillis);
- new Thread(directoryMirror).start();
- }
- for (DirectoryDownloader directoryAuthority :
- this.directoryAuthorities.values()) {
- directoryAuthority.setRequestTimeout(this.requestTimeoutMillis);
- new Thread(directoryAuthority).start();
- }
- }
- return this.descriptorQueue;
- }
-
- /* Interrupt all downloads if the total download time exceeds a given
- * time. */
- private Thread globalTimerThread;
- private static class GlobalTimer implements Runnable {
- private long timeoutMillis;
- private RelayDescriptorDownloaderImpl downloadCoordinator;
- private GlobalTimer(long timeoutMillis,
- RelayDescriptorDownloaderImpl downloadCoordinator) {
- this.timeoutMillis = timeoutMillis;
- this.downloadCoordinator = downloadCoordinator;
- }
- public void run() {
- long started = System.currentTimeMillis(), sleep;
- while ((sleep = started + this.timeoutMillis
- - System.currentTimeMillis()) > 0L) {
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- return;
- }
- }
- this.downloadCoordinator.interruptAllDownloads();
- }
- }
-
- /* TODO This whole download logic is a mess. It should probably go to
- * its own class, and it needs a cleanup. */
-
- /* Are we missing the consensus, and should the next directory that
- * hasn't tried downloading it before attempt to download it? */
- private boolean missingConsensus = false;
-
- /* Which directories have attempted to download the consensus so far,
- * including those directories that are currently attempting it? */
- private Set<String> requestedConsensuses = new HashSet<String>();
-
- /* Which votes are we currently missing? */
- private Set<String> missingVotes = new HashSet<String>();
-
- /* Which vote (map value) is a given directory (map key) currently
- * attempting to download? */
- private Map<String, String> requestingVotes =
- new HashMap<String, String>();
-
- /* Which votes (map value) has a given directory (map key) attempted or
- * is currently attempting to download? */
- private Map<String, Set<String>> requestedVotes =
- new HashMap<String, Set<String>>();
-
- /* Look up what request a directory should make next. If there is
- * nothing to do right now, but maybe later, block the caller. If
- * we're done downloading, return null to notify the caller. */
- private synchronized DescriptorRequestImpl createRequest(
- String nickname) {
- while (!this.hasFinishedDownloading) {
- DescriptorRequestImpl request = new DescriptorRequestImpl();
- request.setDirectoryNickname(nickname);
- if ((this.missingConsensus ||
- (this.downloadConsensusFromAllAuthorities &&
- this.directoryAuthorities.containsKey(nickname))) &&
- !this.requestedConsensuses.contains(nickname)) {
- if (!this.downloadConsensusFromAllAuthorities) {
- this.missingConsensus = false;
- }
- this.requestedConsensuses.add(nickname);
- request.setRequestedResource(
- "/tor/status-vote/current/consensus.z");
- request.setDescriptorType("consensus");
- return request;
- }
- if (!this.missingVotes.isEmpty() &&
- this.directoryAuthorities.containsKey(nickname)) {
- String requestingVote = null;
- for (String missingVote : this.missingVotes) {
- if (!this.requestedVotes.containsKey(nickname) ||
- !this.requestedVotes.get(nickname).contains(missingVote)) {
- requestingVote = missingVote;
- }
- }
- if (requestingVote != null) {
- this.requestingVotes.put(nickname, requestingVote);
- if (!this.requestedVotes.containsKey(nickname)) {
- this.requestedVotes.put(nickname, new HashSet<String>());
- }
- this.requestedVotes.get(nickname).add(requestingVote);
- this.missingVotes.remove(requestingVote);
- request.setRequestedResource("/tor/status-vote/current/"
- + requestingVote + ".z");
- request.setDescriptorType("vote");
- return request;
- }
- }
- /* TODO Add server descriptors and extra-info descriptors later. */
- try {
- this.wait();
- } catch (InterruptedException e) {
- /* TODO What shall we do? */
- }
- }
- return null;
- }
-
- /* Deliver a response which may either contain one or more descriptors
- * or a failure response code. Update the lists of missing descriptors,
- * decide if there are more descriptors to download, and wake up any
- * waiting downloader threads. */
- private synchronized void deliverResponse(
- DescriptorRequestImpl response) {
- String nickname = response.getDirectoryNickname();
- if (response.getDescriptorType().equals("consensus")) {
- if (response.getResponseCode() == 200) {
- List<RelayNetworkStatusConsensus> parsedConsensuses =
- RelayNetworkStatusConsensusImpl.parseConsensuses(
- response.getResponseBytes());
- List<Descriptor> parsedDescriptors =
- new ArrayList<Descriptor>(parsedConsensuses);
- response.setDescriptors(parsedDescriptors);
- if (this.includeCurrentReferencedVotes) {
- /* TODO Only add votes if the consensus is not older than one
- * hour. Or does that make no sense? */
- for (RelayNetworkStatusConsensus parsedConsensus :
- parsedConsensuses) {
- for (DirSourceEntry dirSource :
- parsedConsensus.getDirSourceEntries().values()) {
- String identity = dirSource.getIdentity();
- if (!this.missingVotes.contains(identity)) {
- boolean alreadyRequested = false;
- for (Set<String> requestedBefore :
- this.requestedVotes.values()) {
- if (requestedBefore.contains(identity)) {
- alreadyRequested = true;
- break;
- }
- }
- if (!alreadyRequested) {
- this.missingVotes.add(identity);
- }
- }
- }
- }
- /* TODO Later, add referenced server descriptors. */
- }
- } else {
- this.missingConsensus = true;
- }
- } else if (response.getDescriptorType().equals("vote")) {
- String requestedVote = requestingVotes.remove(nickname);
- if (response.getResponseCode() == 200) {
- List<RelayNetworkStatusVote> parsedVotes =
- RelayNetworkStatusVoteImpl.parseVotes(
- response.getResponseBytes());
- List<Descriptor> parsedDescriptors =
- new ArrayList<Descriptor>(parsedVotes);
- response.setDescriptors(parsedDescriptors);
- } else {
- this.missingVotes.add(requestedVote);
- }
- }
- if (response.getRequestEnd() != 0L) {
- this.descriptorQueue.add(response);
- }
- if ((!this.missingConsensus ||
- (this.downloadConsensusFromAllAuthorities &&
- this.requestedConsensuses.containsAll(
- this.directoryAuthorities.keySet()))) &&
- this.missingVotes.isEmpty() &&
- this.requestingVotes.isEmpty()) {
- /* TODO This logic may be somewhat broken. We don't wait for all
- * consensus requests to complete or fail, which results in adding
- * (failed) requests to the queue when we think we're done. */
- this.hasFinishedDownloading = true;
- this.globalTimerThread.interrupt();
- this.descriptorQueue.setOutOfDescriptors();
- }
- /* Wake up all waiting downloader threads. Maybe they can now
- * download something, or they'll realize we're done downloading. */
- this.notifyAll();
- }
-
- private synchronized void interruptAllDownloads() {
- this.hasFinishedDownloading = true;
- this.notifyAll();
- }
-
- /* Download descriptors from one directory authority or mirror. First,
- * ask the coordinator thread to create a request, run it, and deliver
- * the response. Repeat until the coordinator thread says there are no
- * further requests to make. */
- private static class DirectoryDownloader implements Runnable {
- private String nickname;
- private String ipPort;
- private RelayDescriptorDownloaderImpl downloadCoordinator;
- private DirectoryDownloader(String nickname, String ip, int dirPort,
- RelayDescriptorDownloaderImpl downloadCoordinator) {
- this.nickname = nickname;
- this.ipPort = ip + ":" + String.valueOf(dirPort);
- this.downloadCoordinator = downloadCoordinator;
- }
- private long requestTimeout;
- private void setRequestTimeout(long requestTimeout) {
- this.requestTimeout = requestTimeout;
- }
- public void run() {
- boolean keepRunning = true;
- do {
- DescriptorRequestImpl request =
- this.downloadCoordinator.createRequest(this.nickname);
- if (request != null) {
- String url = "http://" + this.ipPort
- + request.getRequestedResource();
- request.setRequestStart(System.currentTimeMillis());
- Thread timeoutThread = new Thread(new RequestTimeout(
- this.requestTimeout));
- timeoutThread.start();
- try {
- URL u = new URL(url);
- HttpURLConnection huc =
- (HttpURLConnection) u.openConnection();
- huc.setRequestMethod("GET");
- huc.connect();
- int responseCode = huc.getResponseCode();
- request.setResponseCode(responseCode);
- if (responseCode == 200) {
- BufferedInputStream in = new BufferedInputStream(
- new InflaterInputStream(huc.getInputStream()));
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- int len;
- byte[] data = new byte[1024];
- while ((len = in.read(data, 0, 1024)) >= 0) {
- baos.write(data, 0, len);
- }
- in.close();
- byte[] responseBytes = baos.toByteArray();
- request.setResponseBytes(responseBytes);
- request.setRequestEnd(System.currentTimeMillis());
- }
- } catch (IOException e) {
- /* Stop downloading from this directory if there are any
- * problems, e.g., refused connections. */
- keepRunning = false;
- }
- /* TODO How do we find out if we were interrupted, and by who?
- * Set the request or global timeout flag in the response. */
- timeoutThread.interrupt();
- this.downloadCoordinator.deliverResponse(request);
- } else {
- keepRunning = false;
- }
- } while (keepRunning);
- }
- }
-
- /* Interrupt a download request if it takes longer than a given time. */
- private static class RequestTimeout implements Runnable {
- private long timeoutMillis;
- private Thread downloaderThread;
- private RequestTimeout(long timeoutMillis) {
- this.downloaderThread = Thread.currentThread();
- this.timeoutMillis = timeoutMillis;
- }
- public void run() {
- long started = System.currentTimeMillis(), sleep;
- while ((sleep = started + this.timeoutMillis
- - System.currentTimeMillis()) > 0L) {
- try {
- Thread.sleep(sleep);
- } catch (InterruptedException e) {
- return;
- }
- }
- this.downloaderThread.interrupt();
- }
+ this.hasStartedDownloading = true;
+ DownloadCoordinator downloadCoordinator = new DownloadCoordinator(
+ this.directoryAuthorities, this.directoryMirrors,
+ this.downloadConsensus, this.downloadConsensusFromAllAuthorities,
+ this.downloadVotes, this.includeCurrentReferencedVotes,
+ this.requestTimeoutMillis, this.globalTimeoutMillis);
+ Iterator<DescriptorRequest> descriptorQueue = downloadCoordinator.
+ getDescriptorQueue();
+ return descriptorQueue;
}
}