commit 7d04653d908f0c62e197bafa27a5cd94634cbb53 Author: Damian Johnson <atagar@torproject.org> Date: Mon Jul 15 09:16:54 2013 -0700
Moving retry functionality to Query class
It's far better if the Query class handles retries rather than the DescriptorDownloader. Advantages include...
* The DescriptorDownloader no longer has a reason to support blocking queries. Blocking was only needed for retries (the downloader had to wait to learn whether a request failed); now the Query itself knows if/when the request fails and can retry it on our behalf.
* The Query class is easier to test. The more functionality we can push down into it, the simpler the DescriptorDownloader will be.
* More advanced use cases will be using Query instances rather than DescriptorDownloader. By having retry functionality there it'll be easily available to them. --- stem/descriptor/remote.py | 88 ++++++++++++++++++++++----------------- test/integ/descriptor/remote.py | 5 +-- test/unit/descriptor/remote.py | 17 ++++---- 3 files changed, 59 insertions(+), 51 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py index 464e7cd..88e3258 100644 --- a/stem/descriptor/remote.py +++ b/stem/descriptor/remote.py @@ -48,6 +48,8 @@ import urllib2
import stem.descriptor
+from stem.util import log + # Tor directory authorities as of commit f631b73 (7/4/13). This should only # include authorities with 'v3ident': # @@ -72,27 +74,34 @@ class Query(object): mirror. The caller can block on the response by either calling :func:~stem.descriptor.remote.run: or iterating over our descriptor content.
- :var str address: address of the authority or mirror we're querying - :var int port: directory port we're querying :var str resource: resource being fetched, such as '/tor/status-vote/current/consensus.z' + :var str descriptor_type: type of descriptors being fetched, see + :func:`~stem.descriptor.__init__.parse_file` + + :var list endpoints: (address, dirport) tuples of the authority or mirror + we're querying, this uses authorities if undefined + :var int retries: number of times to attempt the request if it fails + :var bool fall_back_to_authority: when retrying request issues the last + request to a directory authority if **True**
:var Exception error: exception if a problem occured :var bool is_done: flag that indicates if our request has finished - :var str descriptor_type: type of descriptors being fetched, see :func:`~stem.descriptor.__init__.parse_file`
:var float start_time: unix timestamp when we first started running :var float timeout: duration before we'll time out our request :var float runtime: time our query took, this is **None** if it's not yet finished """
- def __init__(self, address, port, resource, descriptor_type, timeout = None, start = True): - self.address = address - self.port = port + def __init__(self, resource, descriptor_type, endpoints = None, retries = 2, fall_back_to_authority = True, timeout = None, start = True): self.resource = resource + self.descriptor_type = descriptor_type + + self.endpoints = endpoints if endpoints else [] + self.retries = retries + self.fall_back_to_authority = fall_back_to_authority
self.error = None self.is_done = False - self.descriptor_type = descriptor_type
self.start_time = None self.timeout = timeout @@ -106,14 +115,23 @@ class Query(object): if start: self.start()
- def get_url(self): + def pick_url(self, use_authority = False): """ - Provides the url being queried. + Provides a url that can be queried. If we have multiple endpoints then one + will be picked randomly. + + :param bool use_authority: ignores our endpoints and uses a directory + authority instead
:returns: **str** for the url being queried by this request """
- return "http://%s:%i/%s" % (self.address, self.port, self.resource.lstrip('/')) + if use_authority or not self.endpoints: + address, dirport = random.choice(DIRECTORY_AUTHORITIES.values()) + else: + address, dirport = random.choice(self.endpoints) + + return "http://%s:%i/%s" % (address, dirport, self.resource.lstrip('/'))
def start(self): """ @@ -122,7 +140,7 @@ class Query(object):
with self._downloader_thread_lock: if self._downloader_thread is None: - self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query") + self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query", args = (self.retries,)) self._downloader_thread.setDaemon(True) self._downloader_thread.start()
@@ -174,10 +192,13 @@ class Query(object): for desc in self.run(True): yield desc
- def _download_descriptors(self): + def _download_descriptors(self, retries): try: + use_authority = retries == 0 and self.fall_back_to_authority + resource_url = self.pick_url(use_authority) + self.start_time = time.time() - response = urllib2.urlopen(self.get_url(), timeout = self.timeout) + response = urllib2.urlopen(resource_url, timeout = self.timeout) self.runtime = time.time() - self.start_time
# This sucks. We need to read the full response into memory before @@ -188,8 +209,16 @@ class Query(object): response = io.BytesIO(response.read().strip())
self._results = stem.descriptor.parse_file(response, self.descriptor_type) + log.trace("Descriptors retrieved from '%s' in %0.2fs" % (resource_url, self.runtime)) except: - self.error = sys.exc_info()[1] + exc = sys.exc_info()[1] + + if retries > 0: + log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (resource_url, retries, exc)) + return self._download_descriptors(retries - 1) + else: + log.debug("Unable to download descriptors from '%s': %s" % (resource_url, exc)) + self.error = exc finally: self.is_done = True
@@ -200,15 +229,9 @@ class DescriptorDownloader(object): caching, retries, and other capabilities to make downloading descriptors easy and efficient.
- Queries can be made in either a blocking or non-blocking fashion. If - non-blocking then retries cannot be performed (since we do not know at the - time of the request if it succeeded or failed). - For more advanced use cases you can use the :class:`~stem.descriptor.remote.Query` class directly.
- :var bool block: blocks until requests have been concluded if **True**, - otherwise provides the query as soon as its been issued :var int retries: number of times to attempt the request if it fails :var float timeout: duration before we'll time out our request, no timeout is applied if **None** @@ -219,37 +242,24 @@ class DescriptorDownloader(object): request to a directory authority if **True** """
- def __init__(self, block = True, retries = 2, timeout = None, start_when_requested = True, fall_back_to_authority = True): - self.block = block + def __init__(self, retries = 2, timeout = None, start_when_requested = True, fall_back_to_authority = True): self.retries = retries self.timeout = timeout self.start_when_requested = start_when_requested self.fall_back_to_authority = fall_back_to_authority - self._directories = DIRECTORY_AUTHORITIES.values() + self._endpoints = DIRECTORY_AUTHORITIES.values()
def _query(self, resource, descriptor_type, retries): """ Issues a request for the given resource. """
- if self.fall_back_to_authority and retries == 0: - address, dirport = random.choice(DIRECTORY_AUTHORITIES.values()) - else: - address, dirport = random.choice(self._directories) - - query = Query( - address, - dirport, + return Query( resource, descriptor_type, + endpoints = self._endpoints, + retries = self.retries, + fall_back_to_authority = self.fall_back_to_authority, timeout = self.timeout, start = self.start_when_requested, ) - - if self.block: - query.run(True) - - if query.error and retries > 0: - return self.query(resource, descriptor_type, retries - 1) - - return query diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py index 50d6d28..8509556 100644 --- a/test/integ/descriptor/remote.py +++ b/test/integ/descriptor/remote.py @@ -29,11 +29,10 @@ class TestDescriptorReader(unittest.TestCase):
for authority, (address, dirport) in stem.descriptor.remote.DIRECTORY_AUTHORITIES.items(): queries.append(stem.descriptor.remote.Query( - address, - dirport, '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', 'server-descriptor 1.0', - 30, + endpoints = [(address, dirport)], + timeout = 30, ))
for query in queries: diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py index ab2276e..3aadaac 100644 --- a/test/unit/descriptor/remote.py +++ b/test/unit/descriptor/remote.py @@ -65,14 +65,13 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
query = stem.descriptor.remote.Query( - '128.31.0.39', - 9131, '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', 'server-descriptor 1.0', + endpoints = [('128.31.0.39', 9131)], )
expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE...' - self.assertEqual(expeced_url, query.get_url()) + self.assertEqual(expeced_url, query.pick_url())
descriptors = list(query) self.assertEqual(1, len(descriptors)) @@ -95,10 +94,9 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.return_value = io.BytesIO(descriptor_content)
query = stem.descriptor.remote.Query( - '128.31.0.39', - 9131, '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', 'server-descriptor 1.0', + endpoints = [('128.31.0.39', 9131)], )
# checking via the iterator @@ -119,15 +117,16 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.side_effect = socket.timeout('connection timed out')
query = stem.descriptor.remote.Query( - '128.31.0.39', - 9131, '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', 'server-descriptor 1.0', - 5, + endpoints = [('128.31.0.39', 9131)], + fall_back_to_authority = False, + timeout = 5, )
self.assertRaises(socket.timeout, list, query.run()) - urlopen_mock.assert_called_once_with( + urlopen_mock.assert_called_with( 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE...', timeout = 5, ) + self.assertEqual(3, urlopen_mock.call_count)