[tor-commits] [stem/master] Expanded descriptor compression support

atagar at torproject.org atagar at torproject.org
Thu Mar 29 19:14:50 UTC 2018


commit 72700087b94f2889b5b364738a1178c324862ba5
Author: Damian Johnson <atagar at torproject.org>
Date:   Wed Mar 28 11:57:10 2018 -0700

    Expanded descriptor compression support
    
    We supported plaintext and gzip when downloading descriptors, but recently tor
    added lzma and zstd support as well...
    
      https://gitweb.torproject.org/torspec.git/commit/?id=1cb56af
    
    In practice I'm having difficulty finding an example of this working in the
    live tor network...
    
      https://trac.torproject.org/projects/tor/ticket/25667
    
    That said, gonna go ahead and push this since even setting lzma/zstd aside it's
    a nice step forward since it allows callers to specify compression headers.
---
 docs/change_log.rst             |   1 +
 stem/descriptor/remote.py       | 159 ++++++++++++++++++++++++++++++----------
 stem/version.py                 |   2 +
 test/integ/descriptor/remote.py |  20 +++++
 test/unit/descriptor/remote.py  |  50 ++++++++++---
 5 files changed, 182 insertions(+), 50 deletions(-)

diff --git a/docs/change_log.rst b/docs/change_log.rst
index bb42c982..a67aae2d 100644
--- a/docs/change_log.rst
+++ b/docs/change_log.rst
@@ -55,6 +55,7 @@ The following are only available within Stem's `git repository
  * **Descriptors**
 
   * `Fallback directory v2 support <https://lists.torproject.org/pipermail/tor-dev/2017-December/012721.html>`_, which adds *nickname* and *extrainfo*
+  * Added zstd and lzma compression support (:spec:`1cb56af`)
   * Reduced maximum descriptors fetched by the remote module to match tor's new limit (:trac:`24743`)
   * Consensus **shared_randomness_*_reveal_count** attributes undocumented, and unavailable if retrieved before their corresponding shared_randomness_*_value attribute (:trac:`25046`)
   * Allow 'proto' line to have blank values (:spec:`a8455f4`)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 9181dbcf..ab61d3a7 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -80,6 +80,21 @@ content. For example...
 
   Maximum number of microdescriptors that can requested at a time by their
   hashes.
