commit 6a17f072d74718c73be5fb003b98fd3d8babb02d Author: George Kadianakis desnacked@riseup.net Date: Thu Oct 10 21:01:01 2019 +0300
Introduce encode-to-decode unittest for v3 descriptors.
This unit test encodes a v3 descriptor, decodes it, and verifies that parsing succeeded by checking the parsed attributes (in this case, the introduction points). --- stem/descriptor/hsv3_crypto.py | 37 ++++++++++++++ test/unit/descriptor/certificate.py | 36 ++++++++++++++ test/unit/descriptor/hidden_service_v3.py | 82 ++++++++++++++++++++++++++++++- 3 files changed, 153 insertions(+), 2 deletions(-)
diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py index a4fb7bf5..275450c0 100644 --- a/stem/descriptor/hsv3_crypto.py +++ b/stem/descriptor/hsv3_crypto.py @@ -11,6 +11,17 @@ from cryptography.hazmat.primitives import serialization import stem.descriptor.ed25519_exts_ref as ed25519_exts_ref import stem.descriptor.slow_ed25519 as slow_ed25519
+def pubkeys_are_equal(pubkey1, pubkey2): + """ + Compare the raw bytes of the two pubkeys and return True if they are the same + """ + pubkey1_bytes = pubkey1.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw) + pubkey2_bytes = pubkey2.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw) + + return pubkey1_bytes == pubkey2_bytes + """ HSv3 Key blinding
@@ -72,6 +83,32 @@ def get_subcredential(public_identity_key, blinded_key):
return subcredential
+""" +Onion address + + onion_address = base32(PUBKEY | CHECKSUM | VERSION) + ".onion" + CHECKSUM = H(".onion checksum" | PUBKEY | VERSION)[:2] + + - PUBKEY is the 32-byte ed25519 master pubkey of the hidden service. + - VERSION is a one-byte version field (default value '\x03') + - ".onion checksum" is a constant string + - CHECKSUM is truncated to two bytes before inserting it in onion_address +""" +CHECKSUM_CONSTANT = b".onion checksum" + +def encode_onion_address(ed25519_pub_key_bytes): + """ + Given the public key, return the onion address + """ + version = 3 + checksum_body = b"%s%s%d" % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version) + checksum = hashlib.sha3_256(checksum_body).digest()[:2] + + onion_address_bytes = b"%s%s%d" % (ed25519_pub_key_bytes, checksum, version) + onion_address = base64.b32encode(onion_address_bytes) + b".onion" + assert(len(onion_address) == 56 + len(".onion")) + + return onion_address.lower()
""" Basic descriptor logic: diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py index 51960525..29a06b42 100644 --- a/test/unit/descriptor/certificate.py +++ b/test/unit/descriptor/certificate.py @@ -15,6 +15,8 @@ import test.require from stem.descriptor.certificate import ED25519_SIGNATURE_LENGTH, CertType, ExtensionType, ExtensionFlag, Ed25519Certificate, Ed25519CertificateV1, Ed25519Extension from test.unit.descriptor import get_resource
+from cryptography.hazmat.primitives import serialization + ED25519_CERT = """ AQQABhtZAaW2GoBED1IjY3A6f6GNqBEl5A83fD2Za9upGke51JGqAQAgBABnprVR ptIr43bWPo2fIzo3uOywfoMrryprpbm4HhCkZMaO064LP+1KNuLvlc8sGG8lTjx1 @@ -193,3 +195,37 @@ class TestEd25519Certificate(unittest.TestCase):
cert = Ed25519Certificate.parse(certificate()) self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (Signature was forged or corrupt)', cert.validate, desc) + + @test.require.ed25519_support + def test_encode_decode_certificate(self): + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + + certified_priv_key = Ed25519PrivateKey.generate() + certified_pub_key = certified_priv_key.public_key() + + signing_priv_key = Ed25519PrivateKey.generate() + + expiration_date = datetime.datetime(2037, 8, 28, 17, 0) + + my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type=CertType.HS_V3_DESC_SIGNING_KEY, + expiration_date=expiration_date, + cert_key_type=1, + certified_pub_key=certified_pub_key, + signing_priv_key=signing_priv_key, + include_signing_key=True) + + ed_cert_bytes = my_ed_cert.encode() + self.assertTrue(my_ed_cert) + + # base64 the cert since that's what the parsing func expects + ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes) + + ed_cert_parsed = stem.descriptor.certificate.Ed25519Certificate.parse(ed_cert_bytes_b64) + + self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type) + self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date) + self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type) + self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw)) + self.assertEqual(ed_cert_parsed.get_signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw)) diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 37781b5f..b2a58fbc 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -5,12 +5,19 @@ Unit tests for stem.descriptor.hidden_service for version 3. import functools import unittest
+from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey +from cryptography.hazmat.primitives import serialization + import stem.client.datatype import stem.descriptor import stem.prereq
+import stem.descriptor.hsv3_crypto as hsv3_crypto + from stem.descriptor.hidden_service import ( REQUIRED_V3_FIELDS, + IntroductionPointV3, HiddenServiceDescriptorV3, OuterLayer, InnerLayer, @@ -44,7 +51,6 @@ with open(get_resource('hidden_service_v3_outer_layer')) as outer_layer_file: with open(get_resource('hidden_service_v3_inner_layer')) as inner_layer_file: INNER_LAYER_STR = inner_layer_file.read()
- class TestHiddenServiceDescriptorV3(unittest.TestCase): def test_real_descriptor(self): """ @@ -145,8 +151,10 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): 'signature': 'signature', }
+ private_identity_key = Ed25519PrivateKey.generate() for line in REQUIRED_V3_FIELDS: - desc_text = HiddenServiceDescriptorV3.content(exclude = (line,)) + desc_text = HiddenServiceDescriptorV3.content(exclude = (line,), + ed25519_private_identity_key=private_identity_key) expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
def test_invalid_version(self): @@ -202,3 +210,73 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS)) self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom') self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56) + + def _helper_get_intro(self): + link_specifiers = [] + + link1, _ = stem.client.datatype.LinkSpecifier.pop(b'\x03\x20CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC') + link_specifiers.append(link1) + + onion_privkey = X25519PrivateKey.generate() + onion_pubkey = onion_privkey.public_key() + + auth_privkey = Ed25519PrivateKey.generate() + auth_pubkey = auth_privkey.public_key() + + enc_privkey = X25519PrivateKey.generate() + enc_pubkey = enc_privkey.public_key() + + intro = IntroductionPointV3(link_specifiers, onion_key=onion_pubkey, enc_key=enc_pubkey, auth_key=auth_pubkey) + + return intro + + def test_encode_decode_descriptor(self): + """ + Encode an HSv3 descriptor and then decode it and make sure you get the intended results. + + This test is from the point of view of the onionbalance, so the object that + this test generates is the data that onionbalance also has available when + making onion service descriptors. 
+ """ + # Build the service + private_identity_key = Ed25519PrivateKey.from_private_bytes(b"a"*32) + public_identity_key = private_identity_key.public_key() + pubkey_bytes = public_identity_key.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw) + + onion_address = hsv3_crypto.encode_onion_address(pubkey_bytes).decode() + + # Build the introduction points + intro1 = self._helper_get_intro() + intro2 = self._helper_get_intro() + intro3 = self._helper_get_intro() + intro_points = [intro1, intro2, intro3] + + blind_param = bytes.fromhex("677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3") + + # Build the descriptor + desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key=private_identity_key, + intro_points=intro_points, + blinding_param=blind_param) + desc_string = desc_string.decode() + + # Parse the descriptor + desc = HiddenServiceDescriptorV3.from_str(desc_string) + inner_layer = desc.decrypt(onion_address) + + self.assertEqual(len(inner_layer.introduction_points), 3) + + # Match introduction points of the parsed descriptor and the generated + # descriptor and do some sanity checks between them to make sure that + # parsing was done right! + for desc_intro in inner_layer.introduction_points: + original_found = False # Make sure we found all the intro points + + for original_intro in intro_points: + # Match intro points + if hsv3_crypto.pubkeys_are_equal(desc_intro.auth_key, original_intro.auth_key): + original_found = True + self.assertTrue(hsv3_crypto.pubkeys_are_equal(desc_intro.enc_key, original_intro.enc_key)) + self.assertTrue(hsv3_crypto.pubkeys_are_equal(desc_intro.onion_key, original_intro.onion_key)) + + self.assertTrue(original_found)