[tor-commits] [stem/master] Simplified introduction point constructor

atagar at torproject.org atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019


commit 76fe67b0f377ffe340ea77b79e1055012ed7fc56
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Nov 4 15:39:25 2019 -0800

    Simplified introduction point constructor
    
    Our test's _helper_get_intro() provided a good recipe for creating
    introduction points. Productionizing this into a helper lets us provide
    a way to make these without worrying about too many details.
---
 stem/descriptor/hidden_service.py         | 164 +++++++++++++++++++++++++-----
 stem/prereq.py                            |   3 +
 test/unit/descriptor/hidden_service_v3.py | 147 ++++++++------------------
 3 files changed, 189 insertions(+), 125 deletions(-)

diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 9168123d..70ca6d25 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -41,12 +41,14 @@ import time
 
 import stem.client.datatype
 import stem.descriptor.certificate
+import stem.descriptor.hsv3_crypto
 import stem.prereq
 import stem.util.connection
 import stem.util.str_tools
 import stem.util.tor_tools
 
-from stem.descriptor import hsv3_crypto
+from stem.client.datatype import CertType
+from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519Certificate, Ed25519CertificateV1
 
 from stem.descriptor import (
   PGP_BLOCK_END,
@@ -183,7 +185,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
     onion_key = onion_key_line[5:] if onion_key_line.startswith('ntor ') else None
 
     _, block_type, auth_key_cert = entry['auth-key'][0]
-    auth_key_cert = stem.descriptor.certificate.Ed25519Certificate.from_base64(auth_key_cert)
+    auth_key_cert = Ed25519Certificate.from_base64(auth_key_cert)
 
     if block_type != 'ED25519 CERT':
       raise ValueError('Expected auth-key to have an ed25519 certificate, but was %s' % block_type)
@@ -192,7 +194,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
     enc_key = enc_key_line[5:] if enc_key_line.startswith('ntor ') else None
 
     _, block_type, enc_key_cert = entry['enc-key-cert'][0]
-    enc_key_cert = stem.descriptor.certificate.Ed25519Certificate.from_base64(enc_key_cert)
+    enc_key_cert = Ed25519Certificate.from_base64(enc_key_cert)
 
     if block_type != 'ED25519 CERT':
       raise ValueError('Expected enc-key-cert to have an ed25519 certificate, but was %s' % block_type)
@@ -202,6 +204,64 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
 
     return IntroductionPointV3(link_specifiers, onion_key, auth_key_cert, enc_key, enc_key_cert, legacy_key, legacy_key_cert)
 
+  @staticmethod
+  def create(address, port, expiration, onion_key, enc_key, auth_key, signing_key):
+    """
+    Simplified constructor. For more sophisticated use cases you can use this
+    as a template for how introduction points are properly created.
+
+    :param str address: IPv4 or IPv6 address where the service is reachable
+    :param int port: port where the service is reachable
+    :param datetime.datetime expiration: when certificates should expire
+    :param str onion_key: encoded, X25519PublicKey, or X25519PrivateKey onion key
+    :param str enc_key: encoded, X25519PublicKey, or X25519PrivateKey encryption key
+    :param str auth_key: encoded, Ed25519PublicKey, or Ed25519PrivateKey authentication key
+    :param cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey signing_key: service signing key
+
+    :returns: :class:`~stem.descriptor.hidden_service.IntroductionPointV3` with these attributes
+
+    :raises: **ValueError** if the address, port, or keys are malformed
+    """
+
+    def _key_bytes(key):
+      class_name = type(key).__name__
+
+      if isinstance(key, str):
+        return key
+      elif not class_name.endswith('PublicKey') and not class_name.endswith('PrivateKey'):
+        raise ValueError('Key must be a string or cryptographic public/private key (was %s)' % class_name)
+      elif not stem.prereq.is_crypto_available():
+        raise ImportError('Serializing keys requires the cryptography module')
+
+      from cryptography.hazmat.primitives import serialization
+
+      if class_name.endswith('PrivateKey'):
+        key = key.public_key()
+
+      return key.public_bytes(
+        encoding = serialization.Encoding.Raw,
+        format = serialization.PublicFormat.Raw,
+      )
+
+    if not stem.util.connection.is_valid_port(port):
+      raise ValueError("'%s' is an invalid port" % port)
+
+    if stem.util.connection.is_valid_ipv4_address(address):
+      link_specifiers = [stem.client.datatype.LinkByIPv4(address, port)]
+    elif stem.util.connection.is_valid_ipv6_address(address):
+      link_specifiers = [stem.client.datatype.LinkByIPv6(address, port)]
+    else:
+      raise ValueError("'%s' is not a valid IPv4 or IPv6 address" % address)
+
+    onion_key = base64.b64encode(_key_bytes(onion_key))
+    enc_key = base64.b64encode(_key_bytes(enc_key))
+
+    extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, _key_bytes(signing_key))]
+    auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, _key_bytes(auth_key), extensions, signing_key = signing_key)
+    enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, _key_bytes(auth_key), extensions, signing_key = signing_key)
+
+    return IntroductionPointV3(link_specifiers, onion_key, auth_key_cert, enc_key, enc_key_cert, None, None)
+
   def encode(self):
     """
     Descriptor representation of this introduction point.
@@ -244,7 +304,20 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
       * **EnvironmentError** if OpenSSL x25519 unsupported
     """
 
