[tor-commits] [stem/master] Moving retry functionality to Query class

atagar at torproject.org atagar at torproject.org
Mon Jul 22 03:10:17 UTC 2013


commit 7d04653d908f0c62e197bafa27a5cd94634cbb53
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Jul 15 09:16:54 2013 -0700

    Moving retry functionality to Query class
    
    It's far better if the Query class handles retries rather than the
    DescriptorDownloader. Advantages include...
    
    * The DescriptorDownloader no longer has a reason to support blocking queries.
      Blocking was only needed so the downloader could tell whether a request had
      failed before retrying it. The Query itself knows if/when the request fails,
      and hence can retry it on our behalf.
    
    * The Query class is easier to test. The more functionality we can push down
      into it the simpler the DescriptorDownloader will be.
    
    * More advanced use cases will use Query instances directly rather than the
      DescriptorDownloader. Having the retry functionality in the Query makes it
      readily available to them.
---
 stem/descriptor/remote.py       |   88 ++++++++++++++++++++++-----------------
 test/integ/descriptor/remote.py |    5 +--
 test/unit/descriptor/remote.py  |   17 ++++----
 3 files changed, 59 insertions(+), 51 deletions(-)

diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 464e7cd..88e3258 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -48,6 +48,8 @@ import urllib2
 
 import stem.descriptor
 
+from stem.util import log
+
 # Tor directory authorities as of commit f631b73 (7/4/13). This should only
 # include authorities with 'v3ident':
 #
@@ -72,27 +74,34 @@ class Query(object):
   mirror. The caller can block on the response by either calling
   :func:~stem.descriptor.remote.run: or iterating over our descriptor content.
 
-  :var str address: address of the authority or mirror we're querying
-  :var int port: directory port we're querying
   :var str resource: resource being fetched, such as '/tor/status-vote/current/consensus.z'
+  :var str descriptor_type: type of descriptors being fetched, see
+    :func:`~stem.descriptor.__init__.parse_file`
+
+  :var list endpoints: (address, dirport) tuples of the authority or mirror
+    we're querying, this uses authorities if undefined
+  :var int retries: number of times to attempt the request if it fails
+  :var bool fall_back_to_authority: when retrying request issues the last
+    request to a directory authority if **True**
 
   :var Exception error: exception if a problem occured
   :var bool is_done: flag that indicates if our request has finished
-  :var str descriptor_type: type of descriptors being fetched, see :func:`~stem.descriptor.__init__.parse_file`
 
   :var float start_time: unix timestamp when we first started running
   :var float timeout: duration before we'll time out our request
   :var float runtime: time our query took, this is **None** if it's not yet finished
   """
 
-  def __init__(self, address, port, resource, descriptor_type, timeout = None, start = True):
-    self.address = address
-    self.port = port
+  def __init__(self, resource, descriptor_type, endpoints = None, retries = 2, fall_back_to_authority = True, timeout = None, start = True):
     self.resource = resource
+    self.descriptor_type = descriptor_type
+
+    self.endpoints = endpoints if endpoints else []
+    self.retries = retries
+    self.fall_back_to_authority = fall_back_to_authority
 
     self.error = None
     self.is_done = False
-    self.descriptor_type = descriptor_type
 
     self.start_time = None
     self.timeout = timeout
@@ -106,14 +115,23 @@ class Query(object):
     if start:
       self.start()
 
-  def get_url(self):
+  def pick_url(self, use_authority = False):
     """
-    Provides the url being queried.
+    Provides a url that can be queried. If we have multiple endpoints then one
+    will be picked randomly.
+
+    :param bool use_authority: ignores our endpoints and uses a directory
+      authority instead
 
     :returns: **str** for the url being queried by this request
     """
 
-    return "http://%s:%i/%s" % (self.address, self.port, self.resource.lstrip('/'))
+    if use_authority or not self.endpoints:
+      address, dirport = random.choice(DIRECTORY_AUTHORITIES.values())
+    else:
+      address, dirport = random.choice(self.endpoints)
+
+    return "http://%s:%i/%s" % (address, dirport, self.resource.lstrip('/'))
 
   def start(self):
     """
@@ -122,7 +140,7 @@ class Query(object):
 
     with self._downloader_thread_lock:
       if self._downloader_thread is None:
-        self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query")
+        self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query", args = (self.retries,))
         self._downloader_thread.setDaemon(True)
         self._downloader_thread.start()
 
@@ -174,10 +192,13 @@ class Query(object):
     for desc in self.run(True):
       yield desc
 
-  def _download_descriptors(self):
+  def _download_descriptors(self, retries):
     try:
+      use_authority = retries == 0 and self.fall_back_to_authority
+      resource_url = self.pick_url(use_authority)
+
       self.start_time = time.time()
-      response = urllib2.urlopen(self.get_url(), timeout = self.timeout)
+      response = urllib2.urlopen(resource_url, timeout = self.timeout)
       self.runtime = time.time() - self.start_time
 
       # This sucks. We need to read the full response into memory before
