[tor-commits] [stem/master] Introduce encode-to-decode unittest for v3 descriptors.

atagar at torproject.org atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019


commit 6a17f072d74718c73be5fb003b98fd3d8babb02d
Author: George Kadianakis <desnacked at riseup.net>
Date:   Thu Oct 10 21:01:01 2019 +0300

    Introduce encode-to-decode unittest for v3 descriptors.
    
    This unittest encodes a v3 descriptor, then decodes it, and checks that the
    parsing went right by checking the parsed attributes (in this case the
    introduction points).
---
 stem/descriptor/hsv3_crypto.py            | 37 ++++++++++++++
 test/unit/descriptor/certificate.py       | 36 ++++++++++++++
 test/unit/descriptor/hidden_service_v3.py | 82 ++++++++++++++++++++++++++++++-
 3 files changed, 153 insertions(+), 2 deletions(-)

diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py
index a4fb7bf5..275450c0 100644
--- a/stem/descriptor/hsv3_crypto.py
+++ b/stem/descriptor/hsv3_crypto.py
@@ -11,6 +11,17 @@ from cryptography.hazmat.primitives import serialization
 import stem.descriptor.ed25519_exts_ref as ed25519_exts_ref
 import stem.descriptor.slow_ed25519 as slow_ed25519
 
+def pubkeys_are_equal(pubkey1, pubkey2):
+    """
+    Return True when both public keys serialize to identical raw bytes.
+    """
+    raw_format = dict(encoding=serialization.Encoding.Raw,
+                      format=serialization.PublicFormat.Raw)
+
+    first, second = pubkey1.public_bytes(**raw_format), pubkey2.public_bytes(**raw_format)
+
+    return first == second
+
 """
 HSv3 Key blinding
 
@@ -72,6 +83,32 @@ def get_subcredential(public_identity_key, blinded_key):
 
     return subcredential
 
+"""
+Onion address
+
+     onion_address = base32(PUBKEY | CHECKSUM | VERSION) + ".onion"
+     CHECKSUM = H(".onion checksum" | PUBKEY | VERSION)[:2]
+
+       - PUBKEY is the 32 bytes ed25519 master pubkey of the hidden service.
+       - VERSION is a one-byte version field (default value '\x03')
+       - ".onion checksum" is a constant string
+       - CHECKSUM is truncated to two bytes before inserting it in onion_address
+"""
+CHECKSUM_CONSTANT = b".onion checksum"
+
+def encode_onion_address(ed25519_pub_key_bytes):
+    """
+    Return the v3 onion address (lowercase bytes ending in b".onion") for the
+    given 32 byte ed25519 public key. Raises ValueError for other key lengths.
+    """
+    if len(ed25519_pub_key_bytes) != 32:
+        raise ValueError("ed25519 public key must be 32 bytes, got %d" % len(ed25519_pub_key_bytes))
+
+    version = b"\x03"  # VERSION is the single byte 0x03, not the ASCII digit "3"
+    checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + ed25519_pub_key_bytes + version).digest()[:2]
+    onion_address = base64.b32encode(ed25519_pub_key_bytes + checksum + version) + b".onion"
+
+    return onion_address.lower()
 
 """
 Basic descriptor logic:
diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py
index 51960525..29a06b42 100644
--- a/test/unit/descriptor/certificate.py
+++ b/test/unit/descriptor/certificate.py
@@ -15,6 +15,8 @@ import test.require
 from stem.descriptor.certificate import ED25519_SIGNATURE_LENGTH, CertType, ExtensionType, ExtensionFlag, Ed25519Certificate, Ed25519CertificateV1, Ed25519Extension
 from test.unit.descriptor import get_resource
 