-    return IntroductionPointV3._parse_key(self.onion_key_raw)
+    return IntroductionPointV3._key_as(self.onion_key_raw, x25519 = True)
+
+  def auth_key(self):
+    """
+    Provides our authentication certificate's public key.
+
+    :returns: :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey`
+
+    :raises:
+      * **ImportError** if the required cryptography module is unavailable
+      * **EnvironmentError** if cryptography lacks ed25519 support
+    """
+
+    return IntroductionPointV3._key_as(self.auth_key_cert.key, ed25519 = True)
 
   def enc_key(self):
     """
@@ -257,7 +330,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
       * **EnvironmentError** if OpenSSL x25519 unsupported
     """
 
-    return IntroductionPointV3._parse_key(self.enc_key_raw)
+    return IntroductionPointV3._key_as(self.enc_key_raw, x25519 = True)
 
   def legacy_key(self):
     """
@@ -270,23 +343,31 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
       * **EnvironmentError** if OpenSSL x25519 unsupported
     """
 
-    return IntroductionPointV3._parse_key(self.legacy_key_raw)
+    return IntroductionPointV3._key_as(self.legacy_key_raw, x25519 = True)
 
   @staticmethod
-  def _parse_key(value):
-    if value is None:
+  def _key_as(value, x25519 = False, ed25519 = False):
+    if value is None or (not x25519 and not ed25519):
       return value
     elif not stem.prereq.is_crypto_available():
       raise ImportError('cryptography module unavailable')
-    elif not X25519_AVAILABLE:
-      # without this the cryptography raises...
-      # cryptography.exceptions.UnsupportedAlgorithm: X25519 is not supported by this version of OpenSSL.
 
-      raise EnvironmentError('OpenSSL x25519 unsupported')
+    if x25519:
+      if not X25519_AVAILABLE:
+        # without this the cryptography raises...
+        # cryptography.exceptions.UnsupportedAlgorithm: X25519 is not supported by this version of OpenSSL.
+
+        raise EnvironmentError('OpenSSL x25519 unsupported')
 
-    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey
+      from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey
+      return X25519PublicKey.from_public_bytes(base64.b64decode(value))
 
-    return X25519PublicKey.from_public_bytes(base64.b64decode(value))
+    if ed25519:
+      if not stem.prereq.is_crypto_available(ed25519 = True):
+        raise EnvironmentError('cryptography ed25519 unsupported')
+
+      from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
+      return Ed25519PublicKey.from_public_bytes(value)
 
   @staticmethod
   def _parse_link_specifiers(content):
