commit 41f8a42a9e8735513b72d9ba3f6a5c11acf02943 Author: Damian Johnson atagar@torproject.org Date: Mon Nov 11 15:32:55 2019 -0800
Inner layer creation and encryption
InnerLayer creation, encryption, and test. While _get_superencrypted_blob() provided a great demo, it was limited to just introduction points. Now we'll support anything the layer does. --- stem/descriptor/__init__.py | 2 +- stem/descriptor/hidden_service.py | 41 +++++++++++++++++++++++--- test/unit/descriptor/hidden_service_v3.py | 48 +++++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 5 deletions(-)
diff --git a/stem/descriptor/__init__.py b/stem/descriptor/__init__.py index 3a3d1838..862d5aca 100644 --- a/stem/descriptor/__init__.py +++ b/stem/descriptor/__init__.py @@ -975,7 +975,7 @@ class Descriptor(object): :returns: **bytes** for the descriptor's contents """
- return self._raw_contents + return stem.util.str_tools._to_bytes(self._raw_contents)
def get_unrecognized_lines(self): """ diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 141f54b0..fcd29032 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -920,14 +920,14 @@ def _get_middle_descriptor_layer_body(encrypted): b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
-def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key, subcredential): +def _get_superencrypted_blob(intro_points, revision_counter, blinded_key, subcredential): """ Get the superencrypted blob (which also includes the encrypted blob) that should be attached to the descriptor """
- inner_descriptor_layer = stem.util.str_tools._to_bytes('create2-formats 2\n' + '\n'.join(map(IntroductionPointV3.encode, intro_points)) + '\n') - inner_ciphertext_b64 = b'encrypted\n' + _encrypt_layer(inner_descriptor_layer, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key) + inner_layer = InnerLayer.create(introduction_points = intro_points) + inner_ciphertext_b64 = b'encrypted\n' + inner_layer.encrypt(revision_counter, blinded_key, subcredential)
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
@@ -1042,7 +1042,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): # this descriptor object so that we don't have to carry them around # functions and instead we could use e.g. self.descriptor_signing_public_key # But because this is a @classmethod this is not possible :/ - superencrypted_blob = _get_superencrypted_blob(intro_points, signing_key, revision_counter_int, blinded_pubkey_bytes, subcredential) + superencrypted_blob = _get_superencrypted_blob(intro_points, revision_counter_int, blinded_pubkey_bytes, subcredential)
desc_content = _descriptor_content(attr, exclude, ( ('hs-descriptor', '3'), @@ -1281,6 +1281,24 @@ class InnerLayer(Descriptor): plaintext = _decrypt_layer(outer_layer.encrypted, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key) return InnerLayer(plaintext, validate = True, outer_layer = outer_layer)
+ @classmethod + def content(cls, attr = None, exclude = (), sign = False, introduction_points = None): + if sign: + raise NotImplementedError('Signing of %s not implemented' % cls.__name__) + + if introduction_points: + suffix = '\n' + '\n'.join(map(IntroductionPointV3.encode, introduction_points)) + else: + suffix = '' + + return _descriptor_content(attr, exclude, ( + ('create2-formats', '2'), + )) + suffix + + @classmethod + def create(cls, attr = None, exclude = (), validate = True, sign = False, introduction_points = None): + return cls(cls.content(attr, exclude, sign, introduction_points), validate = validate) + def __init__(self, content, validate = False, outer_layer = None): super(InnerLayer, self).__init__(content, lazy_load = not validate) self.outer = outer_layer @@ -1304,6 +1322,21 @@ class InnerLayer(Descriptor): else: self._entries = entries
+ def encrypt(self, revision_counter, blinded_key, subcredential): + """ + Encrypts into the content contained within the OuterLayer. + + :param int revision_counter: descriptor revision number + :param bytes blinded_key: descriptor signing key + :param bytes subcredential: public key hash + + :returns: base64 encoded content of the outer layer's 'encrypted' field + """ + + if not stem.prereq.is_crypto_available(ed25519 = True): + raise ImportError('Hidden service descriptor encryption requires cryptography version 2.6') + + return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
# TODO: drop this alias in stem 2.x
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 163ea3db..f6173768 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -28,6 +28,12 @@ from test.unit.descriptor import ( )
try: + # added in python 2.7 + from collections import OrderedDict +except ImportError: + from stem.util.ordereddict import OrderedDict + +try: # added in python 3.3 from unittest.mock import patch, Mock except ImportError: @@ -284,6 +290,48 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): reparsed = IntroductionPointV3.parse(intro_point.encode()) self.assertEqual(intro_point, reparsed)
+ def test_inner_layer_creation(self): + """ + Internal layer creation. + """ + + # minimal layer + + self.assertEqual('create2-formats 2', InnerLayer.content()) + self.assertEqual([2], InnerLayer.create().formats) + + # specify their only mandatory parameter (formats) + + self.assertEqual('create2-formats 1 2 3', InnerLayer.content({'create2-formats': '1 2 3'})) + self.assertEqual([1, 2, 3], InnerLayer.create({'create2-formats': '1 2 3'}).formats) + + # include optional parameters + + desc = InnerLayer.create(OrderedDict(( + ('intro-auth-required', 'ed25519'), + ('single-onion-service', ''), + ))) + + self.assertEqual([2], desc.formats) + self.assertEqual(['ed25519'], desc.intro_auth) + self.assertEqual(True, desc.is_single_service) + self.assertEqual([], desc.introduction_points) + + # include introduction points + + desc = InnerLayer.create(introduction_points = [ + IntroductionPointV3.create('1.1.1.1', 9001), + IntroductionPointV3.create('2.2.2.2', 9001), + IntroductionPointV3.create('3.3.3.3', 9001), + ]) + + self.assertEqual(3, len(desc.introduction_points)) + self.assertEqual('1.1.1.1', desc.introduction_points[0].link_specifiers[0].address) + + self.assertTrue(InnerLayer.content(introduction_points = [ + IntroductionPointV3.create('1.1.1.1', 9001), + ]).startswith('create2-formats 2\nintroduction-point AQAGAQEBASMp')) + @test.require.ed25519_support def test_encode_decode_descriptor(self): """
tor-commits@lists.torproject.org