commit 3498ebedccdcd59cfc9072e2f24f5c9eb0c3daae Author: Damian Johnson atagar@torproject.org Date: Sat Oct 5 15:06:27 2019 -0700
Public decryption method
After considering a few APIs I decided to keep this simple, and simply return the InnerLayer which references the OuterLayer. --- stem/descriptor/hidden_service.py | 58 ++++++++++++++++++++----------- test/unit/descriptor/hidden_service_v3.py | 36 ++++++++++++++----- 2 files changed, 64 insertions(+), 30 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 6911fa3c..e4ee13e3 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -671,8 +671,10 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): def create(cls, attr = None, exclude = (), validate = True, sign = False): return cls(cls.content(attr, exclude, sign), validate = validate, skip_crypto_validation = not sign)
- def __init__(self, raw_contents, validate = False, skip_crypto_validation = False): + def __init__(self, raw_contents, validate = False): super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate) + + self._inner_layer = None entries = _descriptor_components(raw_contents, validate)
if validate: @@ -691,39 +693,49 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): else: self._entries = entries
- # TODO: The following is only marked as private because it is a work in - # progress. This will probably become something like "body()" which decrypts - # and parses the internal descriptor content. + def decrypt(self, onion_address, validate = False): + """ + Decrypt this descriptor. Hidden service descriptors contain two encryption + layers (:class:`~stem.descriptor.hidden_service.OuterLayer` and + :class:`~stem.descriptor.hidden_service.InnerLayer`). + + :param str onion_address: hidden service address this descriptor is from + :param bool validate: perform validation checks on decrypted content + + :returns: :class:`~stem.descriptor.hidden_service.InnerLayer` with our + decrypted content + + :raises: + * **ImportError** if required cryptography or sha3 module is unavailable + * **ValueError** if unable to decrypt or validation fails + """
- def _decrypt(self, onion_address, outer_layer = False): if not stem.prereq.is_crypto_available(ed25519 = True): raise ImportError('Hidden service descriptor decryption requires cryptography version 2.6') elif not stem.prereq._is_sha3_available(): raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
- blinded_key = self.signing_cert.signing_key() - - if not blinded_key: - raise ValueError('No signing key is present') + if self._inner_layer is None: + blinded_key = self.signing_cert.signing_key()
- identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address) + if not blinded_key: + raise ValueError('No signing key is present')
- # credential = H('credential' | public-identity-key) - # subcredential = H('subcredential' | credential | blinded-public-key) + identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
- credential = hashlib.sha3_256(b'credential%s' % (identity_public_key)).digest() - subcredential = hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest() + # credential = H('credential' | public-identity-key) + # subcredential = H('subcredential' | credential | blinded-public-key)
- outter_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, blinded_key, subcredential) + credential = hashlib.sha3_256(b'credential%s' % (identity_public_key)).digest() + subcredential = hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
- if outer_layer: - return outter_layer_plaintext + outer_layer = OuterLayer(stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, blinded_key, subcredential), validate)
- inner_layer_ciphertext = OuterLayer(outter_layer_plaintext).encrypted + inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(outer_layer.encrypted, self.revision_counter, blinded_key, subcredential)
- inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(inner_layer_ciphertext, self.revision_counter, blinded_key, subcredential) + self._inner_layer = InnerLayer(inner_layer_plaintext, validate, outer_layer)
- return inner_layer_plaintext + return self._inner_layer
@staticmethod def _public_key_from_address(onion_address): @@ -805,6 +817,8 @@ class InnerLayer(Descriptor):
.. versionadded:: 1.8.0
+ :var stem.descriptor.hidden_service.OuterLayer outer: enclosing encryption layer + :var list formats: **\*** recognized CREATE2 cell formats :var list intro_auth: **\*** introduction-layer authentication types :var bool is_single_service: **\*** **True** if this is a `single onion service https://gitweb.torproject.org/torspec.git/tree/proposals/260-rend-single-onion.txt`_, **False** otherwise @@ -827,9 +841,11 @@ class InnerLayer(Descriptor): 'single-onion-service': _parse_v3_inner_single_service, }
- def __init__(self, content, validate = False): + def __init__(self, content, validate = False, outer_layer = None): super(InnerLayer, self).__init__(content, lazy_load = not validate)
+ self.outer = outer_layer + # inner layer begins with a few header fields, followed by multiple any # number of introduction-points
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 36b85f7c..093d5cb6 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -35,6 +35,15 @@ BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk= -----END ED25519 CERT-----\ """
+with open(get_resource('hidden_service_v3'), 'rb') as descriptor_file: + HS_DESC_STR = descriptor_file.read() + +with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as outer_layer_file: + OUTER_LAYER_STR = outer_layer_file.read() + +with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as inner_layer_file: + INNER_LAYER_STR = inner_layer_file.read() +
class TestHiddenServiceDescriptorV3(unittest.TestCase): def test_real_descriptor(self): @@ -54,20 +63,30 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): self.assertTrue('eaH8VdaTKS' in desc.superencrypted) self.assertEqual('aglChCQF+lbzKgyxJJTpYGVShV/GMDRJ4+cRGCp+a2y/yX/tLSh7hzqI7rVZrUoGj74Xr1CLMYO3fXYCS+DPDQ', desc.signature)
- if stem.prereq.is_crypto_available(ed25519 = True) and stem.prereq._is_sha3_available(): - with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as outer_layer_file: - self.assertEqual(outer_layer_file.read(), desc._decrypt(HS_ADDRESS, outer_layer = True)) + def test_decryption(self): + """ + Decrypt our descriptor and validate its content. + """ + + if not stem.prereq.is_crypto_available(ed25519 = True): + self.skipTest('(requires cryptography ed25519 support)') + return + elif not stem.prereq._is_sha3_available(): + self.skipTest('(requires sha3 support)') + return + + desc = HiddenServiceDescriptorV3.from_str(HS_DESC_STR) + inner_layer = desc.decrypt(HS_ADDRESS)
- with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as outer_layer_file: - self.assertEqual(outer_layer_file.read(), desc._decrypt(HS_ADDRESS, outer_layer = False)) + self.assertEqual(INNER_LAYER_STR, str(inner_layer)) + self.assertEqual(OUTER_LAYER_STR.rstrip(b'\x00'), str(inner_layer.outer))
def test_outer_layer(self): """ Parse the outer layer of our test descriptor. """
- with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as descriptor_file: - desc = OuterLayer(descriptor_file.read()) + desc = OuterLayer(OUTER_LAYER_STR)
self.assertEqual('x25519', desc.auth_type) self.assertEqual('WjZCU9sV1oxkxaPcd7/YozeZgq0lEs6DhWyrdYRNJR4=', desc.ephemeral_key) @@ -85,8 +104,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): Parse the inner layer of our test descriptor. """
- with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as descriptor_file: - desc = InnerLayer(descriptor_file.read()) + desc = InnerLayer(INNER_LAYER_STR)
self.assertEqual([2], desc.formats) self.assertEqual(['ed25519'], desc.intro_auth)