@@ -482,7 +563,7 @@ _parse_v2_signature_line = _parse_key_block('signature', 'signature', 'SIGNATURE
 
 _parse_v3_version_line = _parse_int_line('hs-descriptor', 'version', allow_negative = False)
 _parse_lifetime_line = _parse_int_line('descriptor-lifetime', 'lifetime', allow_negative = False)
-_parse_signing_cert = stem.descriptor.certificate.Ed25519Certificate._from_descriptor('descriptor-signing-key-cert', 'signing_cert')
+_parse_signing_cert = Ed25519Certificate._from_descriptor('descriptor-signing-key-cert', 'signing_cert')
 _parse_revision_counter_line = _parse_int_line('revision-counter', 'revision_counter', allow_negative = False)
 _parse_superencrypted_line = _parse_key_block('superencrypted', 'superencrypted', 'MESSAGE')
 _parse_v3_signature_line = _parse_simple_line('signature', 'signature')
@@ -779,9 +860,9 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
   expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
 
   signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-  extensions = [stem.descriptor.certificate.Ed25519Extension(stem.descriptor.certificate.ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
+  extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
 
-  desc_signing_cert = stem.descriptor.certificate.Ed25519CertificateV1(stem.client.datatype.CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
+  desc_signing_cert = Ed25519CertificateV1(CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
 
   return '\n' + desc_signing_cert.to_base64(pem = True)
 
@@ -864,11 +945,11 @@ def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_
   """
 
   inner_descriptor_layer = stem.util.str_tools._to_bytes(_get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey))
-  inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
+  inner_ciphertext = stem.descriptor.hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
   inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
 
   middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
-  outter_ciphertext = hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
+  outter_ciphertext = stem.descriptor.hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
 
   return b64_and_wrap_desc_layer(outter_ciphertext)
 
@@ -898,6 +979,8 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
 
   **\\*** attribute is either required when we're parsed with validation or has
   a default value, others are left as **None** if undefined
+
+  .. versionadded:: 1.8.0
   """
 
   # TODO: requested this @type on https://trac.torproject.org/projects/tor/ticket/31481
@@ -973,7 +1056,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
     public_identity_key_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
 
     # Blind the identity key to get ephemeral blinded key
-    blinded_privkey = hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param)
+    blinded_privkey = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param)
     blinded_pubkey = blinded_privkey.public_key()
     blinded_pubkey_bytes = blinded_pubkey.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
 
@@ -1063,7 +1146,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
       if not blinded_key:
         raise ValueError('No signing key is present')
 
-      identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
+      identity_public_key = HiddenServiceDescriptorV3.public_key_from_address(onion_address)
       subcredential = HiddenServiceDescriptorV3._subcredential(identity_public_key, blinded_key)
 
       outer_layer = OuterLayer._decrypt(self.superencrypted, self.revision_counter, subcredential, blinded_key)
@@ -1072,8 +1155,43 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
     return self._inner_layer
 
   @staticmethod
-  def _public_key_from_address(onion_address):
-    # provides our hidden service ed25519 public key
+  def address_from_public_key(pubkey, suffix = True):
+    """
+    Converts a hidden service public key into its address.
+
+    :param bytes pubkey: hidden service public key
+    :param bool suffix: includes the '.onion' suffix if true, excluded otherwise
+
+    :returns: **unicode** hidden service address
+
+    :raises: **ImportError** if sha3 unsupported
+    """
+
+    if not stem.prereq._is_sha3_available():
+      raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
+
+    version = stem.client.datatype.Size.CHAR.pack(3)
+    checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + pubkey + version).digest()[:2]
+    onion_address = base64.b32encode(pubkey + checksum + version)
+
+    return stem.util.str_tools._to_unicode(onion_address + b'.onion' if suffix else onion_address).lower()
+
+  @staticmethod
+  def public_key_from_address(onion_address):
+    """
+    Converts a hidden service address into its public key.
+
+    :param str onion_address: hidden service address
+
+    :returns: **bytes** for the hidden service's public key
+
+    :raises:
+      * **ImportError** if sha3 unsupported
+      * **ValueError** if address malformed or checksum is invalid
+    """
+
+    if not stem.prereq._is_sha3_available():
+      raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
 
     if onion_address.endswith('.onion'):
       onion_address = onion_address[:-6]
diff --git a/stem/prereq.py b/stem/prereq.py
index 40c2f006..4af6c093 100644
--- a/stem/prereq.py
+++ b/stem/prereq.py
@@ -26,6 +26,9 @@ import inspect
 import platform
 import sys
 
+# TODO: in stem 2.x consider replacing these functions with requirement
+# annotations (like our tests)
+
 CRYPTO_UNAVAILABLE = "Unable to import the cryptography module. Because of this we'll be unable to verify descriptor signature integrity. You can get cryptography from: https://pypi.org/project/cryptography/"
 ZSTD_UNAVAILABLE = 'ZSTD compression requires the zstandard module (https://pypi.org/project/zstandard/)'
 LZMA_UNAVAILABLE = 'LZMA compression requires the lzma module (https://docs.python.org/3/library/lzma.html)'
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 14df30ec..61752e11 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -5,34 +5,28 @@ Unit tests for stem.descriptor.hidden_service for version 3.
 import base64
 import datetime
 import functools
-import hashlib
 import unittest
 
 import stem.client.datatype
 import stem.descriptor
+import stem.descriptor.hidden_service
 import stem.prereq
 
 import test.require
 
-from stem.client.datatype import CertType, LinkByIPv4
-from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519CertificateV1
+from test.unit.descriptor import (
+  get_resource,
+  base_expect_invalid_attr,
+  base_expect_invalid_attr_for_text,
+)
 
 from stem.descriptor.hidden_service import (
-  CHECKSUM_CONSTANT,
-  REQUIRED_V3_FIELDS,
-  X25519_AVAILABLE,
   IntroductionPointV3,
   HiddenServiceDescriptorV3,
   OuterLayer,
   InnerLayer,
 )
 
-from test.unit.descriptor import (
-  get_resource,
-  base_expect_invalid_attr,
-  base_expect_invalid_attr_for_text,
-)
-
 try:
   # added in python 3.3
   from unittest.mock import patch, Mock
@@ -40,12 +34,13 @@ except ImportError:
   from mock import patch, Mock
 
 require_sha3 = test.require.needs(stem.prereq._is_sha3_available, 'requires sha3')
-require_x25519 = test.require.needs(lambda: X25519_AVAILABLE, 'requires openssl x5509')
+require_x25519 = test.require.needs(lambda: stem.descriptor.hidden_service.X25519_AVAILABLE, 'requires openssl x5509')
 
 expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptorV3, 'version', 3)
 expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptorV3, 'version', 3)
 
