commit 04747884a33c5a0874a0143376010a4793245ec8
Author: Damian Johnson <atagar(a)torproject.org>
Date: Tue Nov 12 16:50:30 2019 -0800
Outer layer creation and encryption
Similar approach as the inner layer. This will provide more flexibility once we
propagate the attribute up HiddenServiceDescriptorV3.content().
---
stem/descriptor/hidden_service.py | 143 ++++++++++++------------------
test/unit/descriptor/hidden_service_v3.py | 70 +++++++++++++++
2 files changed, 126 insertions(+), 87 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index fcd29032..56ba4d22 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -883,62 +883,6 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
return '\n' + desc_signing_cert.to_base64(pem = True)
-def _get_fake_clients_bytes():
- """
- Generate fake client authorization data for the middle layer
- """
-
- final_bytes = b''
- num_fake_clients = 16 # default for when client auth is disabled
-
- for _ in range(num_fake_clients):
- client_id = base64.b64encode(os.urandom(8)).rstrip(b'=')
- client_iv = base64.b64encode(os.urandom(16)).rstrip(b'=')
- descriptor_cookie = base64.b64encode(os.urandom(16)).rstrip(b'=')
-
- final_bytes += b'%s %s %s %s\n' % (b'auth-client', client_id, client_iv, descriptor_cookie)
-
- return final_bytes
-
-
-def _get_middle_descriptor_layer_body(encrypted):
- """
- Get the middle descriptor layer as bytes
- (It's just fake client auth data since client auth is disabled)
- """
-
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
-
- fake_pub_key = X25519PrivateKey.generate().public_key()
- fake_pub_key_bytes = stem.util._pubkey_bytes(fake_pub_key)
- fake_pub_key_bytes_b64 = base64.b64encode(fake_pub_key_bytes)
- fake_clients = _get_fake_clients_bytes()
-
- return b'desc-auth-type x25519\n' \
- b'desc-auth-ephemeral-key %s\n' \
- b'%s' \
- b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
-
-
-def _get_superencrypted_blob(intro_points, revision_counter, blinded_key, subcredential):
- """
- Get the superencrypted blob (which also includes the encrypted blob) that
- should be attached to the descriptor
- """
-
- inner_layer = InnerLayer.create(introduction_points = intro_points)
- inner_ciphertext_b64 = b'encrypted\n' + inner_layer.encrypt(revision_counter, blinded_key, subcredential)
-
- middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
-
- # Spec mandated padding: "Before encryption the plaintext is padded with NUL
- # bytes to the nearest multiple of 10k bytes."
-
- middle_descriptor_layer = middle_descriptor_layer + b'\x00' * (len(middle_descriptor_layer) % 10000)
-
- return b'\n' + _encrypt_layer(middle_descriptor_layer, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key)
-
-
class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
"""
Version 3 hidden service descriptor.
@@ -1023,33 +967,33 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
if not blinding_param:
raise ValueError('Need to provide a blinding param for this descriptor')
- # Get the identity public key
- public_identity_key_bytes = stem.util._pubkey_bytes(ed25519_private_identity_key)
-
# Blind the identity key to get ephemeral blinded key
blinded_privkey = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param)
- blinded_pubkey_bytes = blinded_privkey.blinded_pubkey
+ blinded_key = blinded_privkey.blinded_pubkey
# Generate descriptor signing key
signing_key = Ed25519PrivateKey.generate()
descriptor_signing_public_key = signing_key.public_key()
# Get the main encrypted descriptor body
- revision_counter_int = int(time.time())
- subcredential = HiddenServiceDescriptorV3._subcredential(public_identity_key_bytes, blinded_pubkey_bytes)
-
- # XXX It would be more elegant to have all the above variables attached to
- # this descriptor object so that we don't have to carry them around
- # functions and instead we could use e.g. self.descriptor_signing_public_key
- # But because this is a @classmethod this is not possible :/
- superencrypted_blob = _get_superencrypted_blob(intro_points, revision_counter_int, blinded_pubkey_bytes, subcredential)
+ revision_counter = int(time.time())
+ subcredential = HiddenServiceDescriptorV3._subcredential(ed25519_private_identity_key, blinded_key)
+
+ outer_layer = OuterLayer.create(
+ inner_layer = InnerLayer.create(
+ introduction_points = intro_points,
+ ),
+ revision_counter = revision_counter,
+ subcredential = subcredential,
+ blinded_key = blinded_key,
+ )
desc_content = _descriptor_content(attr, exclude, (
('hs-descriptor', '3'),
('descriptor-lifetime', '180'),
('descriptor-signing-key-cert', _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_privkey)),
- ('revision-counter', str(revision_counter_int)),
- ('superencrypted', superencrypted_blob),
+ ('revision-counter', str(revision_counter)),
+ ('superencrypted', b'\n' + outer_layer._encrypt(revision_counter, subcredential, blinded_key)),
), ())
# Add a final newline before the signature block
@@ -1189,11 +1133,11 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
return pubkey
@staticmethod
- def _subcredential(public_key, blinded_key):
+ def _subcredential(identity_key, blinded_key):
# credential = H('credential' | public-identity-key)
# subcredential = H('subcredential' | credential | blinded-public-key)
- credential = hashlib.sha3_256(b'credential%s' % public_key).digest()
+ credential = hashlib.sha3_256(b'credential%s' % stem.util._pubkey_bytes(identity_key)).digest()
return hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
@@ -1233,6 +1177,41 @@ class OuterLayer(Descriptor):
plaintext = _decrypt_layer(encrypted, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key)
return OuterLayer(plaintext)
+ def _encrypt(self, revision_counter, subcredential, blinded_key):
+ # Spec mandated padding: "Before encryption the plaintext is padded with
+ # NUL bytes to the nearest multiple of 10k bytes."
+
+ content = self.get_bytes() + b'\x00' * (len(self.get_bytes()) % 10000)
+
+ # encrypt back into a hidden service descriptor's 'superencrypted' field
+
+ return _encrypt_layer(content, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key)
+
+ @classmethod
+ def content(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, revision_counter = None, subcredential = None, blinded_key = None):
+ if not stem.prereq.is_crypto_available(ed25519 = True):
+ raise ImportError('Hidden service layer creation requires cryptography version 2.6')
+ elif sign:
+ raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
+
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+
+ inner_layer = inner_layer if inner_layer else InnerLayer.create()
+ revision_counter = revision_counter if revision_counter else 1
+ blinded_key = blinded_key if blinded_key else stem.util._pubkey_bytes(Ed25519PrivateKey.generate())
+ subcredential = subcredential if subcredential else HiddenServiceDescriptorV3._subcredential(Ed25519PrivateKey.generate(), blinded_key)
+
+ return _descriptor_content(attr, exclude, (
+ ('desc-auth-type', 'x25519'),
+ ('desc-auth-ephemeral-key', base64.b64encode(os.urandom(32))),
+ ), (
+ ('encrypted', b'\n' + inner_layer._encrypt(revision_counter, subcredential, blinded_key)),
+ ))
+
+ @classmethod
+ def create(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, revision_counter = None, subcredential = None, blinded_key = None):
+ return cls(cls.content(attr, exclude, validate, sign, inner_layer, revision_counter, subcredential, blinded_key), validate = validate)
+
def __init__(self, content, validate = False):
content = content.rstrip('\x00') # strip null byte padding
@@ -1281,6 +1260,11 @@ class InnerLayer(Descriptor):
plaintext = _decrypt_layer(outer_layer.encrypted, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
return InnerLayer(plaintext, validate = True, outer_layer = outer_layer)
+ def _encrypt(self, revision_counter, subcredential, blinded_key):
+ # encrypt back into an outer layer's 'encrypted' field
+
+ return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
+
@classmethod
def content(cls, attr = None, exclude = (), sign = False, introduction_points = None):
if sign:
@@ -1322,21 +1306,6 @@ class InnerLayer(Descriptor):
else:
self._entries = entries
- def encrypt(self, revision_counter, blinded_key, subcredential):
- """
- Encrypts into the content contained within the OuterLayer.
-
- :param int revision_counter: descriptor revision number
- :param bytes blinded_key: descriptor signing key
- :param bytes subcredential: public key hash
-
- :returns: base64 encoded content of the outer layer's 'encrypted' field
- """
-
- if not stem.prereq.is_crypto_available(ed25519 = True):
- raise ImportError('Hidden service descriptor encryption requires cryptography version 2.6')
-
- return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
# TODO: drop this alias in stem 2.x
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index f6173768..73f3e2c3 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -17,6 +17,7 @@ import test.require
from stem.descriptor.hidden_service import (
IntroductionPointV3,
HiddenServiceDescriptorV3,
+ AuthorizedClient,
OuterLayer,
InnerLayer,
)
@@ -56,6 +57,17 @@ BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk=
-----END ED25519 CERT-----\
"""
+EXPECTED_OUTER_LAYER = """\
+desc-auth-type foo
+desc-auth-ephemeral-key bar
+auth-client JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw
+auth-client 1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA
+encrypted
+-----BEGIN MESSAGE-----
+malformed block
+-----END MESSAGE-----\
+"""
+
with open(get_resource('hidden_service_v3')) as descriptor_file:
HS_DESC_STR = descriptor_file.read()
@@ -333,6 +345,64 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
]).startswith('create2-formats 2\nintroduction-point AQAGAQEBASMp'))
@test.require.ed25519_support
+ def test_outer_layer_creation(self):
+ """
+ Outer layer creation.
+ """
+
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+
+ # minimal layer
+
+ self.assertTrue(OuterLayer.content().startswith('desc-auth-type x25519\ndesc-auth-ephemeral-key '))
+ self.assertEqual('x25519', OuterLayer.create().auth_type)
+
+ # specify the parameters
+
+ desc = OuterLayer.create({
+ 'desc-auth-type': 'foo',
+ 'desc-auth-ephemeral-key': 'bar',
+ 'auth-client': [
+ 'JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw',
+ '1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA',
+ ],
+ 'encrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
+ })
+
+ self.assertEqual('foo', desc.auth_type)
+ self.assertEqual('bar', desc.ephemeral_key)
+ self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.encrypted)
+
+ self.assertEqual({
+ '1D8VBAh9hdM': AuthorizedClient(id = '1D8VBAh9hdM', iv = '6K/uO3sRqBp6URrKC7GB6Q', cookie = 'ElwRj5+6SN9kb8bRhiiQvA'),
+ 'JNil86N07AA': AuthorizedClient(id = 'JNil86N07AA', iv = 'epkaL79NtajmgME/egi8oA', cookie = 'qosYH4rXisxda3X7p9b6fw'),
+ }, desc.clients)
+
+ self.assertEqual(EXPECTED_OUTER_LAYER, str(desc))
+
+ # create an inner layer then decrypt it
+
+ revision_counter = 5
+ blinded_key = stem.util._pubkey_bytes(Ed25519PrivateKey.generate())
+ subcredential = HiddenServiceDescriptorV3._subcredential(Ed25519PrivateKey.generate(), blinded_key)
+
+ outer_layer = OuterLayer.create(
+ inner_layer = InnerLayer.create(
+ introduction_points = [
+ IntroductionPointV3.create('1.1.1.1', 9001),
+ ]
+ ),
+ revision_counter = revision_counter,
+ subcredential = subcredential,
+ blinded_key = blinded_key,
+ )
+
+ inner_layer = InnerLayer._decrypt(outer_layer, revision_counter, subcredential, blinded_key)
+
+ self.assertEqual(1, len(inner_layer.introduction_points))
+ self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
+
+ @test.require.ed25519_support
def test_encode_decode_descriptor(self):
"""
Encode an HSv3 descriptor and then decode it and make sure you get the intended results.