commit eca1fe8073c1f43e3f9eab114bcc192f6ca0deac Author: Damian Johnson atagar@torproject.org Date: Fri Nov 15 13:22:45 2019 -0800
HSv3 descriptor creation test
Now that we've revised HiddenServiceDescriptorV3.content() we can test it in a similar fashion to inner/outer layers. This provides the same coverage as test_encode_decode_descriptor() so that test is now redundant.
Fixing a few bugs this spotted along the way and renaming the 'public_key_from_address' functions to specify that they take identity keys instead (we have so many key types I lost track of what these truly did when I originally tacked 'em on). --- stem/descriptor/hidden_service.py | 28 ++++++---- test/unit/descriptor/hidden_service_v3.py | 88 ++++++++++++++----------------- 2 files changed, 57 insertions(+), 59 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index dd4ab934..cdb8d645 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -957,6 +957,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
blinded_key = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(identity_key, blinding_param = blinding_param) subcredential = HiddenServiceDescriptorV3._subcredential(identity_key, blinded_key.blinded_pubkey) + custom_sig = attr.pop('signature') if (attr and 'signature' in attr) else None
if not outer_layer: outer_layer = OuterLayer.create( @@ -983,7 +984,9 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): ('superencrypted', b'\n' + outer_layer._encrypt(revision_counter, subcredential, blinded_key.blinded_pubkey)), ), ()) + b'\n'
- if 'signature' not in exclude: + if custom_sig: + desc_content += b'signature %s' % custom_sig + elif 'signature' not in exclude: sig_content = stem.descriptor.certificate.SIG_PREFIX_HS_V3 + desc_content desc_content += b'signature %s' % base64.b64encode(signing_key.sign(sig_content)).rstrip(b'=')
@@ -991,7 +994,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
@classmethod def create(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, outer_layer = None, identity_key = None, signing_key = None, signing_cert = None, revision_counter = None, blinding_param = None): - return cls(cls.content(attr, exclude, sign, inner_layer, outer_layer, identity_key, signing_key, signing_cert, revision_counter, blinding_param), validate = validate, skip_crypto_validation = not sign) + return cls(cls.content(attr, exclude, sign, inner_layer, outer_layer, identity_key, signing_key, signing_cert, revision_counter, blinding_param), validate = validate)
def __init__(self, raw_contents, validate = False): super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate) @@ -1045,7 +1048,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): if not blinded_key: raise ValueError('No signing key is present')
- identity_public_key = HiddenServiceDescriptorV3.public_key_from_address(onion_address) + identity_public_key = HiddenServiceDescriptorV3.identity_key_from_address(onion_address) subcredential = HiddenServiceDescriptorV3._subcredential(identity_public_key, blinded_key)
outer_layer = OuterLayer._decrypt(self.superencrypted, self.revision_counter, subcredential, blinded_key) @@ -1054,11 +1057,12 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): return self._inner_layer
@staticmethod - def address_from_public_key(pubkey, suffix = True): + def address_from_identity_key(key, suffix = True): """ - Converts a hidden service public key into its address. + Converts a hidden service identity key into its address. This accepts all + key formats (private, public, or public bytes).
- :param bytes pubkey: hidden service public key + :param Ed25519PublicKey,Ed25519PrivateKey,bytes key: hidden service identity key :param bool suffix: includes the '.onion' suffix if true, excluded otherwise
:returns: **unicode** hidden service address @@ -1069,20 +1073,22 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): if not stem.prereq._is_sha3_available(): raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
+ key = stem.util._pubkey_bytes(key) # normalize key into bytes + version = stem.client.datatype.Size.CHAR.pack(3) - checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + pubkey + version).digest()[:2] - onion_address = base64.b32encode(pubkey + checksum + version) + checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + key + version).digest()[:2] + onion_address = base64.b32encode(key + checksum + version)
return stem.util.str_tools._to_unicode(onion_address + b'.onion' if suffix else onion_address).lower()
@staticmethod - def public_key_from_address(onion_address): + def identity_key_from_address(onion_address): """ - Converts a hidden service address into its public key. + Converts a hidden service address into its public identity key.
:param str onion_address: hidden service address
- :returns: **bytes** for the hidden service's public key + :returns: **bytes** for the hidden service's public identity key
:raises: * **ImportError** if sha3 unsupported diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 5bfd5cc4..34108188 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -3,7 +3,6 @@ Unit tests for stem.descriptor.hidden_service for version 3. """
import base64 -import datetime import functools import unittest
@@ -249,14 +248,14 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
@require_sha3 - def test_address_from_public_key(self): - self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_public_key(HS_PUBKEY)) + def test_address_from_identity_key(self): + self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_identity_key(HS_PUBKEY))
@require_sha3 - def test_public_key_from_address(self): - self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.public_key_from_address(HS_ADDRESS)) - self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.public_key_from_address, 'boom') - self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.public_key_from_address, '5' * 56) + def test_identity_key_from_address(self): + self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.identity_key_from_address(HS_ADDRESS)) + self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.identity_key_from_address, 'boom') + self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.identity_key_from_address, '5' * 56)
def test_intro_point_parse(self): """ @@ -430,60 +429,53 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
@test.require.ed25519_support - def test_encode_decode_descriptor(self): + def test_descriptor_creation(self): """ - Encode an HSv3 descriptor and then decode it and make sure you get the intended results. - - This test is from the point of view of the onionbalance, so the object that - this test generates is the data that onionbalance also has available when - making onion service descriptors. + HiddenServiceDescriptorV3 creation. """
if SKIP_SLOW_TESTS: return
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey - from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey - - onion_key = X25519PrivateKey.generate() - enc_key = X25519PrivateKey.generate() - auth_key = Ed25519PrivateKey.generate() - signing_key = Ed25519PrivateKey.generate() + # minimal descriptor
- expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54) + self.assertTrue(HiddenServiceDescriptorV3.content().startswith('hs-descriptor 3\ndescriptor-lifetime 180\n')) + self.assertEqual(180, HiddenServiceDescriptorV3.create().lifetime)
- # Build the service - private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32) - public_identity_key = private_identity_key.public_key() + # specify the parameters
- onion_address = HiddenServiceDescriptorV3.address_from_public_key(stem.util._pubkey_bytes(public_identity_key)) + desc = HiddenServiceDescriptorV3.create({ + 'hs-descriptor': '4', + 'descriptor-lifetime': '123', + 'descriptor-signing-key-cert': '\n-----BEGIN ED25519 CERT-----\nmalformed block\n-----END ED25519 CERT-----', + 'revision-counter': '5', + 'superencrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', + 'signature': 'abcde', + }, validate = False) + + self.assertEqual(4, desc.version) + self.assertEqual(123, desc.lifetime) + self.assertEqual(None, desc.signing_cert) # malformed cert dropped because validation is disabled + self.assertEqual(5, desc.revision_counter) + self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.superencrypted) + self.assertEqual('abcde', desc.signature)
- intro_points = [ - IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key), - IntroductionPointV3.create('2.2.2.2', 9001, expiration, onion_key, enc_key, auth_key, signing_key), - IntroductionPointV3.create('3.3.3.3', 9001, expiration, onion_key, enc_key, auth_key, signing_key), - ] + # include introduction points
- # TODO: replace with bytes.fromhex() when we drop python 2.x support + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
- blind_param = bytearray.fromhex('677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3') + identity_key = Ed25519PrivateKey.generate() + onion_address = HiddenServiceDescriptorV3.address_from_identity_key(identity_key)
- # Build the descriptor - desc_string = HiddenServiceDescriptorV3.content(identity_key = private_identity_key, inner_layer = InnerLayer.create(introduction_points = intro_points), blinding_param = blind_param) + desc = HiddenServiceDescriptorV3.create( + identity_key = identity_key, + inner_layer = InnerLayer.create(introduction_points = [ + IntroductionPointV3.create('1.1.1.1', 9001), + IntroductionPointV3.create('2.2.2.2', 9001), + IntroductionPointV3.create('3.3.3.3', 9001), + ]), + )
- # Parse the descriptor - desc = HiddenServiceDescriptorV3.from_str(desc_string) inner_layer = desc.decrypt(onion_address) - self.assertEqual(3, len(inner_layer.introduction_points)) - - for i, intro_point in enumerate(inner_layer.introduction_points): - original = intro_points[i] - - self.assertEqual(original.enc_key_raw, intro_point.enc_key_raw) - self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw) - self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key) - - self.assertEqual(intro_point.enc_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.enc_key()))) - self.assertEqual(intro_point.onion_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.onion_key()))) - self.assertEqual(intro_point.auth_key_cert.key, stem.util._pubkey_bytes(intro_point.auth_key())) + self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)