[tor-commits] [stem/master] HSv3 descriptor creation test

atagar at torproject.org atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019


commit eca1fe8073c1f43e3f9eab114bcc192f6ca0deac
Author: Damian Johnson <atagar at torproject.org>
Date:   Fri Nov 15 13:22:45 2019 -0800

    HSv3 descriptor creation test
    
    Now that we've revised HiddenServiceDescriptorV3.content() we can test it
    in a similar fashion to inner/outer layers. This provides the same coverage
    as test_encode_decode_descriptor() so that test is now redundant.
    
    Fixing a few bugs this spotted along the way and renaming the
    'public_key_from_address' functions to specify that they take identity keys
    instead (we have so many key types I lost track of what these truly did when I
    originally tacked 'em on).
---
 stem/descriptor/hidden_service.py         | 28 ++++++----
 test/unit/descriptor/hidden_service_v3.py | 88 ++++++++++++++-----------------
 2 files changed, 57 insertions(+), 59 deletions(-)

diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index dd4ab934..cdb8d645 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -957,6 +957,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
 
     blinded_key = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(identity_key, blinding_param = blinding_param)
     subcredential = HiddenServiceDescriptorV3._subcredential(identity_key, blinded_key.blinded_pubkey)
+    custom_sig = attr.pop('signature') if (attr and 'signature' in attr) else None
 
     if not outer_layer:
       outer_layer = OuterLayer.create(
@@ -983,7 +984,9 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
       ('superencrypted', b'\n' + outer_layer._encrypt(revision_counter, subcredential, blinded_key.blinded_pubkey)),
     ), ()) + b'\n'
 
-    if 'signature' not in exclude:
+    if custom_sig:
+      desc_content += b'signature %s' % custom_sig
+    elif 'signature' not in exclude:
       sig_content = stem.descriptor.certificate.SIG_PREFIX_HS_V3 + desc_content
       desc_content += b'signature %s' % base64.b64encode(signing_key.sign(sig_content)).rstrip(b'=')
 
@@ -991,7 +994,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
 
   @classmethod
   def create(cls, attr = None, exclude = (), validate = True, sign = False, inner_layer = None, outer_layer = None, identity_key = None, signing_key = None, signing_cert = None, revision_counter = None, blinding_param = None):
-    return cls(cls.content(attr, exclude, sign, inner_layer, outer_layer, identity_key, signing_key, signing_cert, revision_counter, blinding_param), validate = validate, skip_crypto_validation = not sign)
+    return cls(cls.content(attr, exclude, sign, inner_layer, outer_layer, identity_key, signing_key, signing_cert, revision_counter, blinding_param), validate = validate)
 
   def __init__(self, raw_contents, validate = False):
     super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate)
@@ -1045,7 +1048,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
       if not blinded_key:
         raise ValueError('No signing key is present')
 
-      identity_public_key = HiddenServiceDescriptorV3.public_key_from_address(onion_address)
+      identity_public_key = HiddenServiceDescriptorV3.identity_key_from_address(onion_address)
       subcredential = HiddenServiceDescriptorV3._subcredential(identity_public_key, blinded_key)
 
       outer_layer = OuterLayer._decrypt(self.superencrypted, self.revision_counter, subcredential, blinded_key)
@@ -1054,11 +1057,12 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
     return self._inner_layer
 
   @staticmethod
-  def address_from_public_key(pubkey, suffix = True):
+  def address_from_identity_key(key, suffix = True):
     """
-    Converts a hidden service public key into its address.
+    Converts a hidden service identity key into its address. This accepts all
+    key formats (private, public, or public bytes).
 
-    :param bytes pubkey: hidden service public key
+    :param Ed25519PublicKey,Ed25519PrivateKey,bytes key: hidden service identity key
     :param bool suffix: includes the '.onion' suffix if true, excluded otherwise
 
     :returns: **unicode** hidden service address
@@ -1069,20 +1073,22 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
     if not stem.prereq._is_sha3_available():
       raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
 
+    key = stem.util._pubkey_bytes(key)  # normalize key into bytes
+
     version = stem.client.datatype.Size.CHAR.pack(3)
-    checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + pubkey + version).digest()[:2]
-    onion_address = base64.b32encode(pubkey + checksum + version)
+    checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + key + version).digest()[:2]
+    onion_address = base64.b32encode(key + checksum + version)
 
     return stem.util.str_tools._to_unicode(onion_address + b'.onion' if suffix else onion_address).lower()
 
   @staticmethod
-  def public_key_from_address(onion_address):
+  def identity_key_from_address(onion_address):
     """
-    Converts a hidden service address into its public key.
+    Converts a hidden service address into its public identity key.
 
     :param str onion_address: hidden service address
 
