commit 04747884a33c5a0874a0143376010a4793245ec8 Author: Damian Johnson atagar@torproject.org Date: Tue Nov 12 16:50:30 2019 -0800
Outer layer creation and encryption
This takes the same approach as the inner layer. This will provide more flexibility once we propagate the attribute up to HiddenServiceDescriptorV3.content(). --- stem/descriptor/hidden_service.py | 143 ++++++++++++------------------ test/unit/descriptor/hidden_service_v3.py | 70 +++++++++++++++ 2 files changed, 126 insertions(+), 87 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index fcd29032..56ba4d22 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -883,62 +883,6 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key return '\n' + desc_signing_cert.to_base64(pem = True)
-def _get_fake_clients_bytes(): - """ - Generate fake client authorization data for the middle layer - """ - - final_bytes = b'' - num_fake_clients = 16 # default for when client auth is disabled - - for _ in range(num_fake_clients): - client_id = base64.b64encode(os.urandom(8)).rstrip(b'=') - client_iv = base64.b64encode(os.urandom(16)).rstrip(b'=') - descriptor_cookie = base64.b64encode(os.urandom(16)).rstrip(b'=') - - final_bytes += b'%s %s %s %s\n' % (b'auth-client', client_id, client_iv, descriptor_cookie) - - return final_bytes - - -def _get_middle_descriptor_layer_body(encrypted): - """ - Get the middle descriptor layer as bytes - (It's just fake client auth data since client auth is disabled) - """ - - from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey - - fake_pub_key = X25519PrivateKey.generate().public_key() - fake_pub_key_bytes = stem.util._pubkey_bytes(fake_pub_key) - fake_pub_key_bytes_b64 = base64.b64encode(fake_pub_key_bytes) - fake_clients = _get_fake_clients_bytes() - - return b'desc-auth-type x25519\n' \ - b'desc-auth-ephemeral-key %s\n' \ - b'%s' \ - b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted) - - -def _get_superencrypted_blob(intro_points, revision_counter, blinded_key, subcredential): - """ - Get the superencrypted blob (which also includes the encrypted blob) that - should be attached to the descriptor - """ - - inner_layer = InnerLayer.create(introduction_points = intro_points) - inner_ciphertext_b64 = b'encrypted\n' + inner_layer.encrypt(revision_counter, blinded_key, subcredential) - - middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64) - - # Spec mandated padding: "Before encryption the plaintext is padded with NUL - # bytes to the nearest multiple of 10k bytes." 
- - middle_descriptor_layer = middle_descriptor_layer + b'\x00' * (len(middle_descriptor_layer) % 10000) - - return b'\n' + _encrypt_layer(middle_descriptor_layer, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key) - - class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): """ Version 3 hidden service descriptor. @@ -1023,33 +967,33 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): if not blinding_param: raise ValueError('Need to provide a blinding param for this descriptor')
- # Get the identity public key - public_identity_key_bytes = stem.util._pubkey_bytes(ed25519_private_identity_key) - # Blind the identity key to get ephemeral blinded key blinded_privkey = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param) - blinded_pubkey_bytes = blinded_privkey.blinded_pubkey + blinded_key = blinded_privkey.blinded_pubkey
# Generate descriptor signing key signing_key = Ed25519PrivateKey.generate() descriptor_signing_public_key = signing_key.public_key()
# Get the main encrypted descriptor body - revision_counter_int = int(time.time()) - subcredential = HiddenServiceDescriptorV3._subcredential(public_identity_key_bytes, blinded_pubkey_bytes) - - # XXX It would be more elegant to have all the above variables attached to - # this descriptor object so that we don't have to carry them around - # functions and instead we could use e.g. self.descriptor_signing_public_key - # But because this is a @classmethod this is not possible :/ - superencrypted_blob = _get_superencrypted_blob(intro_points, revision_counter_int, blinded_pubkey_bytes, subcredential) + revision_counter = int(time.time()) + subcredential = HiddenServiceDescriptorV3._subcredential(ed25519_private_identity_key, blinded_key) + + outer_layer = OuterLayer.create( + inner_layer = InnerLayer.create( + introduction_points = intro_points, + ), + revision_counter = revision_counter, + subcredential = subcredential, + blinded_key = blinded_key, + )
desc_content = _descriptor_content(attr, exclude, ( ('hs-descriptor', '3'), ('descriptor-lifetime', '180'), ('descriptor-signing-key-cert', _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_privkey)), - ('revision-counter', str(revision_counter_int)), - ('superencrypted', superencrypted_blob), + ('revision-counter', str(revision_counter)), + ('superencrypted', b'\n' + outer_layer._encrypt(revision_counter, subcredential, blinded_key)), ), ())
# Add a final newline before the signature block @@ -1189,11 +1133,11 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): return pubkey
@staticmethod - def _subcredential(public_key, blinded_key): + def _subcredential(identity_key, blinded_key): # credential = H('credential' | public-identity-key) # subcredential = H('subcredential' | credential | blinded-public-key)
- credential = hashlib.sha3_256(b'credential%s' % public_key).digest() + credential = hashlib.sha3_256(b'credential%s' % stem.util._pubkey_bytes(identity_key)).digest() return hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
@@ -1233,6 +1177,41 @@ class OuterLayer(Descriptor): plaintext = _decrypt_layer(encrypted, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key) return OuterLayer(plaintext)
+ def _encrypt(self, revision_counter, subcredential, blinded_key): + # Spec mandated padding: "Before encryption the plaintext is padded with + # NUL bytes to the nearest multiple of 10k bytes." + + content = self.get_bytes() + b'\x00' * (len(self.get_bytes()) % 10000) + + # encrypt back into a hidden service descriptor's 'superencrypted' field + + return _encrypt_layer(content, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key) + + @classmethod + def content(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, revision_counter = None, subcredential = None, blinded_key = None): + if not stem.prereq.is_crypto_available(ed25519 = True): + raise ImportError('Hidden service layer creation requires cryptography version 2.6') + elif sign: + raise NotImplementedError('Signing of %s not implemented' % cls.__name__) + + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + inner_layer = inner_layer if inner_layer else InnerLayer.create() + revision_counter = revision_counter if revision_counter else 1 + blinded_key = blinded_key if blinded_key else stem.util._pubkey_bytes(Ed25519PrivateKey.generate()) + subcredential = subcredential if subcredential else HiddenServiceDescriptorV3._subcredential(Ed25519PrivateKey.generate(), blinded_key) + + return _descriptor_content(attr, exclude, ( + ('desc-auth-type', 'x25519'), + ('desc-auth-ephemeral-key', base64.b64encode(os.urandom(32))), + ), ( + ('encrypted', b'\n' + inner_layer._encrypt(revision_counter, subcredential, blinded_key)), + )) + + @classmethod + def create(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, revision_counter = None, subcredential = None, blinded_key = None): + return cls(cls.content(attr, exclude, validate, sign, inner_layer, revision_counter, subcredential, blinded_key), validate = validate) + def __init__(self, content, validate = False): content = content.rstrip('\x00') # strip null 
byte padding
@@ -1281,6 +1260,11 @@ class InnerLayer(Descriptor): plaintext = _decrypt_layer(outer_layer.encrypted, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key) return InnerLayer(plaintext, validate = True, outer_layer = outer_layer)
+ def _encrypt(self, revision_counter, subcredential, blinded_key): + # encrypt back into an outer layer's 'encrypted' field + + return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key) + @classmethod def content(cls, attr = None, exclude = (), sign = False, introduction_points = None): if sign: @@ -1322,21 +1306,6 @@ class InnerLayer(Descriptor): else: self._entries = entries
- def encrypt(self, revision_counter, blinded_key, subcredential): - """ - Encrypts into the content contained within the OuterLayer. - - :param int revision_counter: descriptor revision number - :param bytes blinded_key: descriptor signing key - :param bytes subcredential: public key hash - - :returns: base64 encoded content of the outer layer's 'encrypted' field - """ - - if not stem.prereq.is_crypto_available(ed25519 = True): - raise ImportError('Hidden service descriptor encryption requires cryptography version 2.6') - - return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
# TODO: drop this alias in stem 2.x
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index f6173768..73f3e2c3 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -17,6 +17,7 @@ import test.require from stem.descriptor.hidden_service import ( IntroductionPointV3, HiddenServiceDescriptorV3, + AuthorizedClient, OuterLayer, InnerLayer, ) @@ -56,6 +57,17 @@ BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk= -----END ED25519 CERT-----\ """
+EXPECTED_OUTER_LAYER = """\ +desc-auth-type foo +desc-auth-ephemeral-key bar +auth-client JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw +auth-client 1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA +encrypted +-----BEGIN MESSAGE----- +malformed block +-----END MESSAGE-----\ +""" + with open(get_resource('hidden_service_v3')) as descriptor_file: HS_DESC_STR = descriptor_file.read()
@@ -333,6 +345,64 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): ]).startswith('create2-formats 2\nintroduction-point AQAGAQEBASMp'))
@test.require.ed25519_support + def test_outer_layer_creation(self): + """ + Outer layer creation. + """ + + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + # minimal layer + + self.assertTrue(OuterLayer.content().startswith('desc-auth-type x25519\ndesc-auth-ephemeral-key ')) + self.assertEqual('x25519', OuterLayer.create().auth_type) + + # specify the parameters + + desc = OuterLayer.create({ + 'desc-auth-type': 'foo', + 'desc-auth-ephemeral-key': 'bar', + 'auth-client': [ + 'JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw', + '1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA', + ], + 'encrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', + }) + + self.assertEqual('foo', desc.auth_type) + self.assertEqual('bar', desc.ephemeral_key) + self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.encrypted) + + self.assertEqual({ + '1D8VBAh9hdM': AuthorizedClient(id = '1D8VBAh9hdM', iv = '6K/uO3sRqBp6URrKC7GB6Q', cookie = 'ElwRj5+6SN9kb8bRhiiQvA'), + 'JNil86N07AA': AuthorizedClient(id = 'JNil86N07AA', iv = 'epkaL79NtajmgME/egi8oA', cookie = 'qosYH4rXisxda3X7p9b6fw'), + }, desc.clients) + + self.assertEqual(EXPECTED_OUTER_LAYER, str(desc)) + + # create an inner layer then decrypt it + + revision_counter = 5 + blinded_key = stem.util._pubkey_bytes(Ed25519PrivateKey.generate()) + subcredential = HiddenServiceDescriptorV3._subcredential(Ed25519PrivateKey.generate(), blinded_key) + + outer_layer = OuterLayer.create( + inner_layer = InnerLayer.create( + introduction_points = [ + IntroductionPointV3.create('1.1.1.1', 9001), + ] + ), + revision_counter = revision_counter, + subcredential = subcredential, + blinded_key = blinded_key, + ) + + inner_layer = InnerLayer._decrypt(outer_layer, revision_counter, subcredential, blinded_key) + + self.assertEqual(1, len(inner_layer.introduction_points)) + self.assertEqual('1.1.1.1', 
inner_layer.introduction_points[0].link_specifiers[0].address) + + @test.require.ed25519_support def test_encode_decode_descriptor(self): """ Encode an HSv3 descriptor and then decode it and make sure you get the intended results.