commit d3de8b55528f4ed186fb9f99d032f421a4c1f301
Author: Damian Johnson <atagar@torproject.org>
Date:   Thu Jul 4 17:27:32 2019 -0700
    Implement CollecTor download retries

---
 stem/descriptor/collector.py      | 63 +++++++++++++++++++++++++++++++--------
 test/unit/descriptor/collector.py | 10 ++++++-
 2 files changed, 60 insertions(+), 13 deletions(-)
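As a quick usage sketch (not part of the patch), the new retry behavior is driven through the public CollecTor interface; this assumes the retries and timeout constructor keywords that the unit tests below exercise:

from stem.descriptor.collector import CollecTor

collector = CollecTor(retries = 4, timeout = 30)

try:
  # on failure the index download is reattempted, up to four more times here
  index = collector.index()
  print(index['build_revision'])
except IOError as exc:
  print('Unable to fetch the CollecTor index: %s' % exc)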
diff --git a/stem/descriptor/collector.py b/stem/descriptor/collector.py
index 21c9d91d..0bbf4251 100644
--- a/stem/descriptor/collector.py
+++ b/stem/descriptor/collector.py
@@ -47,9 +47,11 @@ With this you can either download and read directly from CollecTor...
 """
 
 import json
+import sys
 import time
 
 from stem.descriptor import Compression
+from stem.util import log
 
 try:
   # account for urllib's change between python 2.x and 3.x
@@ -88,6 +90,53 @@ def url(resource, compression = Compression.PLAINTEXT):
   return COLLECTOR_URL + '/'.join(path) + extension
 
 
+def _download(url, compression, timeout, retries):
+  """
+  Download from the given url.
+
+  :param str url: url to download from
+  :param descriptor.Compression compression: decompression type
+  :param int timeout: timeout when connection becomes idle, no timeout applied
+    if **None**
+  :param int retries: maximum attempts to impose
+
+  :returns: content of the given url
+
+  :raises:
+    * **IOError** if unable to decompress
+    * **socket.timeout** if our request timed out
+    * **urllib2.URLError** for most request failures
+
+    Note that the urllib2 module may fail with other exception types, in
+    which case we'll pass it along.
+  """
+
+  start_time = time.time()
+
+  try:
+    response = urllib.urlopen(url, timeout = timeout).read()
+  except:
+    exc = sys.exc_info()[1]
+
+    if timeout is not None:
+      timeout -= time.time() - start_time
+
+    if retries > 0 and (timeout is None or timeout > 0):
+      log.debug("Failed to download from CollecTor at '%s' (%i retries remaining): %s" % (url, retries, exc))
+      return _download(url, compression, timeout, retries - 1)
+    else:
+      log.debug("Failed to download from CollecTor at '%s': %s" % (url, exc))
+      raise
+
+  if compression not in (None, Compression.PLAINTEXT):
+    try:
+      response = compression.decompress(response)
+    except Exception as exc:
+      raise IOError('Unable to decompress %s response from %s: %s' % (compression, url, exc))
+
+  return stem.util.str_tools._to_unicode(response)
+
+
 class CollecTor(object):
   """
   Downloader for descriptors from CollecTor. The contents of CollecTor are
@@ -110,7 +159,6 @@ class CollecTor(object):
     self._cached_index_at = 0
 
     if compression == 'best':
-
       for option in (Compression.LZMA, Compression.BZ2, Compression.GZIP):
         if option.available:
           self.compression = option
@@ -134,17 +182,8 @@ class CollecTor(object):
     """
 
     if not self._cached_index or time.time() - self._cached_index_at >= REFRESH_INDEX_RATE:
-      # TODO: add retry support
-
-      response = urllib.urlopen(url('index', self.compression), timeout = self.timeout).read()
-
-      if self.compression:
-        try:
-          response = self.compression.decompress(response)
-        except Exception as exc:
-          raise IOError('Unable to decompress response as %s: %s' % (self.compression, exc))
-
-      self._cached_index = json.loads(stem.util.str_tools._to_unicode(response))
+      response = _download(url('index', self.compression), self.compression, self.timeout, self.retries)
+      self._cached_index = json.loads(response)
       self._cached_index_at = time.time()
 
     return self._cached_index
diff --git a/test/unit/descriptor/collector.py b/test/unit/descriptor/collector.py
index 6aeda8f0..c46fb60c 100644
--- a/test/unit/descriptor/collector.py
+++ b/test/unit/descriptor/collector.py
@@ -28,6 +28,14 @@ class TestCollector(unittest.TestCase):
     self.assertEqual('https://collector.torproject.org/index/index.json.bz2', url('index', compression = Compression.BZ2))
     self.assertEqual('https://collector.torproject.org/index/index.json.xz', url('index', compression = Compression.LZMA))
 
+  @patch(URL_OPEN)
+  def test_retries(self, urlopen_mock):
+    collector = CollecTor(retries = 4)
+    urlopen_mock.side_effect = IOError('boom')
+
+    self.assertRaisesRegexp(IOError, 'boom', collector.index)
+    self.assertEqual(5, urlopen_mock.call_count)
+
   @patch(URL_OPEN, Mock(return_value = io.BytesIO(b'{"index_created":"2017-12-25 21:06","build_revision":"56a303e","path":"https://collector.torproject.org"}')))
   def test_index(self):
     expected = {
@@ -52,4 +60,4 @@ class TestCollector(unittest.TestCase):
     for compression in (Compression.GZIP, Compression.BZ2, Compression.LZMA):
       with patch(URL_OPEN, Mock(return_value = io.BytesIO(b'not compressed'))):
         collector = CollecTor(compression = compression)
-        self.assertRaisesRegexp(IOError, 'Unable to decompress response as %s' % compression, collector.index)
+        self.assertRaisesRegexp(IOError, 'Unable to decompress %s response' % compression, collector.index)
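The heart of the retry logic is the shrinking timeout budget: each failed attempt deducts its elapsed time from the remaining timeout, so retries never extend the caller's overall deadline. Below is a minimal standalone sketch of that pattern under a python 2/3 urllib shim like the one the module's try/except import suggests; fetch is a hypothetical name, and decompression, logging, and other stem specifics are omitted:

import time

try:
  # account for urllib's change between python 2.x and 3.x
  import urllib.request as urllib
except ImportError:
  import urllib2 as urllib


def fetch(url, timeout = None, retries = 2):
  start_time = time.time()

  try:
    return urllib.urlopen(url, timeout = timeout).read()
  except Exception:
    if timeout is not None:
      timeout -= time.time() - start_time  # subtract the time this attempt spent

    if retries > 0 and (timeout is None or timeout > 0):
      return fetch(url, timeout, retries - 1)  # retry with the remaining budget

    raise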