commit e868be92e4dab9aa96b5b9acc454ff9dfb5ee1b2 Author: Karsten Loesing karsten.loesing@gmx.net Date: Tue Jan 17 09:48:42 2012 +0100
Split relay descriptor downloader into three classes.
The three classes are: - The "user"-facing RelayDescriptorDownloaderImpl that is used to configure what descriptors shall be downloaded. - The DownloadCoordinator that keeps track of which descriptors still need to be downloaded, which downloads are in progress, and which downloads have completed or failed. - The DirectoryDownloader that sends the actual requests to one directory authority or mirror and reports finished downloads back to the DownloadCoordinator. --- .../descriptor/impl/DirectoryDownloader.java | 107 +++++++ .../descriptor/impl/DownloadCoordinator.java | 259 +++++++++++++++ .../impl/RelayDescriptorDownloaderImpl.java | 332 +------------------- 3 files changed, 381 insertions(+), 317 deletions(-)
diff --git a/src/org/torproject/descriptor/impl/DirectoryDownloader.java b/src/org/torproject/descriptor/impl/DirectoryDownloader.java new file mode 100644 index 0000000..d07c5c9 --- /dev/null +++ b/src/org/torproject/descriptor/impl/DirectoryDownloader.java @@ -0,0 +1,107 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.descriptor.impl; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.zip.InflaterInputStream; + +/* Download descriptors from one directory authority or mirror. First, + * ask the coordinator thread to create a request, run it, and deliver + * the response. Repeat until the coordinator thread says there are no + * further requests to make. */ +public class DirectoryDownloader implements Runnable { + + private String nickname; + private String ipPort; + protected DirectoryDownloader(String nickname, String ip, int dirPort) { + this.nickname = nickname; + this.ipPort = ip + ":" + String.valueOf(dirPort); + } + + private DownloadCoordinator downloadCoordinator; + protected void setDownloadCoordinator( + DownloadCoordinator downloadCoordinator) { + this.downloadCoordinator = downloadCoordinator; + } + + private long requestTimeout; + protected void setRequestTimeout(long requestTimeout) { + this.requestTimeout = requestTimeout; + } + + public void run() { + boolean keepRunning = true; + do { + DescriptorRequestImpl request = + this.downloadCoordinator.createRequest(this.nickname); + if (request != null) { + String url = "http://" + this.ipPort + + request.getRequestedResource(); + request.setRequestStart(System.currentTimeMillis()); + Thread timeoutThread = new Thread(new RequestTimeout( + this.requestTimeout)); + timeoutThread.start(); + try { + URL u = new URL(url); + HttpURLConnection huc = + (HttpURLConnection) u.openConnection(); + 
huc.setRequestMethod("GET"); + huc.connect(); + int responseCode = huc.getResponseCode(); + request.setResponseCode(responseCode); + if (responseCode == 200) { + BufferedInputStream in = new BufferedInputStream( + new InflaterInputStream(huc.getInputStream())); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int len; + byte[] data = new byte[1024]; + while ((len = in.read(data, 0, 1024)) >= 0) { + baos.write(data, 0, len); + } + in.close(); + byte[] responseBytes = baos.toByteArray(); + request.setResponseBytes(responseBytes); + request.setRequestEnd(System.currentTimeMillis()); + } + } catch (IOException e) { + /* Stop downloading from this directory if there are any + * problems, e.g., refused connections. */ + keepRunning = false; + } + /* TODO How do we find out if we were interrupted, and by who? + * Set the request or global timeout flag in the response. */ + timeoutThread.interrupt(); + this.downloadCoordinator.deliverResponse(request); + } else { + keepRunning = false; + } + } while (keepRunning); + } + + /* Interrupt a download request if it takes longer than a given time. 
*/ + private static class RequestTimeout implements Runnable { + private long timeoutMillis; + private Thread downloaderThread; + private RequestTimeout(long timeoutMillis) { + this.downloaderThread = Thread.currentThread(); + this.timeoutMillis = timeoutMillis; + } + public void run() { + long started = System.currentTimeMillis(), sleep; + while ((sleep = started + this.timeoutMillis + - System.currentTimeMillis()) > 0L) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + return; + } + } + this.downloaderThread.interrupt(); + } + } +} + diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinator.java b/src/org/torproject/descriptor/impl/DownloadCoordinator.java new file mode 100644 index 0000000..c706415 --- /dev/null +++ b/src/org/torproject/descriptor/impl/DownloadCoordinator.java @@ -0,0 +1,259 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.descriptor.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorRequest; +import org.torproject.descriptor.DirSourceEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; +import org.torproject.descriptor.RelayNetworkStatusVote; + +/* TODO This whole download logic is a mess and needs a cleanup. 
*/ +public class DownloadCoordinator { + + private BlockingIteratorImpl<DescriptorRequest> descriptorQueue = + new BlockingIteratorImpl<DescriptorRequest>(); + protected Iterator<DescriptorRequest> getDescriptorQueue() { + return this.descriptorQueue; + } + + private SortedMap<String, DirectoryDownloader> directoryAuthorities; + private SortedMap<String, DirectoryDownloader> directoryMirrors; + private boolean downloadConsensusFromAllAuthorities; + private boolean includeCurrentReferencedVotes; + private long requestTimeoutMillis; + private long globalTimeoutMillis; + + protected DownloadCoordinator( + SortedMap<String, DirectoryDownloader> directoryAuthorities, + SortedMap<String, DirectoryDownloader> directoryMirrors, + boolean downloadConsensus, + boolean downloadConsensusFromAllAuthorities, + Set<String> downloadVotes, boolean includeCurrentReferencedVotes, + long requestTimeoutMillis, long globalTimeoutMillis) { + this.directoryAuthorities = directoryAuthorities; + this.directoryMirrors = directoryMirrors; + this.missingConsensus = downloadConsensus; + this.downloadConsensusFromAllAuthorities = + downloadConsensusFromAllAuthorities; + this.missingVotes = downloadVotes; + this.includeCurrentReferencedVotes = includeCurrentReferencedVotes; + this.requestTimeoutMillis = requestTimeoutMillis; + this.globalTimeoutMillis = globalTimeoutMillis; + if (this.directoryMirrors.isEmpty() && + this.directoryAuthorities.isEmpty()) { + this.descriptorQueue.setOutOfDescriptors(); + /* TODO Should we say anything if we don't have any directories + * configured? 
*/ + } else { + GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis, + this); + this.globalTimerThread = new Thread(globalTimer); + this.globalTimerThread.start(); + for (DirectoryDownloader directoryMirror : + this.directoryMirrors.values()) { + directoryMirror.setDownloadCoordinator(this); + directoryMirror.setRequestTimeout(this.requestTimeoutMillis); + new Thread(directoryMirror).start(); + } + for (DirectoryDownloader directoryAuthority : + this.directoryAuthorities.values()) { + directoryAuthority.setDownloadCoordinator(this); + directoryAuthority.setRequestTimeout(this.requestTimeoutMillis); + new Thread(directoryAuthority).start(); + } + } + } + + /* Interrupt all downloads if the total download time exceeds a given + * time. */ + private Thread globalTimerThread; + private static class GlobalTimer implements Runnable { + private long timeoutMillis; + private DownloadCoordinator downloadCoordinator; + private GlobalTimer(long timeoutMillis, + DownloadCoordinator downloadCoordinator) { + this.timeoutMillis = timeoutMillis; + this.downloadCoordinator = downloadCoordinator; + } + public void run() { + long started = System.currentTimeMillis(), sleep; + while ((sleep = started + this.timeoutMillis + - System.currentTimeMillis()) > 0L) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + return; + } + } + this.downloadCoordinator.interruptAllDownloads(); + } + } + + /* Are we missing the consensus, and should the next directory that + * hasn't tried downloading it before attempt to download it? */ + private boolean missingConsensus = false; + + /* Which directories have attempted to download the consensus so far, + * including those directories that are currently attempting it? */ + private Set<String> requestedConsensuses = new HashSet<String>(); + + /* Which votes are we currently missing? 
*/ + private Set<String> missingVotes = new HashSet<String>(); + + /* Which vote (map value) is a given directory (map key) currently + * attempting to download? */ + private Map<String, String> requestingVotes = + new HashMap<String, String>(); + + /* Which votes (map value) has a given directory (map key) attempted or + * is currently attempting to download? */ + private Map<String, Set<String>> requestedVotes = + new HashMap<String, Set<String>>(); + + private boolean hasFinishedDownloading = false; + + /* Look up what request a directory should make next. If there is + * nothing to do right now, but maybe later, block the caller. If + * we're done downloading, return null to notify the caller. */ + protected synchronized DescriptorRequestImpl createRequest( + String nickname) { + while (!this.hasFinishedDownloading) { + DescriptorRequestImpl request = new DescriptorRequestImpl(); + request.setDirectoryNickname(nickname); + if ((this.missingConsensus || + (this.downloadConsensusFromAllAuthorities && + this.directoryAuthorities.containsKey(nickname))) && + !this.requestedConsensuses.contains(nickname)) { + if (!this.downloadConsensusFromAllAuthorities) { + this.missingConsensus = false; + } + this.requestedConsensuses.add(nickname); + request.setRequestedResource( + "/tor/status-vote/current/consensus.z"); + request.setDescriptorType("consensus"); + return request; + } + if (!this.missingVotes.isEmpty() && + this.directoryAuthorities.containsKey(nickname)) { + String requestingVote = null; + for (String missingVote : this.missingVotes) { + if (!this.requestedVotes.containsKey(nickname) || + !this.requestedVotes.get(nickname).contains(missingVote)) { + requestingVote = missingVote; + } + } + if (requestingVote != null) { + this.requestingVotes.put(nickname, requestingVote); + if (!this.requestedVotes.containsKey(nickname)) { + this.requestedVotes.put(nickname, new HashSet<String>()); + } + this.requestedVotes.get(nickname).add(requestingVote); + 
this.missingVotes.remove(requestingVote); + request.setRequestedResource("/tor/status-vote/current/" + + requestingVote + ".z"); + request.setDescriptorType("vote"); + return request; + } + } + /* TODO Add server descriptors and extra-info descriptors later. */ + try { + this.wait(); + } catch (InterruptedException e) { + /* TODO What shall we do? */ + } + } + return null; + } + + /* Deliver a response which may either contain one or more descriptors + * or a failure response code. Update the lists of missing descriptors, + * decide if there are more descriptors to download, and wake up any + * waiting downloader threads. */ + protected synchronized void deliverResponse( + DescriptorRequestImpl response) { + String nickname = response.getDirectoryNickname(); + if (response.getDescriptorType().equals("consensus")) { + if (response.getResponseCode() == 200) { + List<RelayNetworkStatusConsensus> parsedConsensuses = + RelayNetworkStatusConsensusImpl.parseConsensuses( + response.getResponseBytes()); + List<Descriptor> parsedDescriptors = + new ArrayList<Descriptor>(parsedConsensuses); + response.setDescriptors(parsedDescriptors); + if (this.includeCurrentReferencedVotes) { + /* TODO Only add votes if the consensus is not older than one + * hour. Or does that make no sense? */ + for (RelayNetworkStatusConsensus parsedConsensus : + parsedConsensuses) { + for (DirSourceEntry dirSource : + parsedConsensus.getDirSourceEntries().values()) { + String identity = dirSource.getIdentity(); + if (!this.missingVotes.contains(identity)) { + boolean alreadyRequested = false; + for (Set<String> requestedBefore : + this.requestedVotes.values()) { + if (requestedBefore.contains(identity)) { + alreadyRequested = true; + break; + } + } + if (!alreadyRequested) { + this.missingVotes.add(identity); + } + } + } + } + /* TODO Later, add referenced server descriptors. 
*/ + } + } else { + this.missingConsensus = true; + } + } else if (response.getDescriptorType().equals("vote")) { + String requestedVote = requestingVotes.remove(nickname); + if (response.getResponseCode() == 200) { + List<RelayNetworkStatusVote> parsedVotes = + RelayNetworkStatusVoteImpl.parseVotes( + response.getResponseBytes()); + List<Descriptor> parsedDescriptors = + new ArrayList<Descriptor>(parsedVotes); + response.setDescriptors(parsedDescriptors); + } else { + this.missingVotes.add(requestedVote); + } + } + if (response.getRequestEnd() != 0L) { + this.descriptorQueue.add(response); + } + if ((!this.missingConsensus || + (this.downloadConsensusFromAllAuthorities && + this.requestedConsensuses.containsAll( + this.directoryAuthorities.keySet()))) && + this.missingVotes.isEmpty() && + this.requestingVotes.isEmpty()) { + /* TODO This logic may be somewhat broken. We don't wait for all + * consensus requests to complete or fail, which results in adding + * (failed) requests to the queue when we think we're done. */ + this.hasFinishedDownloading = true; + this.globalTimerThread.interrupt(); + this.descriptorQueue.setOutOfDescriptors(); + } + /* Wake up all waiting downloader threads. Maybe they can now + * download something, or they'll realize we're done downloading. */ + this.notifyAll(); + } + + private synchronized void interruptAllDownloads() { + this.hasFinishedDownloading = true; + this.notifyAll(); + } +} + diff --git a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java index 6805cc4..fff6b36 100644 --- a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java +++ b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java @@ -2,11 +2,6 @@ * See LICENSE for licensing information */ package org.torproject.descriptor.impl;
-import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -16,7 +11,6 @@ import java.util.List; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; -import java.util.zip.InflaterInputStream; import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.DescriptorRequest; import org.torproject.descriptor.DirSourceEntry; @@ -24,21 +18,10 @@ import org.torproject.descriptor.RelayDescriptorDownloader; import org.torproject.descriptor.RelayNetworkStatusConsensus; import org.torproject.descriptor.RelayNetworkStatusVote;
-/* TODO Should this class be split up and be moved to its own subpackage? - * It's huge, and it's not going to get smaller in the future. */ public class RelayDescriptorDownloaderImpl implements RelayDescriptorDownloader {
- /* TODO Move part of the factory class to the impl package and make this - * constructor protected. */ - public RelayDescriptorDownloaderImpl() { - } - private boolean hasStartedDownloading = false; - private boolean hasFinishedDownloading = false; - - private BlockingIteratorImpl<DescriptorRequest> descriptorQueue = - new BlockingIteratorImpl<DescriptorRequest>();
private SortedMap<String, DirectoryDownloader> directoryAuthorities = new TreeMap<String, DirectoryDownloader>(); @@ -50,7 +33,7 @@ public class RelayDescriptorDownloaderImpl } this.checkDirectoryParameters(nickname, ip, dirPort); DirectoryDownloader directoryAuthority = new DirectoryDownloader( - nickname, ip, dirPort, this); + nickname, ip, dirPort); this.directoryAuthorities.put(nickname, directoryAuthority); }
@@ -64,7 +47,7 @@ public class RelayDescriptorDownloaderImpl } this.checkDirectoryParameters(nickname, ip, dirPort); DirectoryDownloader directoryMirror = new DirectoryDownloader( - nickname, ip, dirPort, this); + nickname, ip, dirPort); this.directoryMirrors.put(nickname, directoryMirror); /* TODO Implement prioritizing mirrors for non-vote downloads. */ throw new UnsupportedOperationException("Prioritizing directory " @@ -96,12 +79,13 @@ public class RelayDescriptorDownloaderImpl } }
+ private boolean downloadConsensus = false; public void setIncludeCurrentConsensus() { if (this.hasStartedDownloading) { throw new IllegalStateException("Reconfiguration is not permitted " + "after starting to download."); } - this.missingConsensus = true; + this.downloadConsensus = true; }
private boolean downloadConsensusFromAllAuthorities = false; @@ -122,13 +106,14 @@ public class RelayDescriptorDownloaderImpl this.includeCurrentReferencedVotes = true; }
+ private Set<String> downloadVotes = new HashSet<String>(); public void setIncludeCurrentVote(String fingerprint) { if (this.hasStartedDownloading) { throw new IllegalStateException("Reconfiguration is not permitted " + "after starting to download."); } this.checkVoteFingerprint(fingerprint); - this.missingVotes.add(fingerprint); + this.downloadVotes.add(fingerprint); }
public void setIncludeCurrentVotes(Set<String> fingerprints) { @@ -248,302 +233,15 @@ public class RelayDescriptorDownloaderImpl throw new IllegalStateException("Initiating downloads is only " + "permitted once."); } - if (this.directoryMirrors.isEmpty() && - this.directoryAuthorities.isEmpty()) { - this.descriptorQueue.setOutOfDescriptors(); - /* TODO Should we say anything if we don't have any directories - * configured? */ - } else { - GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis, - this); - this.globalTimerThread = new Thread(globalTimer); - this.globalTimerThread.start(); - for (DirectoryDownloader directoryMirror : - this.directoryMirrors.values()) { - directoryMirror.setRequestTimeout(this.requestTimeoutMillis); - new Thread(directoryMirror).start(); - } - for (DirectoryDownloader directoryAuthority : - this.directoryAuthorities.values()) { - directoryAuthority.setRequestTimeout(this.requestTimeoutMillis); - new Thread(directoryAuthority).start(); - } - } - return this.descriptorQueue; - } - - /* Interrupt all downloads if the total download time exceeds a given - * time. */ - private Thread globalTimerThread; - private static class GlobalTimer implements Runnable { - private long timeoutMillis; - private RelayDescriptorDownloaderImpl downloadCoordinator; - private GlobalTimer(long timeoutMillis, - RelayDescriptorDownloaderImpl downloadCoordinator) { - this.timeoutMillis = timeoutMillis; - this.downloadCoordinator = downloadCoordinator; - } - public void run() { - long started = System.currentTimeMillis(), sleep; - while ((sleep = started + this.timeoutMillis - - System.currentTimeMillis()) > 0L) { - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - return; - } - } - this.downloadCoordinator.interruptAllDownloads(); - } - } - - /* TODO This whole download logic is a mess. It should probably go to - * its own class, and it needs a cleanup. 
*/ - - /* Are we missing the consensus, and should the next directory that - * hasn't tried downloading it before attempt to download it? */ - private boolean missingConsensus = false; - - /* Which directories have attempted to download the consensus so far, - * including those directories that are currently attempting it? */ - private Set<String> requestedConsensuses = new HashSet<String>(); - - /* Which votes are we currently missing? */ - private Set<String> missingVotes = new HashSet<String>(); - - /* Which vote (map value) is a given directory (map key) currently - * attempting to download? */ - private Map<String, String> requestingVotes = - new HashMap<String, String>(); - - /* Which votes (map value) has a given directory (map key) attempted or - * is currently attempting to download? */ - private Map<String, Set<String>> requestedVotes = - new HashMap<String, Set<String>>(); - - /* Look up what request a directory should make next. If there is - * nothing to do right now, but maybe later, block the caller. If - * we're done downloading, return null to notify the caller. 
*/ - private synchronized DescriptorRequestImpl createRequest( - String nickname) { - while (!this.hasFinishedDownloading) { - DescriptorRequestImpl request = new DescriptorRequestImpl(); - request.setDirectoryNickname(nickname); - if ((this.missingConsensus || - (this.downloadConsensusFromAllAuthorities && - this.directoryAuthorities.containsKey(nickname))) && - !this.requestedConsensuses.contains(nickname)) { - if (!this.downloadConsensusFromAllAuthorities) { - this.missingConsensus = false; - } - this.requestedConsensuses.add(nickname); - request.setRequestedResource( - "/tor/status-vote/current/consensus.z"); - request.setDescriptorType("consensus"); - return request; - } - if (!this.missingVotes.isEmpty() && - this.directoryAuthorities.containsKey(nickname)) { - String requestingVote = null; - for (String missingVote : this.missingVotes) { - if (!this.requestedVotes.containsKey(nickname) || - !this.requestedVotes.get(nickname).contains(missingVote)) { - requestingVote = missingVote; - } - } - if (requestingVote != null) { - this.requestingVotes.put(nickname, requestingVote); - if (!this.requestedVotes.containsKey(nickname)) { - this.requestedVotes.put(nickname, new HashSet<String>()); - } - this.requestedVotes.get(nickname).add(requestingVote); - this.missingVotes.remove(requestingVote); - request.setRequestedResource("/tor/status-vote/current/" - + requestingVote + ".z"); - request.setDescriptorType("vote"); - return request; - } - } - /* TODO Add server descriptors and extra-info descriptors later. */ - try { - this.wait(); - } catch (InterruptedException e) { - /* TODO What shall we do? */ - } - } - return null; - } - - /* Deliver a response which may either contain one or more descriptors - * or a failure response code. Update the lists of missing descriptors, - * decide if there are more descriptors to download, and wake up any - * waiting downloader threads. 
*/ - private synchronized void deliverResponse( - DescriptorRequestImpl response) { - String nickname = response.getDirectoryNickname(); - if (response.getDescriptorType().equals("consensus")) { - if (response.getResponseCode() == 200) { - List<RelayNetworkStatusConsensus> parsedConsensuses = - RelayNetworkStatusConsensusImpl.parseConsensuses( - response.getResponseBytes()); - List<Descriptor> parsedDescriptors = - new ArrayList<Descriptor>(parsedConsensuses); - response.setDescriptors(parsedDescriptors); - if (this.includeCurrentReferencedVotes) { - /* TODO Only add votes if the consensus is not older than one - * hour. Or does that make no sense? */ - for (RelayNetworkStatusConsensus parsedConsensus : - parsedConsensuses) { - for (DirSourceEntry dirSource : - parsedConsensus.getDirSourceEntries().values()) { - String identity = dirSource.getIdentity(); - if (!this.missingVotes.contains(identity)) { - boolean alreadyRequested = false; - for (Set<String> requestedBefore : - this.requestedVotes.values()) { - if (requestedBefore.contains(identity)) { - alreadyRequested = true; - break; - } - } - if (!alreadyRequested) { - this.missingVotes.add(identity); - } - } - } - } - /* TODO Later, add referenced server descriptors. 
*/ - } - } else { - this.missingConsensus = true; - } - } else if (response.getDescriptorType().equals("vote")) { - String requestedVote = requestingVotes.remove(nickname); - if (response.getResponseCode() == 200) { - List<RelayNetworkStatusVote> parsedVotes = - RelayNetworkStatusVoteImpl.parseVotes( - response.getResponseBytes()); - List<Descriptor> parsedDescriptors = - new ArrayList<Descriptor>(parsedVotes); - response.setDescriptors(parsedDescriptors); - } else { - this.missingVotes.add(requestedVote); - } - } - if (response.getRequestEnd() != 0L) { - this.descriptorQueue.add(response); - } - if ((!this.missingConsensus || - (this.downloadConsensusFromAllAuthorities && - this.requestedConsensuses.containsAll( - this.directoryAuthorities.keySet()))) && - this.missingVotes.isEmpty() && - this.requestingVotes.isEmpty()) { - /* TODO This logic may be somewhat broken. We don't wait for all - * consensus requests to complete or fail, which results in adding - * (failed) requests to the queue when we think we're done. */ - this.hasFinishedDownloading = true; - this.globalTimerThread.interrupt(); - this.descriptorQueue.setOutOfDescriptors(); - } - /* Wake up all waiting downloader threads. Maybe they can now - * download something, or they'll realize we're done downloading. */ - this.notifyAll(); - } - - private synchronized void interruptAllDownloads() { - this.hasFinishedDownloading = true; - this.notifyAll(); - } - - /* Download descriptors from one directory authority or mirror. First, - * ask the coordinator thread to create a request, run it, and deliver - * the response. Repeat until the coordinator thread says there are no - * further requests to make. 
*/ - private static class DirectoryDownloader implements Runnable { - private String nickname; - private String ipPort; - private RelayDescriptorDownloaderImpl downloadCoordinator; - private DirectoryDownloader(String nickname, String ip, int dirPort, - RelayDescriptorDownloaderImpl downloadCoordinator) { - this.nickname = nickname; - this.ipPort = ip + ":" + String.valueOf(dirPort); - this.downloadCoordinator = downloadCoordinator; - } - private long requestTimeout; - private void setRequestTimeout(long requestTimeout) { - this.requestTimeout = requestTimeout; - } - public void run() { - boolean keepRunning = true; - do { - DescriptorRequestImpl request = - this.downloadCoordinator.createRequest(this.nickname); - if (request != null) { - String url = "http://" + this.ipPort - + request.getRequestedResource(); - request.setRequestStart(System.currentTimeMillis()); - Thread timeoutThread = new Thread(new RequestTimeout( - this.requestTimeout)); - timeoutThread.start(); - try { - URL u = new URL(url); - HttpURLConnection huc = - (HttpURLConnection) u.openConnection(); - huc.setRequestMethod("GET"); - huc.connect(); - int responseCode = huc.getResponseCode(); - request.setResponseCode(responseCode); - if (responseCode == 200) { - BufferedInputStream in = new BufferedInputStream( - new InflaterInputStream(huc.getInputStream())); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - int len; - byte[] data = new byte[1024]; - while ((len = in.read(data, 0, 1024)) >= 0) { - baos.write(data, 0, len); - } - in.close(); - byte[] responseBytes = baos.toByteArray(); - request.setResponseBytes(responseBytes); - request.setRequestEnd(System.currentTimeMillis()); - } - } catch (IOException e) { - /* Stop downloading from this directory if there are any - * problems, e.g., refused connections. */ - keepRunning = false; - } - /* TODO How do we find out if we were interrupted, and by who? - * Set the request or global timeout flag in the response. 
*/ - timeoutThread.interrupt(); - this.downloadCoordinator.deliverResponse(request); - } else { - keepRunning = false; - } - } while (keepRunning); - } - } - - /* Interrupt a download request if it takes longer than a given time. */ - private static class RequestTimeout implements Runnable { - private long timeoutMillis; - private Thread downloaderThread; - private RequestTimeout(long timeoutMillis) { - this.downloaderThread = Thread.currentThread(); - this.timeoutMillis = timeoutMillis; - } - public void run() { - long started = System.currentTimeMillis(), sleep; - while ((sleep = started + this.timeoutMillis - - System.currentTimeMillis()) > 0L) { - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - return; - } - } - this.downloaderThread.interrupt(); - } + this.hasStartedDownloading = true; + DownloadCoordinator downloadCoordinator = new DownloadCoordinator( + this.directoryAuthorities, this.directoryMirrors, + this.downloadConsensus, this.downloadConsensusFromAllAuthorities, + this.downloadVotes, this.includeCurrentReferencedVotes, + this.requestTimeoutMillis, this.globalTimeoutMillis); + Iterator<DescriptorRequest> descriptorQueue = downloadCoordinator. + getDescriptorQueue(); + return descriptorQueue; } }