[tor-commits] [stem/master] Implement CollecTor download retries

atagar at torproject.org atagar at torproject.org
Sat Aug 17 20:44:27 UTC 2019


commit d3de8b55528f4ed186fb9f99d032f421a4c1f301
Author: Damian Johnson <atagar at torproject.org>
Date:   Thu Jul 4 17:27:32 2019 -0700

    Implement CollecTor download retries
---
 stem/descriptor/collector.py      | 63 +++++++++++++++++++++++++++++++--------
 test/unit/descriptor/collector.py | 10 ++++++-
 2 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/stem/descriptor/collector.py b/stem/descriptor/collector.py
index 21c9d91d..0bbf4251 100644
--- a/stem/descriptor/collector.py
+++ b/stem/descriptor/collector.py
@@ -47,9 +47,11 @@ With this you can either download and read directly from CollecTor...
 """
 
 import json
+import sys
 import time
 
 from stem.descriptor import Compression
+from stem.util import log
 
 try:
   # account for urllib's change between python 2.x and 3.x
@@ -88,6 +90,53 @@ def url(resource, compression = Compression.PLAINTEXT):
   return COLLECTOR_URL + '/'.join(path) + extension
 
 
+def _download(url, compression, timeout, retries):
+  """
+  Download from the given url.
+
+  :param str url: url to download from
+  :param descriptor.Compression compression: decompression type
+  :param int timeout: timeout when connection becomes idle, no timeout applied
+    if **None**
+  :param int retires: maximum attempts to impose
+
+  :returns: content of the given url
+
+  :raises:
+    * **IOError** if unable to decompress
+    * **socket.timeout** if our request timed out
+    * **urllib2.URLError** for most request failures
+
+    Note that the urllib2 module may fail with other exception types, in
+    which case we'll pass it along.
+  """
+
+  start_time = time.time()
+
+  try:
+    response = urllib.urlopen(url, timeout = timeout).read()
+  except:
+    exc = sys.exc_info()[1]
+
+    if timeout is not None:
+      timeout -= time.time() - start_time
+
+    if retries > 0 and (timeout is None or timeout > 0):
+      log.debug("Failed to download from CollecTor at '%s' (%i retries remaining): %s" % (url, retries, exc))
+      return _download(url, compression, timeout, retries - 1)
+    else:
+      log.debug("Failed to download from CollecTor at '%s': %s" % (url, exc))
+      raise
+
+  if compression not in (None, Compression.PLAINTEXT):
+    try:
+      response = compression.decompress(response)
+    except Exception as exc:
+      raise IOError('Unable to decompress %s response from %s: %s' % (compression, url, exc))
+
+  return stem.util.str_tools._to_unicode(response)
+
+
 class CollecTor(object):
   """
   Downloader for descriptors from CollecTor. The contents of CollecTor are
@@ -110,7 +159,6 @@ class CollecTor(object):
     self._cached_index_at = 0
 
     if compression == 'best':
-
       for option in (Compression.LZMA, Compression.BZ2, Compression.GZIP):
         if option.available:
           self.compression = option
@@ -134,17 +182,8 @@ class CollecTor(object):
     """
 
     if not self._cached_index or time.time() - self._cached_index_at >= REFRESH_INDEX_RATE:
-      # TODO: add retry support
-
-      response = urllib.urlopen(url('index', self.compression), timeout = self.timeout).read()
-
-      if self.compression:
-        try:
-          response = self.compression.decompress(response)
-        except Exception as exc:
-          raise IOError('Unable to decompress response as %s: %s' % (self.compression, exc))
-
-      self._cached_index = json.loads(stem.util.str_tools._to_unicode(response))
+      response = _download(url('index', self.compression), self.compression, self.timeout, self.retries)
+      self._cached_index = json.loads(response)
       self._cached_index_at = time.time()
 
     return self._cached_index
diff --git a/test/unit/descriptor/collector.py b/test/unit/descriptor/collector.py
index 6aeda8f0..c46fb60c 100644
--- a/test/unit/descriptor/collector.py
+++ b/test/unit/descriptor/collector.py
@@ -28,6 +28,14 @@ class TestCollector(unittest.TestCase):
     self.assertEqual('https://collector.torproject.org/index/index.json.bz2', url('index', compression = Compression.BZ2))
     self.assertEqual('https://collector.torproject.org/index/index.json.xz', url('index', compression = Compression.LZMA))
 
+  @patch(URL_OPEN)
+  def test_retries(self, urlopen_mock):
+    collector = CollecTor(retries = 4)
+    urlopen_mock.side_effect = IOError('boom')
+
+    self.assertRaisesRegexp(IOError, 'boom', collector.index)
+    self.assertEqual(5, urlopen_mock.call_count)
+
   @patch(URL_OPEN, Mock(return_value = io.BytesIO(b'{"index_created":"2017-12-25 21:06","build_revision":"56a303e","path":"https://collector.torproject.org"}')))
   def test_index(self):
     expected = {
@@ -52,4 +60,4 @@ class TestCollector(unittest.TestCase):
     for compression in (Compression.GZIP, Compression.BZ2, Compression.LZMA):
       with patch(URL_OPEN, Mock(return_value = io.BytesIO(b'not compressed'))):
         collector = CollecTor(compression = compression)
-        self.assertRaisesRegexp(IOError, 'Unable to decompress response as %s' % compression, collector.index)
+        self.assertRaisesRegexp(IOError, 'Unable to decompress %s response' % compression, collector.index)





More information about the tor-commits mailing list