+
+.. data:: Compression (enum)
+
+  Compression when downloading descriptors.
+
+  .. versionadded:: 1.7.0
+
+  =============== ===========
+  Compression     Description
+  =============== ===========
+  **PLAINTEXT**   Uncompressed data.
+  **GZIP**        `GZip compression <https://www.gnu.org/software/gzip/>`_.
+  **ZSTD**        `Zstandard compression <https://www.zstd.net>`_
+  **LZMA**        `LZMA compression <https://en.wikipedia.org/wiki/LZMA>`_.
+  =============== ===========
 """
 
 import io
@@ -91,6 +106,13 @@ import threading
 import time
 import zlib
 
+import stem.descriptor
+import stem.prereq
+import stem.util.enum
+
+from stem import Flag
+from stem.util import _hash_attr, connection, log, str_tools, tor_tools
+
 try:
   # added in python 2.7
   from collections import OrderedDict
@@ -103,11 +125,24 @@ try:
 except ImportError:
   import urllib2 as urllib
 
-import stem.descriptor
-import stem.prereq
+try:
+  # added in python 3.3
+  import lzma
+  LZMA_SUPPORTED = True
+except ImportError:
+  LZMA_SUPPORTED = False
 
-from stem import Flag
-from stem.util import _hash_attr, connection, log, str_tools, tor_tools
+ZSTD_SUPPORTED = False
+
+Compression = stem.util.enum.Enum(
+  ('PLAINTEXT', 'identity'),
+  ('GZIP', 'gzip'),  # can also be 'deflate'
+  ('ZSTD', 'x-zstd'),
+  ('LZMA', 'x-tor-lzma'),
+)
+
+ZSTD_UNAVAILABLE_MSG = 'ZSTD is not yet supported'
+LZMA_UNAVAILABLE_MSG = 'LZMA compression was requested but requires the lzma module, which was added in python 3.3'
 
 # Tor has a limited number of descriptors we can fetch explicitly by their
 # fingerprint or hashes due to a limit on the url length by squid proxies.
@@ -224,7 +259,7 @@ class Query(object):
     from stem.descriptor.remote import Query
 
     query = Query(
-      '/tor/server/all.z',
+      '/tor/server/all',
       block = True,
       timeout = 30,
     )
@@ -243,7 +278,7 @@ class Query(object):
 
     print('Current relays:')
 
-    for desc in Query('/tor/server/all.z', 'server-descriptor 1.0'):
+    for desc in Query('/tor/server/all', 'server-descriptor 1.0'):
       print(desc.fingerprint)
 
   In either case exceptions are available via our 'error' attribute.
@@ -256,28 +291,37 @@ class Query(object):
   =============================================== ===========
   Resource                                        Description
   =============================================== ===========
-  /tor/server/all.z                               all present server descriptors
-  /tor/server/fp/<fp1>+<fp2>+<fp3>.z              server descriptors with the given fingerprints
-  /tor/extra/all.z                                all present extrainfo descriptors
-  /tor/extra/fp/<fp1>+<fp2>+<fp3>.z               extrainfo descriptors with the given fingerprints
-  /tor/micro/d/<hash1>-<hash2>.z                  microdescriptors with the given hashes
-  /tor/status-vote/current/consensus.z            present consensus
-  /tor/status-vote/current/consensus-microdesc.z  present microdescriptor consensus
-  /tor/keys/all.z                                 key certificates for the authorities
-  /tor/keys/fp/<v3ident1>+<v3ident2>.z            key certificates for specific authorities
+  /tor/server/all                                 all present server descriptors
+  /tor/server/fp/<fp1>+<fp2>+<fp3>                server descriptors with the given fingerprints
+  /tor/extra/all                                  all present extrainfo descriptors
+  /tor/extra/fp/<fp1>+<fp2>+<fp3>                 extrainfo descriptors with the given fingerprints
+  /tor/micro/d/<hash1>-<hash2>                    microdescriptors with the given hashes
+  /tor/status-vote/current/consensus              present consensus
+  /tor/status-vote/current/consensus-microdesc    present microdescriptor consensus
+  /tor/keys/all                                   key certificates for the authorities
+  /tor/keys/fp/<v3ident1>+<v3ident2>              key certificates for specific authorities
   =============================================== ===========
 
-  The '.z' suffix can be excluded to get a plaintext rather than compressed
-  response. Compression is handled transparently, so this shouldn't matter to
-  the caller.
+  **LZMA** compression requires the `lzma module
+  <https://docs.python.org/3/library/lzma.html>`_ which was added in Python
+  3.3.
+
+  For legacy reasons if our resource has a '.z' suffix then our **compression**
+  argument is overwritten with Compression.GZIP.
+
+  .. versionchanged:: 1.7.0
+     Added the compression argument.
 
-  :var str resource: resource being fetched, such as '/tor/server/all.z'
+  :var str resource: resource being fetched, such as '/tor/server/all'
   :var str descriptor_type: type of descriptors being fetched (for options see
     :func:`~stem.descriptor.__init__.parse_file`), this is guessed from the
     resource if **None**
 
   :var list endpoints: (address, dirport) tuples of the authority or mirror
     we're querying, this uses authorities if undefined
+  :var list compression: list of :data:`stem.descriptor.remote.Compression`
+    we're willing to accept, when none are mutually supported downloads fall
+    back to Compression.PLAINTEXT
   :var int retries: number of times to attempt the request if downloading it
     fails
   :var bool fall_back_to_authority: when retrying request issues the last
@@ -305,17 +349,37 @@ class Query(object):
     the same as running **query.run(True)** (default is **False**)
   """
 