@@ -188,8 +209,16 @@ class Query(object):
       response = io.BytesIO(response.read().strip())
 
       self._results = stem.descriptor.parse_file(response, self.descriptor_type)
+      log.trace("Descriptors retrieved from '%s' in %0.2fs" % (resource_url, self.runtime))
     except:
-      self.error = sys.exc_info()[1]
+      exc = sys.exc_info()[1]
+
+      if retries > 0:
+        log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (resource_url, retries, exc))
+        return self._download_descriptors(retries - 1)
+      else:
+        log.debug("Unable to download descriptors from '%s': %s" % (resource_url, exc))
+        self.error = exc
     finally:
       self.is_done = True
 
@@ -200,15 +229,9 @@ class DescriptorDownloader(object):
   caching, retries, and other capabilities to make downloading descriptors easy
   and efficient.
 
-  Queries can be made in either a blocking or non-blocking fashion. If
-  non-blocking then retries cannot be performed (since we do not know at the
-  time of the request if it succeeded or failed).
-
   For more advanced use cases you can use the
   :class:`~stem.descriptor.remote.Query` class directly.
 
-  :var bool block: blocks until requests have been concluded if **True**,
-    otherwise provides the query as soon as its been issued
   :var int retries: number of times to attempt the request if it fails
   :var float timeout: duration before we'll time out our request, no timeout is
     applied if **None**
@@ -219,37 +242,24 @@ class DescriptorDownloader(object):
     request to a directory authority if **True**
   """
 
-  def __init__(self, block = True, retries = 2, timeout = None, start_when_requested = True, fall_back_to_authority = True):
-    self.block = block
+  def __init__(self, retries = 2, timeout = None, start_when_requested = True, fall_back_to_authority = True):
     self.retries = retries
     self.timeout = timeout
     self.start_when_requested = start_when_requested
     self.fall_back_to_authority = fall_back_to_authority
-    self._directories = DIRECTORY_AUTHORITIES.values()
+    self._endpoints = DIRECTORY_AUTHORITIES.values()
 
   def _query(self, resource, descriptor_type, retries):
     """
     Issues a request for the given resource.
     """
 
-    if self.fall_back_to_authority and retries == 0:
-      address, dirport = random.choice(DIRECTORY_AUTHORITIES.values())
-    else:
-      address, dirport = random.choice(self._directories)
-
-    query = Query(
-      address,
-      dirport,
+    return Query(
       resource,
       descriptor_type,
+      endpoints = self._endpoints,
+      retries = self.retries,
+      fall_back_to_authority = self.fall_back_to_authority,
       timeout = self.timeout,
       start = self.start_when_requested,
     )
-
-    if self.block:
-      query.run(True)
-
-      if query.error and retries > 0:
-        return self.query(resource, descriptor_type, retries - 1)
-
-    return query
diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py
index 50d6d28..8509556 100644
--- a/test/integ/descriptor/remote.py
+++ b/test/integ/descriptor/remote.py
@@ -29,11 +29,10 @@ class TestDescriptorReader(unittest.TestCase):
 
     for authority, (address, dirport) in stem.descriptor.remote.DIRECTORY_AUTHORITIES.items():
       queries.append(stem.descriptor.remote.Query(
-        address,
-        dirport,
         '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
         'server-descriptor 1.0',
-        30,
+        endpoints = [(address, dirport)],
+        timeout = 30,
       ))
 
     for query in queries:
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index ab2276e..3aadaac 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -65,14 +65,13 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
 
     query = stem.descriptor.remote.Query(
-      '128.31.0.39',
-      9131,
       '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
       'server-descriptor 1.0',
+      endpoints = [('128.31.0.39', 9131)],
     )
 
     expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
-    self.assertEqual(expeced_url, query.get_url())
+    self.assertEqual(expeced_url, query.pick_url())
 
     descriptors = list(query)
     self.assertEqual(1, len(descriptors))
@@ -95,10 +94,9 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.return_value = io.BytesIO(descriptor_content)
 
     query = stem.descriptor.remote.Query(
-      '128.31.0.39',
-      9131,
       '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
       'server-descriptor 1.0',
+      endpoints = [('128.31.0.39', 9131)],
     )
 
     # checking via the iterator
@@ -119,15 +117,16 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.side_effect = socket.timeout('connection timed out')
 
     query = stem.descriptor.remote.Query(
-      '128.31.0.39',
-      9131,
       '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
       'server-descriptor 1.0',
-      5,
+      endpoints = [('128.31.0.39', 9131)],
+      fall_back_to_authority = False,
+      timeout = 5,
     )
 
     self.assertRaises(socket.timeout, list, query.run())
-    urlopen_mock.assert_called_once_with(
+    urlopen_mock.assert_called_with(
       'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
       timeout = 5,
     )
+    self.assertEqual(3, urlopen_mock.call_count)





More information about the tor-commits mailing list