+from cryptography.hazmat.primitives import serialization
+
 ED25519_CERT = """
 AQQABhtZAaW2GoBED1IjY3A6f6GNqBEl5A83fD2Za9upGke51JGqAQAgBABnprVR
 ptIr43bWPo2fIzo3uOywfoMrryprpbm4HhCkZMaO064LP+1KNuLvlc8sGG8lTjx1
@@ -193,3 +195,37 @@ class TestEd25519Certificate(unittest.TestCase):
 
     cert = Ed25519Certificate.parse(certificate())
     self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (Signature was forged or corrupt)', cert.validate, desc)
+
+  @test.require.ed25519_support
+  def test_encode_decode_certificate(self):
+    """Encode a generated certificate, parse it back, and compare attributes."""
+    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+
+    def raw_bytes(pub_key):
+      # the parsed certificate is compared against raw key bytes
+      return pub_key.public_bytes(encoding=serialization.Encoding.Raw,
+                                  format=serialization.PublicFormat.Raw)
+
+    certified_pub_key = Ed25519PrivateKey.generate().public_key()
+    signing_priv_key = Ed25519PrivateKey.generate()
+
+    my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type=CertType.HS_V3_DESC_SIGNING_KEY,
+                                                                  expiration_date=datetime.datetime(2037, 8, 28, 17, 0),
+                                                                  cert_key_type=1,
+                                                                  certified_pub_key=certified_pub_key,
+                                                                  signing_priv_key=signing_priv_key,
+                                                                  include_signing_key=True)
+
+    # check the encoded form rather than the certificate object itself
+    ed_cert_bytes = my_ed_cert.encode()
+    self.assertTrue(ed_cert_bytes)
+
+    # base64 the cert since that's what the parsing func expects
+    ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes)
+    ed_cert_parsed = stem.descriptor.certificate.Ed25519Certificate.parse(ed_cert_bytes_b64)
+
+    self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type)
+    self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date)
+    self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type)
+    self.assertEqual(ed_cert_parsed.key, raw_bytes(my_ed_cert.certified_pub_key))
+    self.assertEqual(ed_cert_parsed.get_signing_key(), raw_bytes(my_ed_cert.signing_pub_key))
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 37781b5f..b2a58fbc 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -5,12 +5,19 @@ Unit tests for stem.descriptor.hidden_service for version 3.
 import functools
 import unittest
 
+from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+from cryptography.hazmat.primitives import serialization
+
 import stem.client.datatype
 import stem.descriptor
 import stem.prereq
 
+import stem.descriptor.hsv3_crypto as hsv3_crypto
+
 from stem.descriptor.hidden_service import (
   REQUIRED_V3_FIELDS,
+  IntroductionPointV3,
   HiddenServiceDescriptorV3,
   OuterLayer,
   InnerLayer,
@@ -44,7 +51,6 @@ with open(get_resource('hidden_service_v3_outer_layer')) as outer_layer_file:
 with open(get_resource('hidden_service_v3_inner_layer')) as inner_layer_file:
   INNER_LAYER_STR = inner_layer_file.read()
 
-
 class TestHiddenServiceDescriptorV3(unittest.TestCase):
   def test_real_descriptor(self):
     """
@@ -145,8 +151,10 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
       'signature': 'signature',
     }
 
+    private_identity_key = Ed25519PrivateKey.generate()
     for line in REQUIRED_V3_FIELDS:
-      desc_text = HiddenServiceDescriptorV3.content(exclude = (line,))
+      desc_text = HiddenServiceDescriptorV3.content(exclude = (line,),
+                                                      ed25519_private_identity_key=private_identity_key)
       expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
 
   def test_invalid_version(self):
@@ -202,3 +210,73 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS))
     self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom')
     self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56)
+
+  def _helper_get_intro(self):
+    """
+    Build an IntroductionPointV3 with one fixed link specifier and freshly
+    generated onion, auth, and encryption public keys.
+    """
+
+    # every call parses the same fixed bytes; the second popped value is unused
+    specifier, _ = stem.client.datatype.LinkSpecifier.pop(b'\x03\x20CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC')
+
+    # only the public halves are kept, generated in the order onion, auth, enc
+    onion_key = X25519PrivateKey.generate().public_key()
+    auth_key = Ed25519PrivateKey.generate().public_key()
+    enc_key = X25519PrivateKey.generate().public_key()
+
+    return IntroductionPointV3(
+      [specifier],
+      onion_key=onion_key, enc_key=enc_key, auth_key=auth_key
+    )
+
+  def test_encode_decode_descriptor(self):
+    """
+    Round-trip an HSv3 descriptor: encode it, decode it, and check that the
+    parsed introduction points match the ones that were encoded.
+
+    This test is from the point of view of onionbalance, so the objects that
+    this test generates are the data that onionbalance also has available when
+    making onion service descriptors.
+    """
+    # Build the service (fixed private key bytes)
+    identity_key = Ed25519PrivateKey.from_private_bytes(b"a"*32)
+    identity_pubkey_bytes = identity_key.public_key().public_bytes(encoding=serialization.Encoding.Raw,
+                                                                   format=serialization.PublicFormat.Raw)
+
+    address = hsv3_crypto.encode_onion_address(identity_pubkey_bytes).decode()
+
+    # Build the introduction points
+    originals = [self._helper_get_intro() for _ in range(3)]
+
+    blind_param = bytes.fromhex("677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3")
+
+    # Build the descriptor
+    encoded = HiddenServiceDescriptorV3.content(ed25519_private_identity_key=identity_key,
+                                                intro_points=originals,
+                                                blinding_param=blind_param)
+
+    # Parse the descriptor
+    desc = HiddenServiceDescriptorV3.from_str(encoded.decode())
+    inner_layer = desc.decrypt(address)
+    parsed = inner_layer.introduction_points
+
+    self.assertEqual(len(parsed), 3)
+
+    # Pair every parsed introduction point with the generated one that has
+    # the same auth key, then check that the remaining keys agree too, to
+    # make sure that parsing was done right!
+    same_key = hsv3_crypto.pubkeys_are_equal
+
+    for parsed_intro in parsed:
+      matches = [
+        intro for intro in originals
+        if same_key(parsed_intro.auth_key, intro.auth_key)
+      ]
+
+      for intro in matches:
+        self.assertTrue(same_key(parsed_intro.enc_key, intro.enc_key))
+        self.assertTrue(same_key(parsed_intro.onion_key, intro.onion_key))
+
+      # Make sure we found all the intro points
+      self.assertTrue(len(matches) > 0)





More information about the tor-commits mailing list