commit bc9925b34030c4bba62fd0bda248a720e7da2989 Author: Karsten Loesing karsten.loesing@gmx.net Date: Thu Jan 19 15:21:50 2012 +0100
Make DownloadCoordinator an interface to facilitate testing. --- .../descriptor/impl/DownloadCoordinator.java | 263 +------------------- .../descriptor/impl/DownloadCoordinatorImpl.java | 267 ++++++++++++++++++++ .../impl/RelayDescriptorDownloaderImpl.java | 11 +- 3 files changed, 276 insertions(+), 265 deletions(-)
diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinator.java b/src/org/torproject/descriptor/impl/DownloadCoordinator.java index 362006d..6d837a5 100644 --- a/src/org/torproject/descriptor/impl/DownloadCoordinator.java +++ b/src/org/torproject/descriptor/impl/DownloadCoordinator.java @@ -2,266 +2,9 @@ * See LICENSE for licensing information */ package org.torproject.descriptor.impl;
-import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; +public interface DownloadCoordinator {
-import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorRequest; -import org.torproject.descriptor.DirSourceEntry; -import org.torproject.descriptor.RelayNetworkStatusConsensus; -import org.torproject.descriptor.RelayNetworkStatusVote; + public DescriptorRequestImpl createRequest(String nickname);
-/* TODO This whole download logic is a mess and needs a cleanup. */ -public class DownloadCoordinator { - - private BlockingIteratorImpl<DescriptorRequest> descriptorQueue = - new BlockingIteratorImpl<DescriptorRequest>(); - protected Iterator<DescriptorRequest> getDescriptorQueue() { - return this.descriptorQueue; - } - - private SortedMap<String, DirectoryDownloader> directoryAuthorities; - private SortedMap<String, DirectoryDownloader> directoryMirrors; - private boolean downloadConsensusFromAllAuthorities; - private boolean includeCurrentReferencedVotes; - private long requestTimeoutMillis; - private long globalTimeoutMillis; - - protected DownloadCoordinator( - SortedMap<String, DirectoryDownloader> directoryAuthorities, - SortedMap<String, DirectoryDownloader> directoryMirrors, - boolean downloadConsensus, - boolean downloadConsensusFromAllAuthorities, - Set<String> downloadVotes, boolean includeCurrentReferencedVotes, - long requestTimeoutMillis, long globalTimeoutMillis) { - this.directoryAuthorities = directoryAuthorities; - this.directoryMirrors = directoryMirrors; - this.missingConsensus = downloadConsensus; - this.downloadConsensusFromAllAuthorities = - downloadConsensusFromAllAuthorities; - this.missingVotes = downloadVotes; - this.includeCurrentReferencedVotes = includeCurrentReferencedVotes; - this.requestTimeoutMillis = requestTimeoutMillis; - this.globalTimeoutMillis = globalTimeoutMillis; - if (this.directoryMirrors.isEmpty() && - this.directoryAuthorities.isEmpty()) { - this.descriptorQueue.setOutOfDescriptors(); - /* TODO Should we say anything if we don't have any directories - * configured? 
*/ - } else { - GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis, - this); - this.globalTimerThread = new Thread(globalTimer); - this.globalTimerThread.start(); - for (DirectoryDownloader directoryMirror : - this.directoryMirrors.values()) { - directoryMirror.setDownloadCoordinator(this); - directoryMirror.setRequestTimeout(this.requestTimeoutMillis); - new Thread(directoryMirror).start(); - } - for (DirectoryDownloader directoryAuthority : - this.directoryAuthorities.values()) { - directoryAuthority.setDownloadCoordinator(this); - directoryAuthority.setRequestTimeout(this.requestTimeoutMillis); - new Thread(directoryAuthority).start(); - } - } - } - - /* Interrupt all downloads if the total download time exceeds a given - * time. */ - private Thread globalTimerThread; - private static class GlobalTimer implements Runnable { - private long timeoutMillis; - private DownloadCoordinator downloadCoordinator; - private GlobalTimer(long timeoutMillis, - DownloadCoordinator downloadCoordinator) { - this.timeoutMillis = timeoutMillis; - this.downloadCoordinator = downloadCoordinator; - } - public void run() { - long started = System.currentTimeMillis(), sleep; - while ((sleep = started + this.timeoutMillis - - System.currentTimeMillis()) > 0L) { - try { - Thread.sleep(sleep); - } catch (InterruptedException e) { - return; - } - } - this.downloadCoordinator.interruptAllDownloads(); - } - } - - /* Are we missing the consensus, and should the next directory that - * hasn't tried downloading it before attempt to download it? */ - private boolean missingConsensus = false; - - /* Which directories are currently attempting to download the - * consensus? */ - private Set<String> requestingConsensuses = new HashSet<String>(); - - /* Which directories have attempted to download the consensus so far, - * including those directories that are currently attempting it? 
*/ - private Set<String> requestedConsensuses = new HashSet<String>(); - - /* Which votes are we currently missing? */ - private Set<String> missingVotes = new HashSet<String>(); - - /* Which vote (map value) is a given directory (map key) currently - * attempting to download? */ - private Map<String, String> requestingVotes = - new HashMap<String, String>(); - - /* Which votes (map value) has a given directory (map key) attempted or - * is currently attempting to download? */ - private Map<String, Set<String>> requestedVotes = - new HashMap<String, Set<String>>(); - - private boolean hasFinishedDownloading = false; - - /* Look up what request a directory should make next. If there is - * nothing to do right now, but maybe later, block the caller. If - * we're done downloading, return null to notify the caller. */ - protected synchronized DescriptorRequestImpl createRequest( - String nickname) { - while (!this.hasFinishedDownloading) { - DescriptorRequestImpl request = new DescriptorRequestImpl(); - request.setDirectoryNickname(nickname); - if ((this.missingConsensus || - (this.downloadConsensusFromAllAuthorities && - this.directoryAuthorities.containsKey(nickname))) && - !this.requestedConsensuses.contains(nickname)) { - if (!this.downloadConsensusFromAllAuthorities) { - this.missingConsensus = false; - } - this.requestingConsensuses.add(nickname); - this.requestedConsensuses.add(nickname); - request.setRequestedResource( - "/tor/status-vote/current/consensus.z"); - request.setDescriptorType("consensus"); - return request; - } - if (!this.missingVotes.isEmpty() && - this.directoryAuthorities.containsKey(nickname)) { - String requestingVote = null; - for (String missingVote : this.missingVotes) { - if (!this.requestedVotes.containsKey(nickname) || - !this.requestedVotes.get(nickname).contains(missingVote)) { - requestingVote = missingVote; - } - } - if (requestingVote != null) { - this.requestingVotes.put(nickname, requestingVote); - if 
(!this.requestedVotes.containsKey(nickname)) { - this.requestedVotes.put(nickname, new HashSet<String>()); - } - this.requestedVotes.get(nickname).add(requestingVote); - this.missingVotes.remove(requestingVote); - request.setRequestedResource("/tor/status-vote/current/" - + requestingVote + ".z"); - request.setDescriptorType("vote"); - return request; - } - } - /* TODO Add server descriptors and extra-info descriptors later. */ - try { - this.wait(); - } catch (InterruptedException e) { - /* TODO What shall we do? */ - } - } - return null; - } - - /* Deliver a response which may either contain one or more descriptors - * or a failure response code. Update the lists of missing descriptors, - * decide if there are more descriptors to download, and wake up any - * waiting downloader threads. */ - protected synchronized void deliverResponse( - DescriptorRequestImpl response) { - String nickname = response.getDirectoryNickname(); - if (response.getDescriptorType().equals("consensus")) { - this.requestingConsensuses.remove(nickname); - if (response.getResponseCode() == 200) { - List<RelayNetworkStatusConsensus> parsedConsensuses = - RelayNetworkStatusConsensusImpl.parseConsensuses( - response.getResponseBytes()); - List<Descriptor> parsedDescriptors = - new ArrayList<Descriptor>(parsedConsensuses); - response.setDescriptors(parsedDescriptors); - if (this.includeCurrentReferencedVotes) { - /* TODO Only add votes if the consensus is not older than one - * hour. Or does that make no sense? 
*/ - for (RelayNetworkStatusConsensus parsedConsensus : - parsedConsensuses) { - for (DirSourceEntry dirSource : - parsedConsensus.getDirSourceEntries().values()) { - String identity = dirSource.getIdentity(); - if (!this.missingVotes.contains(identity)) { - boolean alreadyRequested = false; - for (Set<String> requestedBefore : - this.requestedVotes.values()) { - if (requestedBefore.contains(identity)) { - alreadyRequested = true; - break; - } - } - if (!alreadyRequested) { - this.missingVotes.add(identity); - } - } - } - } - /* TODO Later, add referenced server descriptors. */ - } - } else { - this.missingConsensus = true; - } - } else if (response.getDescriptorType().equals("vote")) { - String requestedVote = requestingVotes.remove(nickname); - if (response.getResponseCode() == 200) { - List<RelayNetworkStatusVote> parsedVotes = - RelayNetworkStatusVoteImpl.parseVotes( - response.getResponseBytes()); - List<Descriptor> parsedDescriptors = - new ArrayList<Descriptor>(parsedVotes); - response.setDescriptors(parsedDescriptors); - } else { - this.missingVotes.add(requestedVote); - } - } - if (response.getRequestEnd() != 0L) { - this.descriptorQueue.add(response); - } - if ((!this.missingConsensus || - (this.downloadConsensusFromAllAuthorities && - this.requestedConsensuses.containsAll( - this.directoryAuthorities.keySet()) && - this.requestingConsensuses.isEmpty())) && - this.missingVotes.isEmpty() && - this.requestingVotes.isEmpty()) { - /* TODO This logic may be somewhat broken. We don't wait for all - * consensus requests to complete or fail, which results in adding - * (failed) requests to the queue when we think we're done. */ - this.hasFinishedDownloading = true; - this.globalTimerThread.interrupt(); - this.descriptorQueue.setOutOfDescriptors(); - } - /* Wake up all waiting downloader threads. Maybe they can now - * download something, or they'll realize we're done downloading. 
*/ - this.notifyAll(); - } - - private synchronized void interruptAllDownloads() { - this.hasFinishedDownloading = true; - this.notifyAll(); - } + public void deliverResponse(DescriptorRequestImpl request); } - diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinatorImpl.java b/src/org/torproject/descriptor/impl/DownloadCoordinatorImpl.java new file mode 100644 index 0000000..5d4ca21 --- /dev/null +++ b/src/org/torproject/descriptor/impl/DownloadCoordinatorImpl.java @@ -0,0 +1,267 @@ +/* Copyright 2011, 2012 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.descriptor.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; + +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorRequest; +import org.torproject.descriptor.DirSourceEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; +import org.torproject.descriptor.RelayNetworkStatusVote; + +/* TODO This whole download logic is a mess and needs a cleanup. 
*/ +public class DownloadCoordinatorImpl implements DownloadCoordinator { + + private BlockingIteratorImpl<DescriptorRequest> descriptorQueue = + new BlockingIteratorImpl<DescriptorRequest>(); + protected Iterator<DescriptorRequest> getDescriptorQueue() { + return this.descriptorQueue; + } + + private SortedMap<String, DirectoryDownloader> directoryAuthorities; + private SortedMap<String, DirectoryDownloader> directoryMirrors; + private boolean downloadConsensusFromAllAuthorities; + private boolean includeCurrentReferencedVotes; + private long requestTimeoutMillis; + private long globalTimeoutMillis; + + protected DownloadCoordinatorImpl( + SortedMap<String, DirectoryDownloader> directoryAuthorities, + SortedMap<String, DirectoryDownloader> directoryMirrors, + boolean downloadConsensus, + boolean downloadConsensusFromAllAuthorities, + Set<String> downloadVotes, boolean includeCurrentReferencedVotes, + long requestTimeoutMillis, long globalTimeoutMillis) { + this.directoryAuthorities = directoryAuthorities; + this.directoryMirrors = directoryMirrors; + this.missingConsensus = downloadConsensus; + this.downloadConsensusFromAllAuthorities = + downloadConsensusFromAllAuthorities; + this.missingVotes = downloadVotes; + this.includeCurrentReferencedVotes = includeCurrentReferencedVotes; + this.requestTimeoutMillis = requestTimeoutMillis; + this.globalTimeoutMillis = globalTimeoutMillis; + if (this.directoryMirrors.isEmpty() && + this.directoryAuthorities.isEmpty()) { + this.descriptorQueue.setOutOfDescriptors(); + /* TODO Should we say anything if we don't have any directories + * configured? 
*/ + } else { + GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis, + this); + this.globalTimerThread = new Thread(globalTimer); + this.globalTimerThread.start(); + for (DirectoryDownloader directoryMirror : + this.directoryMirrors.values()) { + directoryMirror.setDownloadCoordinator(this); + directoryMirror.setRequestTimeout(this.requestTimeoutMillis); + new Thread(directoryMirror).start(); + } + for (DirectoryDownloader directoryAuthority : + this.directoryAuthorities.values()) { + directoryAuthority.setDownloadCoordinator(this); + directoryAuthority.setRequestTimeout(this.requestTimeoutMillis); + new Thread(directoryAuthority).start(); + } + } + } + + /* Interrupt all downloads if the total download time exceeds a given + * time. */ + private Thread globalTimerThread; + private static class GlobalTimer implements Runnable { + private long timeoutMillis; + private DownloadCoordinatorImpl downloadCoordinator; + private GlobalTimer(long timeoutMillis, + DownloadCoordinatorImpl downloadCoordinator) { + this.timeoutMillis = timeoutMillis; + this.downloadCoordinator = downloadCoordinator; + } + public void run() { + long started = System.currentTimeMillis(), sleep; + while ((sleep = started + this.timeoutMillis + - System.currentTimeMillis()) > 0L) { + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + return; + } + } + this.downloadCoordinator.interruptAllDownloads(); + } + } + + /* Are we missing the consensus, and should the next directory that + * hasn't tried downloading it before attempt to download it? */ + private boolean missingConsensus = false; + + /* Which directories are currently attempting to download the + * consensus? */ + private Set<String> requestingConsensuses = new HashSet<String>(); + + /* Which directories have attempted to download the consensus so far, + * including those directories that are currently attempting it? 
*/ + private Set<String> requestedConsensuses = new HashSet<String>(); + + /* Which votes are we currently missing? */ + private Set<String> missingVotes = new HashSet<String>(); + + /* Which vote (map value) is a given directory (map key) currently + * attempting to download? */ + private Map<String, String> requestingVotes = + new HashMap<String, String>(); + + /* Which votes (map value) has a given directory (map key) attempted or + * is currently attempting to download? */ + private Map<String, Set<String>> requestedVotes = + new HashMap<String, Set<String>>(); + + private boolean hasFinishedDownloading = false; + + /* Look up what request a directory should make next. If there is + * nothing to do right now, but maybe later, block the caller. If + * we're done downloading, return null to notify the caller. */ + public synchronized DescriptorRequestImpl createRequest( + String nickname) { + while (!this.hasFinishedDownloading) { + DescriptorRequestImpl request = new DescriptorRequestImpl(); + request.setDirectoryNickname(nickname); + if ((this.missingConsensus || + (this.downloadConsensusFromAllAuthorities && + this.directoryAuthorities.containsKey(nickname))) && + !this.requestedConsensuses.contains(nickname)) { + if (!this.downloadConsensusFromAllAuthorities) { + this.missingConsensus = false; + } + this.requestingConsensuses.add(nickname); + this.requestedConsensuses.add(nickname); + request.setRequestedResource( + "/tor/status-vote/current/consensus.z"); + request.setDescriptorType("consensus"); + return request; + } + if (!this.missingVotes.isEmpty() && + this.directoryAuthorities.containsKey(nickname)) { + String requestingVote = null; + for (String missingVote : this.missingVotes) { + if (!this.requestedVotes.containsKey(nickname) || + !this.requestedVotes.get(nickname).contains(missingVote)) { + requestingVote = missingVote; + } + } + if (requestingVote != null) { + this.requestingVotes.put(nickname, requestingVote); + if 
(!this.requestedVotes.containsKey(nickname)) { + this.requestedVotes.put(nickname, new HashSet<String>()); + } + this.requestedVotes.get(nickname).add(requestingVote); + this.missingVotes.remove(requestingVote); + request.setRequestedResource("/tor/status-vote/current/" + + requestingVote + ".z"); + request.setDescriptorType("vote"); + return request; + } + } + /* TODO Add server descriptors and extra-info descriptors later. */ + try { + this.wait(); + } catch (InterruptedException e) { + /* TODO What shall we do? */ + } + } + return null; + } + + /* Deliver a response which may either contain one or more descriptors + * or a failure response code. Update the lists of missing descriptors, + * decide if there are more descriptors to download, and wake up any + * waiting downloader threads. */ + public synchronized void deliverResponse( + DescriptorRequestImpl response) { + String nickname = response.getDirectoryNickname(); + if (response.getDescriptorType().equals("consensus")) { + this.requestingConsensuses.remove(nickname); + if (response.getResponseCode() == 200) { + List<RelayNetworkStatusConsensus> parsedConsensuses = + RelayNetworkStatusConsensusImpl.parseConsensuses( + response.getResponseBytes()); + List<Descriptor> parsedDescriptors = + new ArrayList<Descriptor>(parsedConsensuses); + response.setDescriptors(parsedDescriptors); + if (this.includeCurrentReferencedVotes) { + /* TODO Only add votes if the consensus is not older than one + * hour. Or does that make no sense? 
*/ + for (RelayNetworkStatusConsensus parsedConsensus : + parsedConsensuses) { + for (DirSourceEntry dirSource : + parsedConsensus.getDirSourceEntries().values()) { + String identity = dirSource.getIdentity(); + if (!this.missingVotes.contains(identity)) { + boolean alreadyRequested = false; + for (Set<String> requestedBefore : + this.requestedVotes.values()) { + if (requestedBefore.contains(identity)) { + alreadyRequested = true; + break; + } + } + if (!alreadyRequested) { + this.missingVotes.add(identity); + } + } + } + } + /* TODO Later, add referenced server descriptors. */ + } + } else { + this.missingConsensus = true; + } + } else if (response.getDescriptorType().equals("vote")) { + String requestedVote = requestingVotes.remove(nickname); + if (response.getResponseCode() == 200) { + List<RelayNetworkStatusVote> parsedVotes = + RelayNetworkStatusVoteImpl.parseVotes( + response.getResponseBytes()); + List<Descriptor> parsedDescriptors = + new ArrayList<Descriptor>(parsedVotes); + response.setDescriptors(parsedDescriptors); + } else { + this.missingVotes.add(requestedVote); + } + } + if (response.getRequestEnd() != 0L) { + this.descriptorQueue.add(response); + } + if ((!this.missingConsensus || + (this.downloadConsensusFromAllAuthorities && + this.requestedConsensuses.containsAll( + this.directoryAuthorities.keySet()) && + this.requestingConsensuses.isEmpty())) && + this.missingVotes.isEmpty() && + this.requestingVotes.isEmpty()) { + /* TODO This logic may be somewhat broken. We don't wait for all + * consensus requests to complete or fail, which results in adding + * (failed) requests to the queue when we think we're done. */ + this.hasFinishedDownloading = true; + this.globalTimerThread.interrupt(); + this.descriptorQueue.setOutOfDescriptors(); + } + /* Wake up all waiting downloader threads. Maybe they can now + * download something, or they'll realize we're done downloading. 
*/ + this.notifyAll(); + } + + private synchronized void interruptAllDownloads() { + this.hasFinishedDownloading = true; + this.notifyAll(); + } +} + diff --git a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java index 6ea81e1..7cdcc0c 100644 --- a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java +++ b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java @@ -227,11 +227,12 @@ public class RelayDescriptorDownloaderImpl + "permitted once."); } this.hasStartedDownloading = true; - DownloadCoordinator downloadCoordinator = new DownloadCoordinator( - this.directoryAuthorities, this.directoryMirrors, - this.downloadConsensus, this.downloadConsensusFromAllAuthorities, - this.downloadVotes, this.includeCurrentReferencedVotes, - this.requestTimeoutMillis, this.globalTimeoutMillis); + DownloadCoordinatorImpl downloadCoordinator = + new DownloadCoordinatorImpl(this.directoryAuthorities, + this.directoryMirrors, this.downloadConsensus, + this.downloadConsensusFromAllAuthorities, this.downloadVotes, + this.includeCurrentReferencedVotes, this.requestTimeoutMillis, + this.globalTimeoutMillis); Iterator<DescriptorRequest> descriptorQueue = downloadCoordinator. getDescriptorQueue(); return descriptorQueue;