-  def __init__(self, resource, descriptor_type = None, endpoints = None, retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
+  def __init__(self, resource, descriptor_type = None, endpoints = None, compression = None, retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
     if not resource.startswith('/'):
       raise ValueError("Resources should start with a '/': %s" % resource)
 
-    self.resource = resource
+    if resource.endswith('.z'):
+      compression = [Compression.GZIP]
+      resource = resource[:-2]
+    elif compression is None:
+      compression = [Compression.PLAINTEXT]
+    else:
+      if isinstance(compression, str):
+        compression = [compression]  # caller provided only a single option
+
+      if Compression.LZMA in compression and not LZMA_SUPPORTED:
+        log.log_once('stem.descriptor.remote.lzma_unavailable', log.INFO, LZMA_UNAVAILABLE_MSG)
+        compression.remove(Compression.LZMA)
+
+      if Compression.ZSTD in compression and not ZSTD_SUPPORTED:
+        log.log_once('stem.descriptor.remote.zstd_unavailable', log.INFO, ZSTD_UNAVAILABLE_MSG)
+        compression.remove(Compression.ZSTD)
+
+      if not compression:
+        compression = [Compression.PLAINTEXT]
 
     if descriptor_type:
       self.descriptor_type = descriptor_type
     else:
       self.descriptor_type = _guess_descriptor_type(resource)
 
+    self.resource = resource
+    self.compression = compression
     self.endpoints = endpoints if endpoints else []
     self.retries = retries
     self.fall_back_to_authority = fall_back_to_authority
@@ -352,7 +416,7 @@ class Query(object):
         self._downloader_thread = threading.Thread(
           name = 'Descriptor Query',
           target = self._download_descriptors,
-          args = (self.retries,)
+          args = (self.compression, self.retries,)
         )
 
         self._downloader_thread.setDaemon(True)
@@ -435,26 +499,41 @@ class Query(object):
     if use_authority or not self.endpoints:
       directories = get_authorities().values()
 
-      picked = random.choice(directories)
+      picked = random.choice(list(directories))
       address, dirport = picked.address, picked.dir_port
     else:
       address, dirport = random.choice(self.endpoints)
 
     return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
 
-  def _download_descriptors(self, retries):
+  def _download_descriptors(self, compression, retries):
     try:
       use_authority = retries == 0 and self.fall_back_to_authority
       self.download_url = self._pick_url(use_authority)
-
       self.start_time = time.time()
-      response = urllib.urlopen(self.download_url, timeout = self.timeout).read()
 
-      if self.download_url.endswith('.z'):
-        response = zlib.decompress(response)
+      response = urllib.urlopen(
+        urllib.Request(
+          self.download_url,
+          headers = {'Accept-Encoding': ', '.join(compression)},
+        ),
+        timeout = self.timeout,
+      )
+
+      data = response.read()
+      encoding = response.info().getheader('Content-Encoding')
 
-      self.content = response.strip()
+      if encoding in (Compression.GZIP, 'deflate'):
+        # The '32' is for automatic header detection...
+        # https://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check/22310760#22310760
 
+        data = zlib.decompress(data, zlib.MAX_WBITS | 32)
+      elif encoding == Compression.ZSTD:
+        pass  # TODO: implement
+      elif encoding == Compression.LZMA and LZMA_SUPPORTED:
+        data = lzma.decompress(data)
+
+      self.content = data.strip()
       self.runtime = time.time() - self.start_time
       log.trace("Descriptors retrieved from '%s' in %0.2fs" % (self.download_url, self.runtime))
     except:
@@ -462,7 +541,7 @@ class Query(object):
 
       if retries > 0:
         log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (self.download_url, retries, exc))
-        return self._download_descriptors(retries - 1)
+        return self._download_descriptors(compression, retries - 1)
       else:
         log.debug("Unable to download descriptors from '%s': %s" % (self.download_url, exc))
         self.error = exc
@@ -539,7 +618,7 @@ class DescriptorDownloader(object):
       fingerprints (this is due to a limit on the url length by squid proxies).
     """
 
-    resource = '/tor/server/all.z'
+    resource = '/tor/server/all'
 
     if isinstance(fingerprints, str):
       fingerprints = [fingerprints]
@@ -548,7 +627,7 @@ class DescriptorDownloader(object):
       if len(fingerprints) > MAX_FINGERPRINTS:
         raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
 
-      resource = '/tor/server/fp/%s.z' % '+'.join(fingerprints)
+      resource = '/tor/server/fp/%s' % '+'.join(fingerprints)
 
     return self.query(resource, **query_args)
 
@@ -569,7 +648,7 @@ class DescriptorDownloader(object):
       fingerprints (this is due to a limit on the url length by squid proxies).
     """
 
-    resource = '/tor/extra/all.z'
+    resource = '/tor/extra/all'
 
     if isinstance(fingerprints, str):
       fingerprints = [fingerprints]
@@ -578,7 +657,7 @@ class DescriptorDownloader(object):
       if len(fingerprints) > MAX_FINGERPRINTS:
         raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
 
-      resource = '/tor/extra/fp/%s.z' % '+'.join(fingerprints)
+      resource = '/tor/extra/fp/%s' % '+'.join(fingerprints)
 
     return self.query(resource, **query_args)
 
@@ -613,7 +692,7 @@ class DescriptorDownloader(object):
     if len(hashes) > MAX_MICRODESCRIPTOR_HASHES:
       raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)
 
