commit 60ad230d88b383a259c531649fdba78d0acd6493 Author: Damian Johnson <atagar@torproject.org> Date: Sun Jul 21 09:03:39 2013 -0700
Expanding remote descriptor pydocs
Adding documentation and examples for the remote descriptor API. This includes some minor API tweaks and a missing import statement. --- stem/descriptor/__init__.py | 1 + stem/descriptor/remote.py | 243 ++++++++++++++++++++++++++++----------- test/integ/descriptor/remote.py | 5 +- test/settings.cfg | 2 +- test/unit/descriptor/remote.py | 2 +- 5 files changed, 181 insertions(+), 72 deletions(-)
diff --git a/stem/descriptor/__init__.py b/stem/descriptor/__init__.py index 82f846d..f1fdee4 100644 --- a/stem/descriptor/__init__.py +++ b/stem/descriptor/__init__.py @@ -154,6 +154,7 @@ def parse_file(descriptor_file, descriptor_type = None, validate = True, documen import stem.descriptor.server_descriptor import stem.descriptor.extrainfo_descriptor import stem.descriptor.networkstatus + import stem.descriptor.microdescriptor
# The tor descriptor specifications do not provide a reliable method for # identifying a descriptor file's type and version so we need to guess diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py index 83416c8..8ec6f1d 100644 --- a/stem/descriptor/remote.py +++ b/stem/descriptor/remote.py @@ -2,14 +2,16 @@ # See LICENSE for licensing information
""" -Utilities for retrieving descriptors from directory authorities and mirrors. -This is mostly done through the +Module for remotely retrieving descriptors from directory authorities and +mirrors. This is most easily done through the :class:`~stem.descriptor.remote.DescriptorDownloader` class, which issues -:class:`~stem.descriptor.remote.Query` to get descriptor content. For -example... +:class:`~stem.descriptor.remote.Query` instances to get you the descriptor +content. For example...
::
+ from stem.descriptor.remote import DescriptorDownloader + downloader = DescriptorDownloader( cache = '/tmp/descriptor_cache', use_mirrors = True, @@ -27,7 +29,7 @@ example... print print "Query took %0.2f seconds" % query.runtime except Exception as exc: - print "Unable to query the server descriptors: %s" % query.error + print "Unable to retrieve the server descriptors: %s" % exc
If you don't care about errors then you can also simply iterate over the query itself... @@ -37,6 +39,33 @@ itself... for desc in downloader.get_server_descriptors(): if desc.exit_policy.is_exiting_allowed(): print " %s (%s)" % (desc.nickname, desc.fingerprint) + +:: + + Query - Asynchronous request to download tor descriptors + |- start - issues the query if it isn't already running + +- run - blocks until the request is finished and provides the results + + DescriptorDownloader - Configurable class for issuing queries + |- use_directory_mirrors - use directory mirrors to download future descriptors + |- get_server_descriptors - provides present :class:`~stem.descriptor.server_descriptor.ServerDescriptor` + |- get_extrainfo_descriptors - provides present :class:`~stem.descriptor.extrainfo_descriptor.ExtraInfoDescriptor` + |- get_microdescriptors - provides present :class:`~stem.descriptor.microdescriptor.Microdescriptor` + +- get_consensus - provides present :class:`~stem.descriptor.router_status_entry.RouterStatusEntryV3` + +.. data:: MAX_DESCRIPTOR_BATCH_SIZE + + Maximum number of server or extrainfo descriptors that can be requested at a + time by their fingerprints. + +.. data:: MAX_MICRODESCRIPTOR_BATCH_SIZE + + Maximum number of microdescriptors that can be requested at a time by their + hashes. + +.. data:: DIRECTORY_AUTHORITIES + + Mapping of directory authority nicknames to their (address, dirport) tuple. """
import io @@ -51,8 +80,8 @@ import stem.descriptor from stem import Flag from stem.util import log
-# Tor has a limit on the number of descriptors we can fetch explicitly by their -# fingerprint or hashes due to the url lenght of squid proxies. +# Tor has a limited number of descriptors we can fetch explicitly by their +# fingerprint or hashes due to a limit on the url length by squid proxies.
MAX_DESCRIPTOR_BATCH_SIZE = 96 MAX_MICRODESCRIPTOR_BATCH_SIZE = 92 @@ -75,15 +104,87 @@ DIRECTORY_AUTHORITIES = { }
+def _guess_descriptor_type(resource): + # Attempts to determine the descriptor type based on the resource url. This + # raises a ValueError if the resource isn't recognized. + + if resource.startswith('/tor/server/'): + return 'server-descriptor 1.0' + elif resource.startswith('/tor/extra/'): + return 'extra-info 1.0' + elif resource.startswith('/tor/micro/'): + return 'microdescriptor 1.0' + elif resource.startswith('/tor/status-vote/'): + return 'network-status-consensus-3 1.0' + else: + raise ValueError("Unable to determine the descriptor type for '%s'" % resource) + + class Query(object): """ Asynchronous request for descriptor content from a directory authority or - mirror. The caller can block on the response by either calling - :func:~stem.descriptor.remote.run: or iterating over our descriptor content. + mirror. These can either be made through the + :class:`~stem.descriptor.remote.DescriptorDownloader` or directly for more + advanced usage. + + To block on the response and get results either call + :func:`~stem.descriptor.remote.Query.run` or iterate over the Query. The + :func:`~stem.descriptor.remote.Query.run` method passes along any errors that + arise... + + :: + + from stem.descriptor.remote import Query + + query = Query( + '/tor/server/all.z', + 'server-descriptor 1.0', + timeout = 30, + ) + + print "Current relays:" + + try: + for desc in query.run(): + print desc.fingerprint + except Exception as exc: + print "Unable to retrieve the server descriptors: %s" % exc + + ... while iterating fails silently... + + ::
- :var str resource: resource being fetched, such as '/tor/status-vote/current/consensus.z' - :var str descriptor_type: type of descriptors being fetched, see - :func:`~stem.descriptor.__init__.parse_file` + print "Current relays:" + + for desc in Query('/tor/server/all.z', 'server-descriptor 1.0'): + print desc.fingerprint + + In either case exceptions are available via our 'error' attribute. + + Tor provides quite a few different descriptor resources via its directory + protocol (see section 4.2 and later of the `dir-spec + <https://gitweb.torproject.org/torspec.git/blob/HEAD:/dir-spec.txt>`_). + Commonly useful ones include... + + ======== =========== + Resource Description + ======== =========== + /tor/server/all.z all present server descriptors + /tor/server/fp/<fp1>+<fp2>+<fp3>.z server descriptors with the given fingerprints + /tor/extra/all.z all present extrainfo descriptors + /tor/extra/fp/<fp1>+<fp2>+<fp3>.z extrainfo descriptors with the given fingerprints + /tor/micro/d/<hash1>-<hash2>.z microdescriptors with the given hashes + /tor/status-vote/current/consensus.z present consensus + ======== =========== + + The '.z' suffix can be excluded to get a plaintext rather than compressed + response. Compression is handled transparently, so this shouldn't matter to + the caller. + + :var str resource: resource being fetched, such as '/tor/server/all.z' + :var str descriptor_type: type of descriptors being fetched (for options see + :func:`~stem.descriptor.__init__.parse_file`), this is guessed from the + resource if **None**
:var list endpoints: (address, dirport) tuples of the authority or mirror we're querying, this uses authorities if undefined @@ -108,9 +209,16 @@ class Query(object): which to parse the :class:`~stem.descriptor.networkstatus.NetworkStatusDocument` """
- def __init__(self, resource, descriptor_type, endpoints = None, retries = 2, fall_back_to_authority = True, timeout = None, start = True, validate = True, document_handler = stem.descriptor.DocumentHandler.ENTRIES): + def __init__(self, resource, descriptor_type = None, endpoints = None, retries = 2, fall_back_to_authority = True, timeout = None, start = True, validate = True, document_handler = stem.descriptor.DocumentHandler.ENTRIES): + if not resource.startswith('/'): + raise ValueError("Resources should start with a '/': %s" % resource) + self.resource = resource - self.descriptor_type = descriptor_type + + if descriptor_type: + self.descriptor_type = descriptor_type + else: + self.descriptor_type = _guess_descriptor_type(resource)
self.endpoints = endpoints if endpoints else [] self.retries = retries @@ -135,24 +243,6 @@ class Query(object): if start: self.start()
- def pick_url(self, use_authority = False): - """ - Provides a url that can be queried. If we have multiple endpoints then one - will be picked randomly. - - :param bool use_authority: ignores our endpoints and uses a directory - authority instead - - :returns: **str** for the url being queried by this request - """ - - if use_authority or not self.endpoints: - address, dirport = random.choice(DIRECTORY_AUTHORITIES.values()) - else: - address, dirport = random.choice(self.endpoints) - - return "http://%s:%i/%s" % (address, dirport, self.resource.lstrip('/')) - def start(self): """ Starts downloading the scriptors if we haven't started already. @@ -160,7 +250,12 @@ class Query(object):
with self._downloader_thread_lock: if self._downloader_thread is None: - self._downloader_thread = threading.Thread(target = self._download_descriptors, name="Descriptor Query", args = (self.retries,)) + self._downloader_thread = threading.Thread( + name = "Descriptor Query", + target = self._download_descriptors, + args = (self.retries,) + ) + self._downloader_thread.setDaemon(True) self._downloader_thread.start()
@@ -190,38 +285,57 @@ class Query(object): self._downloader_thread.join()
if self.error: - if not suppress: - raise self.error + if suppress: + return + + raise self.error else: if self._results is None: - if not suppress: - raise ValueError('BUG: _download_descriptors() finished without either results or an error') + if suppress: + return
- return + raise ValueError('BUG: _download_descriptors() finished without either results or an error')
try: for desc in self._results: yield desc except ValueError as exc: - # encountered a parsing error + self.error = exc # encountered a parsing error
- self.error = exc + if suppress: + return
- if not suppress: - raise self.error + raise self.error
def __iter__(self): for desc in self.run(True): yield desc
+ def _pick_url(self, use_authority = False): + """ + Provides a url that can be queried. If we have multiple endpoints then one + will be picked randomly. + + :param bool use_authority: ignores our endpoints and uses a directory + authority instead + + :returns: **str** for the url being queried by this request + """ + + if use_authority or not self.endpoints: + address, dirport = random.choice(DIRECTORY_AUTHORITIES.values()) + else: + address, dirport = random.choice(self.endpoints) + + return "http://%s:%i/%s" % (address, dirport, self.resource.lstrip('/')) + def _download_descriptors(self, retries): try: use_authority = retries == 0 and self.fall_back_to_authority - self.download_url = self.pick_url(use_authority) + self.download_url = self._pick_url(use_authority)
self.start_time = time.time() response = urllib2.urlopen(self.download_url, timeout = self.timeout) - self.runtime = time.time() - self.start_time
# This sucks. We need to read the full response into memory before # processing the content. This is because urllib2 returns a 'file like' @@ -229,8 +343,9 @@ class Query(object): # own buffer that does support these.
response = io.BytesIO(response.read().strip()) - self._results = stem.descriptor.parse_file(response, self.descriptor_type, validate = self.validate, document_handler = self.document_handler) + + self.runtime = time.time() - self.start_time log.trace("Descriptors retrieved from '%s' in %0.2fs" % (self.download_url, self.runtime)) except: exc = sys.exc_info()[1] @@ -254,26 +369,27 @@ class DescriptorDownloader(object): For more advanced use cases you can use the :class:`~stem.descriptor.remote.Query` class directly.
+ :param bool use_mirrors: downloads the present consensus and uses the directory + mirrors to fetch future requests, this fails silently if the consensus + cannot be downloaded + :var int retries: number of times to attempt the request if it fails + :var bool fall_back_to_authority: when retrying request issues the last + request to a directory authority if **True** :var float timeout: duration before we'll time out our request, no timeout is applied if **None** - :var bool use_mirrors: downloads the present consensus and uses the directory - mirrors to fetch future requests, this fails silently if the consensus - cannot be downloaded :var bool start_when_requested: issues requests when our methods are called if **True**, otherwise provides non-running :class:`~stem.descriptor.remote.Query` instances :var bool validate: checks the validity of the descriptor's content if **True**, skips these checks otherwise - :var bool fall_back_to_authority: when retrying request issues the last - request to a directory authority if **True** """
- def __init__(self, retries = 2, use_mirrors = False, fall_back_to_authority = True, timeout = None, start_when_requested = True, validate = True): + def __init__(self, use_mirrors = False, retries = 2, fall_back_to_authority = True, timeout = None, start_when_requested = True, validate = True): self.retries = retries + self.fall_back_to_authority = fall_back_to_authority self.timeout = timeout self.start_when_requested = start_when_requested - self.fall_back_to_authority = fall_back_to_authority self.validate = validate self._endpoints = DIRECTORY_AUTHORITIES.values()
@@ -282,7 +398,7 @@ class DescriptorDownloader(object): start_time = time.time() self.use_directory_mirrors() log.debug("Retrieve directory mirrors (took %0.2fs)" % (time.time() - start_time)) - except Exception, exc: + except Exception as exc: log.debug("Unable to retrieve directory mirrors: %s" % exc)
def use_directory_mirrors(self): @@ -295,10 +411,7 @@ class DescriptorDownloader(object):
new_endpoints = set(DIRECTORY_AUTHORITIES.values())
- query = self.get_consensus() - query.run() # running explicitly so we'll raise errors - - for desc in query: + for desc in self.get_consensus().run(): if Flag.V2DIR in desc.flags: new_endpoints.add((desc.address, desc.dir_port))
@@ -332,7 +445,7 @@ class DescriptorDownloader(object):
resource = '/tor/server/fp/%s' % '+'.join(fingerprints)
- return self._query(resource, 'server-descriptor 1.0') + return self._query(resource)
def get_extrainfo_descriptors(self, fingerprints = None): """ @@ -360,7 +473,7 @@ class DescriptorDownloader(object):
resource = '/tor/extra/fp/%s' % '+'.join(fingerprints)
- return self._query(resource, 'extra-info 1.0') + return self._query(resource)
def get_microdescriptors(self, hashes): """ @@ -385,9 +498,7 @@ class DescriptorDownloader(object): if len(hashes) > MAX_MICRODESCRIPTOR_BATCH_SIZE: raise ValueError("Unable to request more than %i microdescriptors at a time by their hashes" % MAX_MICRODESCRIPTOR_BATCH_SIZE)
- resource = '/tor/micro/d/%s' % '-'.join(hashes) - - return self._query(resource, 'microdescriptor 1.0') + return self._query('/tor/micro/d/%s' % '-'.join(hashes))
def get_consensus(self, document_handler = stem.descriptor.DocumentHandler.ENTRIES, authority_v3ident = None): """ @@ -409,13 +520,9 @@ class DescriptorDownloader(object): if authority_v3ident: resource += '/%s' % authority_v3ident
- return self._query( - resource, - 'network-status-consensus-3 1.0', - document_handler = document_handler, - ) + return self._query(resource, document_handler = document_handler)
- def _query(self, resource, descriptor_type, document_handler = stem.descriptor.DocumentHandler.ENTRIES): + def _query(self, resource, descriptor_type = None, document_handler = stem.descriptor.DocumentHandler.ENTRIES): """ Issues a request for the given resource. """ diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py index e9d4e8a..7c45118 100644 --- a/test/integ/descriptor/remote.py +++ b/test/integ/descriptor/remote.py @@ -15,7 +15,8 @@ import test.runner
import stem.descriptor.networkstatus
-class TestDescriptorReader(unittest.TestCase): + +class TestDescriptorDownloader(unittest.TestCase): def test_using_authorities(self): """ Fetches a descriptor from each of the directory authorities. This is @@ -44,7 +45,7 @@ class TestDescriptorReader(unittest.TestCase): for query in queries: try: descriptors = list(query.run()) - except Exception, exc: + except Exception as exc: self.fail("Unable to use %s (%s:%s, %s): %s" % (authority, address, dirport, type(exc), exc))
self.assertEqual(1, len(descriptors)) diff --git a/test/settings.cfg b/test/settings.cfg index d0fd5b0..80aaf9a 100644 --- a/test/settings.cfg +++ b/test/settings.cfg @@ -187,7 +187,7 @@ test.integ_tests |test.integ.util.proc.TestProc |test.integ.util.system.TestSystem |test.integ.descriptor.reader.TestDescriptorReader -|test.integ.descriptor.remote.TestDescriptorReader +|test.integ.descriptor.remote.TestDescriptorDownloader |test.integ.descriptor.server_descriptor.TestServerDescriptor |test.integ.descriptor.extrainfo_descriptor.TestExtraInfoDescriptor |test.integ.descriptor.microdescriptor.TestMicrodescriptor diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py index 3aadaac..fb2e3f0 100644 --- a/test/unit/descriptor/remote.py +++ b/test/unit/descriptor/remote.py @@ -71,7 +71,7 @@ class TestDescriptorDownloader(unittest.TestCase): )
expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE...' - self.assertEqual(expeced_url, query.pick_url()) + self.assertEqual(expeced_url, query._pick_url())
descriptors = list(query) self.assertEqual(1, len(descriptors))