commit 72700087b94f2889b5b364738a1178c324862ba5 Author: Damian Johnson atagar@torproject.org Date: Wed Mar 28 11:57:10 2018 -0700
Expanded descriptor compression support
We supported plaintext and gzip when downloading descriptors, but recently tor added lzma and zstd support as well...
https://gitweb.torproject.org/torspec.git/commit/?id=1cb56af
In practice I'm having difficulty finding an example of this working in the live tor network...
https://trac.torproject.org/projects/tor/ticket/25667
That said, gonna go ahead and push this since even setting lzma/zstd aside it's a nice step forward since it allows callers to specify compression headers. --- docs/change_log.rst | 1 + stem/descriptor/remote.py | 159 ++++++++++++++++++++++++++++++---------- stem/version.py | 2 + test/integ/descriptor/remote.py | 20 +++++ test/unit/descriptor/remote.py | 50 ++++++++++--- 5 files changed, 182 insertions(+), 50 deletions(-)
diff --git a/docs/change_log.rst b/docs/change_log.rst index bb42c982..a67aae2d 100644 --- a/docs/change_log.rst +++ b/docs/change_log.rst @@ -55,6 +55,7 @@ The following are only available within Stem's `git repository * **Descriptors**
* `Fallback directory v2 support <https://lists.torproject.org/pipermail/tor-dev/2017-December/012721.html>`_, which adds *nickname* and *extrainfo* + * Added zstd and lzma compression support (:spec:`1cb56af`) * Reduced maximum descriptors fetched by the remote module to match tor's new limit (:trac:`24743`) * Consensus **shared_randomness_*_reveal_count** attributes undocumented, and unavailable if retrieved before their corresponding shared_randomness_*_value attribute (:trac:`25046`) * Allow 'proto' line to have blank values (:spec:`a8455f4`) diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py index 9181dbcf..ab61d3a7 100644 --- a/stem/descriptor/remote.py +++ b/stem/descriptor/remote.py @@ -80,6 +80,21 @@ content. For example...
Maximum number of microdescriptors that can requested at a time by their hashes. + +.. data:: Compression (enum) + + Compression when downloading descriptors. + + .. versionadded:: 1.7.0 + + =============== =========== + Compression Description + =============== =========== + **PLAINTEXT** Uncompressed data. + **GZIP** `GZip compression <https://www.gnu.org/software/gzip/>`_. + **ZSTD** `Zstandard compression <https://www.zstd.net>`_ + **LZMA** `LZMA compression <https://en.wikipedia.org/wiki/LZMA>`_. + =============== =========== """
import io @@ -91,6 +106,13 @@ import threading import time import zlib
+import stem.descriptor +import stem.prereq +import stem.util.enum + +from stem import Flag +from stem.util import _hash_attr, connection, log, str_tools, tor_tools + try: # added in python 2.7 from collections import OrderedDict @@ -103,11 +125,24 @@ try: except ImportError: import urllib2 as urllib
-import stem.descriptor -import stem.prereq +try: + # added in python 3.3 + import lzma + LZMA_SUPPORTED = True +except ImportError: + LZMA_SUPPORTED = False
-from stem import Flag -from stem.util import _hash_attr, connection, log, str_tools, tor_tools +ZSTD_SUPPORTED = False + +Compression = stem.util.enum.Enum( + ('PLAINTEXT', 'identity'), + ('GZIP', 'gzip'), # can also be 'deflate' + ('ZSTD', 'x-zstd'), + ('LZMA', 'x-tor-lzma'), +) + +ZSTD_UNAVAILABLE_MSG = 'ZSTD is not yet supported' +LZMA_UNAVAILABLE_MSG = 'LZMA compression was requested but requires the lzma module, which was added in python 3.3'
# Tor has a limited number of descriptors we can fetch explicitly by their # fingerprint or hashes due to a limit on the url length by squid proxies. @@ -224,7 +259,7 @@ class Query(object): from stem.descriptor.remote import Query
query = Query( - '/tor/server/all.z', + '/tor/server/all', block = True, timeout = 30, ) @@ -243,7 +278,7 @@ class Query(object):
print('Current relays:')
- for desc in Query('/tor/server/all.z', 'server-descriptor 1.0'): + for desc in Query('/tor/server/all', 'server-descriptor 1.0'): print(desc.fingerprint)
In either case exceptions are available via our 'error' attribute. @@ -256,28 +291,37 @@ class Query(object): =============================================== =========== Resource Description =============================================== =========== - /tor/server/all.z all present server descriptors - /tor/server/fp/<fp1>+<fp2>+<fp3>.z server descriptors with the given fingerprints - /tor/extra/all.z all present extrainfo descriptors - /tor/extra/fp/<fp1>+<fp2>+<fp3>.z extrainfo descriptors with the given fingerprints - /tor/micro/d/<hash1>-<hash2>.z microdescriptors with the given hashes - /tor/status-vote/current/consensus.z present consensus - /tor/status-vote/current/consensus-microdesc.z present microdescriptor consensus - /tor/keys/all.z key certificates for the authorities - /tor/keys/fp/<v3ident1>+<v3ident2>.z key certificates for specific authorities + /tor/server/all all present server descriptors + /tor/server/fp/<fp1>+<fp2>+<fp3> server descriptors with the given fingerprints + /tor/extra/all all present extrainfo descriptors + /tor/extra/fp/<fp1>+<fp2>+<fp3> extrainfo descriptors with the given fingerprints + /tor/micro/d/<hash1>-<hash2> microdescriptors with the given hashes + /tor/status-vote/current/consensus present consensus + /tor/status-vote/current/consensus-microdesc present microdescriptor consensus + /tor/keys/all key certificates for the authorities + /tor/keys/fp/<v3ident1>+<v3ident2> key certificates for specific authorities =============================================== ===========
- The '.z' suffix can be excluded to get a plaintext rather than compressed - response. Compression is handled transparently, so this shouldn't matter to - the caller. + **LZMA** compression requires the `lzma module + <https://docs.python.org/3/library/lzma.html>`_ which was added in Python + 3.3. + + For legacy reasons if our resource has a '.z' suffix then our **compression** + argument is overwritten with Compression.GZIP. + + .. versionchanged:: 1.7.0 + Added the compression argument.
- :var str resource: resource being fetched, such as '/tor/server/all.z' + :var str resource: resource being fetched, such as '/tor/server/all' :var str descriptor_type: type of descriptors being fetched (for options see :func:`~stem.descriptor.__init__.parse_file`), this is guessed from the resource if **None**
:var list endpoints: (address, dirport) tuples of the authority or mirror we're querying, this uses authorities if undefined + :var list compression: list of :data:`stem.descriptor.remote.Compression` + we're willing to accept, when none are mutually supported downloads fall + back to Compression.PLAINTEXT :var int retries: number of times to attempt the request if downloading it fails :var bool fall_back_to_authority: when retrying request issues the last @@ -305,17 +349,37 @@ class Query(object): the same as running **query.run(True)** (default is **False**) """
- def __init__(self, resource, descriptor_type = None, endpoints = None, retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs): + def __init__(self, resource, descriptor_type = None, endpoints = None, compression = None, retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs): if not resource.startswith('/'): raise ValueError("Resources should start with a '/': %s" % resource)
- self.resource = resource + if resource.endswith('.z'): + compression = [Compression.GZIP] + resource = resource[:-2] + elif compression is None: + compression = [Compression.PLAINTEXT] + else: + if isinstance(compression, str): + compression = [compression] # caller provided only a single option + + if Compression.LZMA in compression and not LZMA_SUPPORTED: + log.log_once('stem.descriptor.remote.lzma_unavailable', log.INFO, LZMA_UNAVAILABLE_MSG) + compression.remove(Compression.LZMA) + + if Compression.ZSTD in compression and not ZSTD_SUPPORTED: + log.log_once('stem.descriptor.remote.zstd_unavailable', log.INFO, ZSTD_UNAVAILABLE_MSG) + compression.remove(Compression.ZSTD) + + if not compression: + compression = [Compression.PLAINTEXT]
if descriptor_type: self.descriptor_type = descriptor_type else: self.descriptor_type = _guess_descriptor_type(resource)
+ self.resource = resource + self.compression = compression self.endpoints = endpoints if endpoints else [] self.retries = retries self.fall_back_to_authority = fall_back_to_authority @@ -352,7 +416,7 @@ class Query(object): self._downloader_thread = threading.Thread( name = 'Descriptor Query', target = self._download_descriptors, - args = (self.retries,) + args = (self.compression, self.retries,) )
self._downloader_thread.setDaemon(True) @@ -435,26 +499,41 @@ class Query(object): if use_authority or not self.endpoints: directories = get_authorities().values()
- picked = random.choice(directories) + picked = random.choice(list(directories)) address, dirport = picked.address, picked.dir_port else: address, dirport = random.choice(self.endpoints)
return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
- def _download_descriptors(self, retries): + def _download_descriptors(self, compression, retries): try: use_authority = retries == 0 and self.fall_back_to_authority self.download_url = self._pick_url(use_authority) - self.start_time = time.time() - response = urllib.urlopen(self.download_url, timeout = self.timeout).read()
- if self.download_url.endswith('.z'): - response = zlib.decompress(response) + response = urllib.urlopen( + urllib.Request( + self.download_url, + headers = {'Accept-Encoding': ', '.join(compression)}, + ), + timeout = self.timeout, + ) + + data = response.read() + encoding = response.info().getheader('Content-Encoding')
- self.content = response.strip() + if encoding in (Compression.GZIP, 'deflate'): + # The '32' is for automatic header detection... + # https://stackoverflow.com/questions/3122145/zlib-error-error-3-while-decompressing-incorrect-header-check
+ data = zlib.decompress(data, zlib.MAX_WBITS | 32) + elif encoding == Compression.ZSTD: + pass # TODO: implement + elif encoding == Compression.LZMA and LZMA_SUPPORTED: + data = lzma.decompress(data) + + self.content = data.strip() self.runtime = time.time() - self.start_time log.trace("Descriptors retrieved from '%s' in %0.2fs" % (self.download_url, self.runtime)) except: @@ -462,7 +541,7 @@ class Query(object):
if retries > 0: log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (self.download_url, retries, exc)) - return self._download_descriptors(retries - 1) + return self._download_descriptors(compression, retries - 1) else: log.debug("Unable to download descriptors from '%s': %s" % (self.download_url, exc)) self.error = exc @@ -539,7 +618,7 @@ class DescriptorDownloader(object): fingerprints (this is due to a limit on the url length by squid proxies). """
- resource = '/tor/server/all.z' + resource = '/tor/server/all'
if isinstance(fingerprints, str): fingerprints = [fingerprints] @@ -548,7 +627,7 @@ class DescriptorDownloader(object): if len(fingerprints) > MAX_FINGERPRINTS: raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
- resource = '/tor/server/fp/%s.z' % '+'.join(fingerprints) + resource = '/tor/server/fp/%s' % '+'.join(fingerprints)
return self.query(resource, **query_args)
@@ -569,7 +648,7 @@ class DescriptorDownloader(object): fingerprints (this is due to a limit on the url length by squid proxies). """
- resource = '/tor/extra/all.z' + resource = '/tor/extra/all'
if isinstance(fingerprints, str): fingerprints = [fingerprints] @@ -578,7 +657,7 @@ class DescriptorDownloader(object): if len(fingerprints) > MAX_FINGERPRINTS: raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
- resource = '/tor/extra/fp/%s.z' % '+'.join(fingerprints) + resource = '/tor/extra/fp/%s' % '+'.join(fingerprints)
return self.query(resource, **query_args)
@@ -613,7 +692,7 @@ class DescriptorDownloader(object): if len(hashes) > MAX_MICRODESCRIPTOR_HASHES: raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)
- return self.query('/tor/micro/d/%s.z' % '-'.join(hashes), **query_args) + return self.query('/tor/micro/d/%s' % '-'.join(hashes), **query_args)
def get_consensus(self, authority_v3ident = None, microdescriptor = False, **query_args): """ @@ -643,7 +722,7 @@ class DescriptorDownloader(object): if authority_v3ident: resource += '/%s' % authority_v3ident
- consensus_query = self.query(resource + '.z', **query_args) + consensus_query = self.query(resource, **query_args)
# if we're performing validation then check that it's signed by the # authority key certificates @@ -672,7 +751,7 @@ class DescriptorDownloader(object): if 'endpoint' not in query_args: query_args['endpoints'] = [(authority.address, authority.dir_port)]
- return self.query(resource + '.z', **query_args) + return self.query(resource, **query_args)
def get_key_certificates(self, authority_v3idents = None, **query_args): """ @@ -694,7 +773,7 @@ class DescriptorDownloader(object): squid proxies). """
- resource = '/tor/keys/all.z' + resource = '/tor/keys/all'
if isinstance(authority_v3idents, str): authority_v3idents = [authority_v3idents] @@ -703,7 +782,7 @@ class DescriptorDownloader(object): if len(authority_v3idents) > MAX_FINGERPRINTS: raise ValueError('Unable to request more than %i key certificates at a time by their identity fingerprints' % MAX_FINGERPRINTS)
- resource = '/tor/keys/fp/%s.z' % '+'.join(authority_v3idents) + resource = '/tor/keys/fp/%s' % '+'.join(authority_v3idents)
return self.query(resource, **query_args)
@@ -711,7 +790,7 @@ class DescriptorDownloader(object): """ Issues a request for the given resource.
- :param str resource: resource being fetched, such as '/tor/server/all.z' + :param str resource: resource being fetched, such as '/tor/server/all' :param query_args: additional arguments for the :class:`~stem.descriptor.remote.Query` constructor
diff --git a/stem/version.py b/stem/version.py index 9de2f1a5..9036effb 100644 --- a/stem/version.py +++ b/stem/version.py @@ -35,6 +35,7 @@ easily parsed and compared, for instance... Requirement Description ===================================== =========== **AUTH_SAFECOOKIE** SAFECOOKIE authentication method + **DESCRIPTOR_COMPRESSION** `Expanded compression support for ZSTD and LZMA <https://gitweb.torproject.org/torspec.git/commit/?id=1cb56afdc1e55e303e3e6b69e90d983ee217d93f>`_ **DROPGUARDS** DROPGUARDS requests **EVENT_AUTHDIR_NEWDESCS** AUTHDIR_NEWDESC events **EVENT_BUILDTIMEOUT_SET** BUILDTIMEOUT_SET events @@ -353,6 +354,7 @@ safecookie_req.greater_than(Version('0.2.3.13'))
Requirement = stem.util.enum.Enum( ('AUTH_SAFECOOKIE', safecookie_req), + ('DESCRIPTOR_COMPRESSION', Version('0.3.1.1-alpha')), ('DROPGUARDS', Version('0.2.5.1-alpha')), ('EVENT_AUTHDIR_NEWDESCS', Version('0.1.1.10-alpha')), ('EVENT_BUILDTIMEOUT_SET', Version('0.2.2.7-alpha')), diff --git a/test/integ/descriptor/remote.py b/test/integ/descriptor/remote.py index 3123ef06..4e10eac2 100644 --- a/test/integ/descriptor/remote.py +++ b/test/integ/descriptor/remote.py @@ -16,6 +16,26 @@ import test.require class TestDescriptorDownloader(unittest.TestCase): @test.require.only_run_once @test.require.online + def test_compression(self): + """ + Issue a request for a plaintext descriptor. + """ + + moria1 = stem.descriptor.remote.get_authorities()['moria1'] + + descriptors = list(stem.descriptor.remote.Query( + '/tor/server/fp/%s' % moria1.fingerprint, + 'server-descriptor 1.0', + endpoints = [(moria1.address, moria1.dir_port)], + timeout = 30, + validate = True, + ).run()) + + self.assertEqual(1, len(descriptors)) + self.assertEqual('moria1', descriptors[0].nickname) + + @test.require.only_run_once + @test.require.online def test_shorthand_aliases(self): """ Quick sanity test that we can call our shorthand aliases for getting diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py index ac150d5c..3661a54b 100644 --- a/test/unit/descriptor/remote.py +++ b/test/unit/descriptor/remote.py @@ -11,6 +11,8 @@ import stem.descriptor.remote import stem.prereq import stem.util.conf
+from stem.descriptor.remote import Compression + try: # added in python 2.7 from collections import OrderedDict @@ -29,6 +31,8 @@ except ImportError:
URL_OPEN = 'urllib.request.urlopen' if stem.prereq.is_python_3() else 'urllib2.urlopen'
+TEST_RESOURCE = '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31' + # Output from requesting moria1's descriptor from itself... # % curl http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31
@@ -108,6 +112,33 @@ FALLBACK_ENTRY = b"""\
class TestDescriptorDownloader(unittest.TestCase): + def test_gzip_url_override(self): + query = stem.descriptor.remote.Query(TEST_RESOURCE, start = False) + self.assertEqual([Compression.PLAINTEXT], query.compression) + self.assertEqual(TEST_RESOURCE, query.resource) + + query = stem.descriptor.remote.Query(TEST_RESOURCE + '.z', compression = Compression.PLAINTEXT, start = False) + self.assertEqual([Compression.GZIP], query.compression) + self.assertEqual(TEST_RESOURCE, query.resource) + + def test_zstd_support_check(self): + with patch('stem.descriptor.remote.ZSTD_SUPPORTED', True): + query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.ZSTD, start = False) + self.assertEqual([Compression.ZSTD], query.compression) + + with patch('stem.descriptor.remote.ZSTD_SUPPORTED', False): + query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.ZSTD, start = False) + self.assertEqual([Compression.PLAINTEXT], query.compression) + + def test_lzma_support_check(self): + with patch('stem.descriptor.remote.LZMA_SUPPORTED', True): + query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.LZMA, start = False) + self.assertEqual([Compression.LZMA], query.compression) + + with patch('stem.descriptor.remote.LZMA_SUPPORTED', False): + query = stem.descriptor.remote.Query(TEST_RESOURCE, compression = Compression.LZMA, start = False) + self.assertEqual([Compression.PLAINTEXT], query.compression) + @patch(URL_OPEN) def test_query_download(self, urlopen_mock): """ @@ -117,13 +148,14 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
query = stem.descriptor.remote.Query( - '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', + TEST_RESOURCE, 'server-descriptor 1.0', endpoints = [('128.31.0.39', 9131)], + compression = Compression.PLAINTEXT, validate = True, )
- expeced_url = 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31' + expeced_url = 'http://128.31.0.39:9131' + TEST_RESOURCE self.assertEqual(expeced_url, query._pick_url())
descriptors = list(query) @@ -135,7 +167,7 @@ class TestDescriptorDownloader(unittest.TestCase): self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint) self.assertEqual(TEST_DESCRIPTOR.strip(), desc.get_bytes())
- urlopen_mock.assert_called_once_with(expeced_url, timeout = None) + self.assertEqual(1, urlopen_mock.call_count)
@patch(URL_OPEN) def test_query_with_malformed_content(self, urlopen_mock): @@ -147,9 +179,10 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.return_value = io.BytesIO(descriptor_content)
query = stem.descriptor.remote.Query( - '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', + TEST_RESOURCE, 'server-descriptor 1.0', endpoints = [('128.31.0.39', 9131)], + compression = Compression.PLAINTEXT, validate = True, )
@@ -171,7 +204,7 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.side_effect = socket.timeout('connection timed out')
query = stem.descriptor.remote.Query( - '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', + TEST_RESOURCE, 'server-descriptor 1.0', endpoints = [('128.31.0.39', 9131)], fall_back_to_authority = False, @@ -180,10 +213,6 @@ class TestDescriptorDownloader(unittest.TestCase): )
self.assertRaises(socket.timeout, query.run) - urlopen_mock.assert_called_with( - 'http://128.31.0.39:9131/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', - timeout = 5, - ) self.assertEqual(3, urlopen_mock.call_count)
@patch(URL_OPEN) @@ -191,9 +220,10 @@ class TestDescriptorDownloader(unittest.TestCase): urlopen_mock.return_value = io.BytesIO(TEST_DESCRIPTOR)
query = stem.descriptor.remote.Query( - '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31', + TEST_RESOURCE, 'server-descriptor 1.0', endpoints = [('128.31.0.39', 9131)], + compression = Compression.PLAINTEXT, validate = True, )