-    :returns: **bytes** for the hidden service's public key
+    :returns: **bytes** for the hidden service's public identity key
 
     :raises:
       * **ImportError** if sha3 unsupported
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 5bfd5cc4..34108188 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -3,7 +3,6 @@ Unit tests for stem.descriptor.hidden_service for version 3.
 """
 
 import base64
-import datetime
 import functools
 import unittest
 
@@ -249,14 +248,14 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
       expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
 
   @require_sha3
-  def test_address_from_public_key(self):
-    self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_public_key(HS_PUBKEY))
+  def test_address_from_identity_key(self):
+    self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_identity_key(HS_PUBKEY))
 
   @require_sha3
-  def test_public_key_from_address(self):
-    self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.public_key_from_address(HS_ADDRESS))
-    self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.public_key_from_address, 'boom')
-    self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.public_key_from_address, '5' * 56)
+  def test_identity_key_from_address(self):
+    self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.identity_key_from_address(HS_ADDRESS))
+    self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.identity_key_from_address, 'boom')
+    self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.identity_key_from_address, '5' * 56)
 
   def test_intro_point_parse(self):
     """
@@ -430,60 +429,53 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
 
   @test.require.ed25519_support
-  def test_encode_decode_descriptor(self):
+  def test_descriptor_creation(self):
     """
-    Encode an HSv3 descriptor and then decode it and make sure you get the intended results.
-
-    This test is from the point of view of the onionbalance, so the object that
-    this test generates is the data that onionbalance also has available when
-    making onion service descriptors.
+    HiddenServiceDescriptorV3 creation.
     """
 
     if SKIP_SLOW_TESTS:
       return
 
-    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
-    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
-
-    onion_key = X25519PrivateKey.generate()
-    enc_key = X25519PrivateKey.generate()
-    auth_key = Ed25519PrivateKey.generate()
-    signing_key = Ed25519PrivateKey.generate()
+    # minimal descriptor
 
-    expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
+    self.assertTrue(HiddenServiceDescriptorV3.content().startswith('hs-descriptor 3\ndescriptor-lifetime 180\n'))
+    self.assertEqual(180, HiddenServiceDescriptorV3.create().lifetime)
 
-    # Build the service
-    private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32)
-    public_identity_key = private_identity_key.public_key()
+    # specify the parameters
 
-    onion_address = HiddenServiceDescriptorV3.address_from_public_key(stem.util._pubkey_bytes(public_identity_key))
+    desc = HiddenServiceDescriptorV3.create({
+      'hs-descriptor': '4',
+      'descriptor-lifetime': '123',
+      'descriptor-signing-key-cert': '\n-----BEGIN ED25519 CERT-----\nmalformed block\n-----END ED25519 CERT-----',
+      'revision-counter': '5',
+      'superencrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
+      'signature': 'abcde',
+    }, validate = False)
+
+    self.assertEqual(4, desc.version)
+    self.assertEqual(123, desc.lifetime)
+    self.assertEqual(None, desc.signing_cert)  # malformed cert dropped because validation is disabled
+    self.assertEqual(5, desc.revision_counter)
+    self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.superencrypted)
+    self.assertEqual('abcde', desc.signature)
 
-    intro_points = [
-      IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
-      IntroductionPointV3.create('2.2.2.2', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
-      IntroductionPointV3.create('3.3.3.3', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
-    ]
+    # include introduction points
 
-    # TODO: replace with bytes.fromhex() when we drop python 2.x support
+    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
 
-    blind_param = bytearray.fromhex('677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3')
+    identity_key = Ed25519PrivateKey.generate()
+    onion_address = HiddenServiceDescriptorV3.address_from_identity_key(identity_key)
 
-    # Build the descriptor
-    desc_string = HiddenServiceDescriptorV3.content(identity_key = private_identity_key, inner_layer = InnerLayer.create(introduction_points = intro_points), blinding_param = blind_param)
+    desc = HiddenServiceDescriptorV3.create(
+      identity_key = identity_key,
+      inner_layer = InnerLayer.create(introduction_points = [
+        IntroductionPointV3.create('1.1.1.1', 9001),
+        IntroductionPointV3.create('2.2.2.2', 9001),
+        IntroductionPointV3.create('3.3.3.3', 9001),
+      ]),
+    )
 
-    # Parse the descriptor
-    desc = HiddenServiceDescriptorV3.from_str(desc_string)
     inner_layer = desc.decrypt(onion_address)
-
     self.assertEqual(3, len(inner_layer.introduction_points))
-
-    for i, intro_point in enumerate(inner_layer.introduction_points):
-      original = intro_points[i]
-
-      self.assertEqual(original.enc_key_raw, intro_point.enc_key_raw)
-      self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw)
-      self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key)
-
-      self.assertEqual(intro_point.enc_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.enc_key())))
-      self.assertEqual(intro_point.onion_key_raw, base64.b64encode(stem.util._pubkey_bytes(intro_point.onion_key())))
-      self.assertEqual(intro_point.auth_key_cert.key, stem.util._pubkey_bytes(intro_point.auth_key()))
+    self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)





More information about the tor-commits mailing list