-    return self.query('/tor/micro/d/%s.z' % '-'.join(hashes), **query_args)
+    return self.query('/tor/micro/d/%s' % '-'.join(hashes), **query_args)
 
   def get_consensus(self, authority_v3ident = None, microdescriptor = False, **query_args):
     """
@@ -643,7 +722,7 @@ class DescriptorDownloader(object):
     if authority_v3ident:
       resource += '/%s' % authority_v3ident
 
-    consensus_query = self.query(resource + '.z', **query_args)
+    consensus_query = self.query(resource, **query_args)
 
     # if we're performing validation then check that it's signed by the
     # authority key certificates
@@ -672,7 +751,7 @@ class DescriptorDownloader(object):
     if 'endpoint' not in query_args:
       query_args['endpoints'] = [(authority.address, authority.dir_port)]
 
-    return self.query(resource + '.z', **query_args)
+    return self.query(resource, **query_args)
 
   def get_key_certificates(self, authority_v3idents = None, **query_args):
     """
@@ -694,7 +773,7 @@ class DescriptorDownloader(object):
       squid proxies).
     """
 
-    resource = '/tor/keys/all.z'
+    resource = '/tor/keys/all'
 
     if isinstance(authority_v3idents, str):
       authority_v3idents = [authority_v3idents]
@@ -703,7 +782,7 @@ class DescriptorDownloader(object):
       if len(authority_v3idents) > MAX_FINGERPRINTS:
         raise ValueError('Unable to request more than %i key certificates at a time by their identity fingerprints' % MAX_FINGERPRINTS)
 
-      resource = '/tor/keys/fp/%s.z' % '+'.join(authority_v3idents)
+      resource = '/tor/keys/fp/%s' % '+'.join(authority_v3idents)
 
     return self.query(resource, **query_args)
 
@@ -711,7 +790,7 @@ class DescriptorDownloader(object):
     """
     Issues a request for the given resource.
 
-    :param str resource: resource being fetched, such as '/tor/server/all.z'
+    :param str resource: resource being fetched, such as '/tor/server/all'
     :param query_args: additional arguments for the
       :class:`~stem.descriptor.remote.Query` constructor
 
diff --git a/stem/version.py b/stem/version.py
index 9de2f1a5..9036effb 100644
--- a/stem/version.py
+++ b/stem/version.py
@@ -35,6 +35,7 @@ easily parsed and compared, for instance...
   Requirement                           Description
   ===================================== ===========
   **AUTH_SAFECOOKIE**                   SAFECOOKIE authentication method
+  **DESCRIPTOR_COMPRESSION**            `Expanded compression support for ZSTD and LZMA <https://gitweb.torproject.org/torspec.git/commit/?id=1cb56afdc1e55e303e3e6b69e90d983ee217d93f>`_
   **DROPGUARDS**                        DROPGUARDS requests
   **EVENT_AUTHDIR_NEWDESCS**            AUTHDIR_NEWDESC events
   **EVENT_BUILDTIMEOUT_SET**            BUILDTIMEOUT_SET events