-HS_ADDRESS = 'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion'
+HS_ADDRESS = u'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion'
+HS_PUBKEY = b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1'
 
 EXPECTED_SIGNING_CERT = """\
 -----BEGIN ED25519 CERT-----
@@ -68,58 +63,9 @@ with open(get_resource('hidden_service_v3_intro_point')) as intro_point_file:
   INTRO_POINT_STR = intro_point_file.read()
 
 
-def _pubkeys_are_equal(pubkey1, pubkey2):
-  """
-  Compare the raw bytes of the two pubkeys and return True if they are the same
-  """
-
-  from cryptography.hazmat.primitives import serialization
-
-  pubkey1_bytes = pubkey1.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-  pubkey2_bytes = pubkey2.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-
-  return pubkey1_bytes == pubkey2_bytes
-
-
-def _encode_onion_address(ed25519_pub_key_bytes):
-  """
-  Given the public key, return the onion address
-  """
-
-  if not stem.prereq._is_sha3_available():
-    raise ImportError('Encoding onion addresses requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
-
-  version = 3
-  checksum_body = b'%s%s%d' % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version)
-  checksum = hashlib.sha3_256(checksum_body).digest()[:2]
-
-  onion_address_bytes = b'%s%s%d' % (ed25519_pub_key_bytes, checksum, version)
-  onion_address = base64.b32encode(onion_address_bytes) + b'.onion'
-  assert(len(onion_address) == 56 + len('.onion'))
-
-  return onion_address.lower()
-
-
-def _helper_get_intro():
-  from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
-  from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+def key_bytes(key):
   from cryptography.hazmat.primitives import serialization
-
-  def public_key(key):
-    return key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-
-  onion_key = public_key(X25519PrivateKey.generate())
-  enc_key = public_key(X25519PrivateKey.generate())
-  auth_key = public_key(Ed25519PrivateKey.generate())
-  signing_key = Ed25519PrivateKey.generate()
-
-  expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
-  extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, public_key(signing_key))]
-
-  auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key)
-  enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key)
-
-  return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
+  return key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
 
 
 class TestHiddenServiceDescriptorV3(unittest.TestCase):
@@ -221,7 +167,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
       'signature': 'signature',
     }
 
-    for line in REQUIRED_V3_FIELDS:
+    for line in stem.descriptor.hidden_service.REQUIRED_V3_FIELDS:
       desc_text = HiddenServiceDescriptorV3.content(exclude = (line,))
       expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
 
@@ -268,11 +214,14 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
       expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
 
   @require_sha3
-  @test.require.ed25519_support
+  def test_address_from_public_key(self):
+    self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_public_key(HS_PUBKEY))
+
+  @require_sha3
   def test_public_key_from_address(self):
-    self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS))
-    self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom')
-    self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56)
+    self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.public_key_from_address(HS_ADDRESS))
+    self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.public_key_from_address, 'boom')
+    self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.public_key_from_address, '5' * 56)
 
   def test_intro_point_parse(self):
     """