@@ -353,6 +354,7 @@ safecookie_req.greater_than(Version('0.2.3.13'))
 
 Requirement = stem.util.enum.Enum(
   ('AUTH_SAFECOOKIE', safecookie_req),
+  ('DESCRIPTOR_COMPRESSION', Version('0.3.1.1-alpha')),
   ('DROPGUARDS', Version('0.2.5.1-alpha')),
   ('EVENT_AUTHDIR_NEWDESCS', Version('0.1.1.10-alpha')),
   ('EVENT_BUILDTIMEOUT_SET', Version('0.2.2.7-alpha')),
diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py
index 3123ef06..4e10eac2 100644
--- a/test/integ/descriptor/remote.py
+++ b/test/integ/descriptor/remote.py
@@ -16,6 +16,26 @@ import test.require
 class TestDescriptorDownloader(unittest.TestCase):
   @test.require.only_run_once
   @test.require.online
+  def test_compression(self):
+    """
+    Issue a request for a plaintext descriptor.
+    """
+
+    moria1 = stem.descriptor.remote.get_authorities()['moria1']
+
+    descriptors = list(stem.descriptor.remote.Query(
+      '/tor/server/fp/%s' % moria1.fingerprint,
+      'server-descriptor 1.0',
+      endpoints = [(moria1.address, moria1.dir_port)],
+      timeout = 30,
+      validate = True,
+    ).run())
+
+    self.assertEqual(1, len(descriptors))
+    self.assertEqual('moria1', descriptors[0].nickname)
+
+  @test.require.only_run_once
+  @test.require.online
   def test_shorthand_aliases(self):
     """
     Quick sanity test that we can call our shorthand aliases for getting
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index ac150d5c..3661a54b 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -11,6 +11,8 @@ import stem.descriptor.remote
 import stem.prereq
 import stem.util.conf
 
+from stem.descriptor.remote import Compression
+
 try:
   # added in python 2.7
   from collections import OrderedDict
@@ -29,6 +31,8 @@ except ImportError:
 
 URL_OPEN = 'urllib.request.urlopen' if stem.prereq.is_python_3() else 'urllib2.urlopen'
 
+TEST_RESOURCE = '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
+
 # Output from requesting moria1's descriptor from itself...
 # % curl http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31
 
@@ -108,6 +112,33 @@ FALLBACK_ENTRY = b"""\
 
 
 class TestDescriptorDownloader(unittest.TestCase):
+  def test_gzip_url_override(self):
+    query = stem.descriptor.remote.Query(TEST_RESOURCE, start = False)
+    self.assertEqual([Compression.PLAINTEXT], query.compression)
+    self.assertEqual(TEST_RESOURCE, query.resource)
+
+    query = stem.descriptor.remote.Query(TEST_RESOURCE + '.z', compression = Compression.PLAINTEXT, start = False)
+    self.assertEqual([Compression.GZIP], query.compression)
+    self.assertEqual(TEST_RESOURCE, query.resource)
+
+  def test_zstd_support_check(self):
+    with patch('stem.descriptor.remote.ZSTD_SUPPORTED', True):
+      query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.ZSTD, start = False)
+      self.assertEqual([Compression.ZSTD], query.compression)
+
+    with patch('stem.descriptor.remote.ZSTD_SUPPORTED', False):
+      query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.ZSTD, start = False)
+      self.assertEqual([Compression.PLAINTEXT], query.compression)
+
+  def test_lzma_support_check(self):
+    with patch('stem.descriptor.remote.LZMA_SUPPORTED', True):
+      query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.LZMA, start = False)
+      self.assertEqual([Compression.LZMA], query.compression)
+
+    with patch('stem.descriptor.remote.LZMA_SUPPORTED', False):
+      query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.LZMA, start = False)
+      self.assertEqual([Compression.PLAINTEXT], query.compression)
+
   @patch(URL_OPEN)
   def test_query_download(self, urlopen_mock):
     """
@@ -117,13 +148,14 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
 
     query = stem.descriptor.remote.Query(
-      '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+      TEST_RESOURCE,
       'server-descriptor 1.0',
       endpoints = [('128.31.0.39', 9131)],
+      compression = Compression.PLAINTEXT,
       validate = True,
     )
 
-    expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
+    expeced_url = 'http://128.31.0.39:9131' + TEST_RESOURCE
     self.assertEqual(expeced_url, query._pick_url())
 
     descriptors = list(query)
@@ -135,7 +167,7 @@ class TestDescriptorDownloader(unittest.TestCase):
     self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint)
     self.assertEqual(TEST_DESCRIPTOR.strip(), desc.get_bytes())
 
-    urlopen_mock.assert_called_once_with(expeced_url, timeout = None)
+    self.assertEqual(1, urlopen_mock.call_count)
 
   @patch(URL_OPEN)
   def test_query_with_malformed_content(self, urlopen_mock):
@@ -147,9 +179,10 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.return_value = io.BytesIO(descriptor_content)
 
     query = stem.descriptor.remote.Query(
-      '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+      TEST_RESOURCE,
       'server-descriptor 1.0',
       endpoints = [('128.31.0.39', 9131)],
+      compression = Compression.PLAINTEXT,
       validate = True,
     )
 
@@ -171,7 +204,7 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.side_effect = socket.timeout('connection timed out')
 
     query = stem.descriptor.remote.Query(
-      '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+      TEST_RESOURCE,
       'server-descriptor 1.0',
       endpoints = [('128.31.0.39', 9131)],
       fall_back_to_authority = False,
@@ -180,10 +213,6 @@ class TestDescriptorDownloader(unittest.TestCase):
     )
 
     self.assertRaises(socket.timeout, query.run)
-    urlopen_mock.assert_called_with(
-      'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
-      timeout = 5,
-    )
     self.assertEqual(3, urlopen_mock.call_count)
 
   @patch(URL_OPEN)
@@ -191,9 +220,10 @@ class TestDescriptorDownloader(unittest.TestCase):
     urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
 
     query = stem.descriptor.remote.Query(
-      '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31',
+      TEST_RESOURCE,
       'server-descriptor 1.0',
       endpoints = [('128.31.0.39', 9131)],
+      compression = Compression.PLAINTEXT,
       validate = True,
     )
 



More information about the tor-commits mailing list