@@ -304,7 +253,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     """
 
     from cryptography.hazmat.backends.openssl.x25519 import X25519PublicKey
-    from cryptography.hazmat.primitives import serialization
 
     intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]
 
@@ -314,15 +262,8 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     self.assertTrue(isinstance(intro_point.onion_key(), X25519PublicKey))
     self.assertTrue(isinstance(intro_point.enc_key(), X25519PublicKey))
 
-    self.assertEqual(intro_point.onion_key_raw, base64.b64encode(intro_point.onion_key().public_bytes(
-      encoding = serialization.Encoding.Raw,
-      format = serialization.PublicFormat.Raw,
-    )))
-
-    self.assertEqual(intro_point.enc_key_raw, base64.b64encode(intro_point.enc_key().public_bytes(
-      encoding = serialization.Encoding.Raw,
-      format = serialization.PublicFormat.Raw,
-    )))
+    self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key())))
+    self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
 
     self.assertEqual(None, intro_point.legacy_key_raw)
     self.assertEqual(None, intro_point.legacy_key())
@@ -346,21 +287,28 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     making onion service descriptors.
     """
 
-    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey, Ed25519PrivateKey
-    from cryptography.hazmat.primitives import serialization
+    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
+
+    onion_key = X25519PrivateKey.generate()
+    enc_key = X25519PrivateKey.generate()
+    auth_key = Ed25519PrivateKey.generate()
+    signing_key = Ed25519PrivateKey.generate()
+
+    expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
 
     # Build the service
     private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32)
     public_identity_key = private_identity_key.public_key()
-    pubkey_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+    pubkey_bytes = key_bytes(public_identity_key)
 
-    onion_address = _encode_onion_address(pubkey_bytes).decode()
+    onion_address = HiddenServiceDescriptorV3.address_from_public_key(pubkey_bytes)
 
-    # Build the introduction points
-    intro1 = _helper_get_intro()
-    intro2 = _helper_get_intro()
-    intro3 = _helper_get_intro()
-    intro_points = [intro1, intro2, intro3]
+    intro_points = [
+      IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
+      IntroductionPointV3.create('2.2.2.2', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
+      IntroductionPointV3.create('3.3.3.3', 9001, expiration, onion_key, enc_key, auth_key, signing_key),
+    ]
 
     # TODO: replace with bytes.fromhex() when we drop python 2.x support
 
@@ -368,7 +316,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
 
     # Build the descriptor
     desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key = private_identity_key, intro_points = intro_points, blinding_param = blind_param)
-    desc_string = desc_string.decode()
 
     # Parse the descriptor
     desc = HiddenServiceDescriptorV3.from_str(desc_string)
@@ -376,17 +323,13 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
 
     self.assertEqual(3, len(inner_layer.introduction_points))
 
-    # Match introduction points of the parsed descriptor and the generated
-    # descriptor and do some sanity checks between them to make sure that
-    # parsing was done right!
-
-    for i, desc_intro in enumerate(inner_layer.introduction_points):
-      original_intro = intro_points[i]
-
-      auth_key_1 = Ed25519PublicKey.from_public_bytes(desc_intro.auth_key_cert.key)
-      auth_key_2 = Ed25519PublicKey.from_public_bytes(original_intro.auth_key_cert.key)
+    for i, intro_point in enumerate(inner_layer.introduction_points):
+      original = intro_points[i]
 
-      self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key()))
-      self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key()))
+      self.assertEqual(original.enc_key_raw, intro_point.enc_key_raw)
+      self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw)
+      self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key)
 
-      self.assertTrue(_pubkeys_are_equal(auth_key_1, auth_key_2))
+      self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
+      self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key())))
+      self.assertEqual(intro_point.auth_key_cert.key, key_bytes(intro_point.auth_key()))





More information about the tor-commits mailing list