commit 76fe67b0f377ffe340ea77b79e1055012ed7fc56 Author: Damian Johnson atagar@torproject.org Date: Mon Nov 4 15:39:25 2019 -0800
Simplified introduction point constructor
Our test's _helper_get_intro() provided a good recipie for creating introduction points. Productionizing this into a helper we can provide to make these without worrying about too many details. --- stem/descriptor/hidden_service.py | 164 +++++++++++++++++++++++++----- stem/prereq.py | 3 + test/unit/descriptor/hidden_service_v3.py | 147 ++++++++------------------ 3 files changed, 189 insertions(+), 125 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 9168123d..70ca6d25 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -41,12 +41,14 @@ import time
import stem.client.datatype import stem.descriptor.certificate +import stem.descriptor.hsv3_crypto import stem.prereq import stem.util.connection import stem.util.str_tools import stem.util.tor_tools
-from stem.descriptor import hsv3_crypto +from stem.client.datatype import CertType +from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519Certificate, Ed25519CertificateV1
from stem.descriptor import ( PGP_BLOCK_END, @@ -183,7 +185,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s onion_key = onion_key_line[5:] if onion_key_line.startswith('ntor ') else None
_, block_type, auth_key_cert = entry['auth-key'][0] - auth_key_cert = stem.descriptor.certificate.Ed25519Certificate.from_base64(auth_key_cert) + auth_key_cert = Ed25519Certificate.from_base64(auth_key_cert)
if block_type != 'ED25519 CERT': raise ValueError('Expected auth-key to have an ed25519 certificate, but was %s' % block_type) @@ -192,7 +194,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s enc_key = enc_key_line[5:] if enc_key_line.startswith('ntor ') else None
_, block_type, enc_key_cert = entry['enc-key-cert'][0] - enc_key_cert = stem.descriptor.certificate.Ed25519Certificate.from_base64(enc_key_cert) + enc_key_cert = Ed25519Certificate.from_base64(enc_key_cert)
if block_type != 'ED25519 CERT': raise ValueError('Expected enc-key-cert to have an ed25519 certificate, but was %s' % block_type) @@ -202,6 +204,64 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
return IntroductionPointV3(link_specifiers, onion_key, auth_key_cert, enc_key, enc_key_cert, legacy_key, legacy_key_cert)
+ @staticmethod + def create(address, port, expiration, onion_key, enc_key, auth_key, signing_key): + """ + Simplified constructor. For more sophisticated use cases you can use this + as a template for how introduction points are properly created. + + :param str address: IPv4 or IPv6 address where the service is reachable + :param int port: port where the service is reachable + :param datetime.datetime expiration: when certificates should expire + :param str onion_key: encoded, X25519PublicKey, or X25519PrivateKey onion key + :param str enc_key: encoded, X25519PublicKey, or X25519PrivateKey encryption key + :param str auth_key: encoded, Ed25519PublicKey, or Ed25519PrivateKey authentication key + :param cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey signing_key: service signing key + + :returns: :class:`~stem.descriptor.hidden_service.IntroductionPointV3` with these attributes + + :raises: **ValueError** if the address, port, or keys are malformed + """ + + def _key_bytes(key): + class_name = type(key).__name__ + + if isinstance(key, str): + return key + elif not class_name.endswith('PublicKey') and not class_name.endswith('PrivateKey'): + raise ValueError('Key must be a string or cryptographic public/private key (was %s)' % class_name) + elif not stem.prereq.is_crypto_available(): + raise ImportError('Serializing keys requires the cryptography module') + + from cryptography.hazmat.primitives import serialization + + if class_name.endswith('PrivateKey'): + key = key.public_key() + + return key.public_bytes( + encoding = serialization.Encoding.Raw, + format = serialization.PublicFormat.Raw, + ) + + if not stem.util.connection.is_valid_port(port): + raise ValueError("'%s' is an invalid port" % port) + + if stem.util.connection.is_valid_ipv4_address(address): + link_specifiers = [stem.client.datatype.LinkByIPv4(address, port)] + elif stem.util.connection.is_valid_ipv6_address(address): + link_specifiers = [stem.client.datatype.LinkByIPv6(address, port)] + else: + raise ValueError("'%s' is not a valid IPv4 or IPv6 address" % address) + + onion_key = base64.b64encode(_key_bytes(onion_key)) + enc_key = base64.b64encode(_key_bytes(enc_key)) + + extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, _key_bytes(signing_key))] + auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, _key_bytes(auth_key), extensions, signing_key = signing_key) + enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, _key_bytes(auth_key), extensions, signing_key = signing_key) + + return IntroductionPointV3(link_specifiers, onion_key, auth_key_cert, enc_key, enc_key_cert, None, None) + def encode(self): """ Descriptor representation of this introduction point. @@ -244,7 +304,20 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s * **EnvironmentError** if OpenSSL x25519 unsupported """
- return IntroductionPointV3._parse_key(self.onion_key_raw) + return IntroductionPointV3._key_as(self.onion_key_raw, x25519 = True) + + def auth_key(self): + """ + Provides our authentication certificate's public key. + + :returns: :class:`~cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey` + + :raises: + * **ImportError** if required the cryptography module is unavailable + * **EnvironmentError** if OpenSSL x25519 unsupported + """ + + return IntroductionPointV3._key_as(self.auth_key_cert.key, ed25519 = True)
def enc_key(self): """ @@ -257,7 +330,7 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s * **EnvironmentError** if OpenSSL x25519 unsupported """
- return IntroductionPointV3._parse_key(self.enc_key_raw) + return IntroductionPointV3._key_as(self.enc_key_raw, x25519 = True)
def legacy_key(self): """ @@ -270,23 +343,31 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s * **EnvironmentError** if OpenSSL x25519 unsupported """
- return IntroductionPointV3._parse_key(self.legacy_key_raw) + return IntroductionPointV3._key_as(self.legacy_key_raw, x25519 = True)
@staticmethod - def _parse_key(value): - if value is None: + def _key_as(value, x25519 = False, ed25519 = False): + if value is None or (not x25519 and not ed25519): return value elif not stem.prereq.is_crypto_available(): raise ImportError('cryptography module unavailable') - elif not X25519_AVAILABLE: - # without this the cryptography raises... - # cryptography.exceptions.UnsupportedAlgorithm: X25519 is not supported by this version of OpenSSL.
- raise EnvironmentError('OpenSSL x25519 unsupported') + if x25519: + if not X25519_AVAILABLE: + # without this the cryptography raises... + # cryptography.exceptions.UnsupportedAlgorithm: X25519 is not supported by this version of OpenSSL. + + raise EnvironmentError('OpenSSL x25519 unsupported')
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey + from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey + return X25519PublicKey.from_public_bytes(base64.b64decode(value))
- return X25519PublicKey.from_public_bytes(base64.b64decode(value)) + if ed25519: + if not stem.prereq.is_crypto_available(ed25519 = True): + raise EnvironmentError('cryptography ed25519 unsupported') + + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + return Ed25519PublicKey.from_public_bytes(value)
@staticmethod def _parse_link_specifiers(content): @@ -482,7 +563,7 @@ _parse_v2_signature_line = _parse_key_block('signature', 'signature', 'SIGNATURE
_parse_v3_version_line = _parse_int_line('hs-descriptor', 'version', allow_negative = False) _parse_lifetime_line = _parse_int_line('descriptor-lifetime', 'lifetime', allow_negative = False) -_parse_signing_cert = stem.descriptor.certificate.Ed25519Certificate._from_descriptor('descriptor-signing-key-cert', 'signing_cert') +_parse_signing_cert = Ed25519Certificate._from_descriptor('descriptor-signing-key-cert', 'signing_cert') _parse_revision_counter_line = _parse_int_line('revision-counter', 'revision_counter', allow_negative = False) _parse_superencrypted_line = _parse_key_block('superencrypted', 'superencrypted', 'MESSAGE') _parse_v3_signature_line = _parse_simple_line('signature', 'signature') @@ -779,9 +860,9 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - extensions = [stem.descriptor.certificate.Ed25519Extension(stem.descriptor.certificate.ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))] + extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
- desc_signing_cert = stem.descriptor.certificate.Ed25519CertificateV1(stem.client.datatype.CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key) + desc_signing_cert = Ed25519CertificateV1(CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
return '\n' + desc_signing_cert.to_base64(pem = True)
@@ -864,11 +945,11 @@ def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_ """
inner_descriptor_layer = stem.util.str_tools._to_bytes(_get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey)) - inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) + inner_ciphertext = stem.descriptor.hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64) - outter_ciphertext = hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) + outter_ciphertext = stem.descriptor.hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
return b64_and_wrap_desc_layer(outter_ciphertext)
@@ -898,6 +979,8 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
**\*** attribute is either required when we're parsed with validation or has a default value, others are left as **None** if undefined + + .. versionadded:: 1.8.0 """
# TODO: requested this @type on https://trac.torproject.org/projects/tor/ticket/31481 @@ -973,7 +1056,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): public_identity_key_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
# Blind the identity key to get ephemeral blinded key - blinded_privkey = hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param) + blinded_privkey = stem.descriptor.hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param) blinded_pubkey = blinded_privkey.public_key() blinded_pubkey_bytes = blinded_pubkey.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
@@ -1063,7 +1146,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): if not blinded_key: raise ValueError('No signing key is present')
- identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address) + identity_public_key = HiddenServiceDescriptorV3.public_key_from_address(onion_address) subcredential = HiddenServiceDescriptorV3._subcredential(identity_public_key, blinded_key)
outer_layer = OuterLayer._decrypt(self.superencrypted, self.revision_counter, subcredential, blinded_key) @@ -1072,8 +1155,43 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): return self._inner_layer
@staticmethod - def _public_key_from_address(onion_address): - # provides our hidden service ed25519 public key + def address_from_public_key(pubkey, suffix = True): + """ + Converts a hidden service public key into its address. + + :param bytes pubkey: hidden service public key + :param bool suffix: includes the '.onion' suffix if true, excluded otherwise + + :returns: **unicode** hidden service address + + :raises: **ImportError** if sha3 unsupported + """ + + if not stem.prereq._is_sha3_available(): + raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)') + + version = stem.client.datatype.Size.CHAR.pack(3) + checksum = hashlib.sha3_256(CHECKSUM_CONSTANT + pubkey + version).digest()[:2] + onion_address = base64.b32encode(pubkey + checksum + version) + + return stem.util.str_tools._to_unicode(onion_address + b'.onion' if suffix else onion_address).lower() + + @staticmethod + def public_key_from_address(onion_address): + """ + Converts a hidden service address into its public key. + + :param str onion_address: hidden service address + + :returns: **bytes** for the hidden service's public key + + :raises: + * **ImportError** if sha3 unsupported + * **ValueError** if address malformed or checksum is invalid + """ + + if not stem.prereq._is_sha3_available(): + raise ImportError('Hidden service address conversion requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
if onion_address.endswith('.onion'): onion_address = onion_address[:-6] diff --git a/stem/prereq.py b/stem/prereq.py index 40c2f006..4af6c093 100644 --- a/stem/prereq.py +++ b/stem/prereq.py @@ -26,6 +26,9 @@ import inspect import platform import sys
+# TODO: in stem 2.x consider replacing these functions with requirement +# annotations (like our tests) + CRYPTO_UNAVAILABLE = "Unable to import the cryptography module. Because of this we'll be unable to verify descriptor signature integrity. You can get cryptography from: https://pypi.org/project/cryptography/" ZSTD_UNAVAILABLE = 'ZSTD compression requires the zstandard module (https://pypi.org/project/zstandard/)' LZMA_UNAVAILABLE = 'LZMA compression requires the lzma module (https://docs.python.org/3/library/lzma.html)' diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 14df30ec..61752e11 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -5,34 +5,28 @@ Unit tests for stem.descriptor.hidden_service for version 3. import base64 import datetime import functools -import hashlib import unittest
import stem.client.datatype import stem.descriptor +import stem.descriptor.hidden_service import stem.prereq
import test.require
-from stem.client.datatype import CertType, LinkByIPv4 -from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519CertificateV1 +from test.unit.descriptor import ( + get_resource, + base_expect_invalid_attr, + base_expect_invalid_attr_for_text, +)
from stem.descriptor.hidden_service import ( - CHECKSUM_CONSTANT, - REQUIRED_V3_FIELDS, - X25519_AVAILABLE, IntroductionPointV3, HiddenServiceDescriptorV3, OuterLayer, InnerLayer, )
-from test.unit.descriptor import ( - get_resource, - base_expect_invalid_attr, - base_expect_invalid_attr_for_text, -) - try: # added in python 3.3 from unittest.mock import patch, Mock @@ -40,12 +34,13 @@ except ImportError: from mock import patch, Mock
require_sha3 = test.require.needs(stem.prereq._is_sha3_available, 'requires sha3') -require_x25519 = test.require.needs(lambda: X25519_AVAILABLE, 'requires openssl x5509') +require_x25519 = test.require.needs(lambda: stem.descriptor.hidden_service.X25519_AVAILABLE, 'requires openssl x5509')
expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptorV3, 'version', 3) expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptorV3, 'version', 3)
-HS_ADDRESS = 'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion' +HS_ADDRESS = u'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion' +HS_PUBKEY = b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1'
EXPECTED_SIGNING_CERT = """\ -----BEGIN ED25519 CERT----- @@ -68,58 +63,9 @@ with open(get_resource('hidden_service_v3_intro_point')) as intro_point_file: INTRO_POINT_STR = intro_point_file.read()
-def _pubkeys_are_equal(pubkey1, pubkey2): - """ - Compare the raw bytes of the two pubkeys and return True if they are the same - """ - - from cryptography.hazmat.primitives import serialization - - pubkey1_bytes = pubkey1.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - pubkey2_bytes = pubkey2.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - - return pubkey1_bytes == pubkey2_bytes - - -def _encode_onion_address(ed25519_pub_key_bytes): - """ - Given the public key, return the onion address - """ - - if not stem.prereq._is_sha3_available(): - raise ImportError('Encoding onion addresses requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)') - - version = 3 - checksum_body = b'%s%s%d' % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version) - checksum = hashlib.sha3_256(checksum_body).digest()[:2] - - onion_address_bytes = b'%s%s%d' % (ed25519_pub_key_bytes, checksum, version) - onion_address = base64.b32encode(onion_address_bytes) + b'.onion' - assert(len(onion_address) == 56 + len('.onion')) - - return onion_address.lower() - - -def _helper_get_intro(): - from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey - from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey +def key_bytes(key): from cryptography.hazmat.primitives import serialization - - def public_key(key): - return key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - - onion_key = public_key(X25519PrivateKey.generate()) - enc_key = public_key(X25519PrivateKey.generate()) - auth_key = public_key(Ed25519PrivateKey.generate()) - signing_key = Ed25519PrivateKey.generate() - - expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54) - extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, public_key(signing_key))] - - auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key) - enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key) - - return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None) + return key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
class TestHiddenServiceDescriptorV3(unittest.TestCase): @@ -221,7 +167,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): 'signature': 'signature', }
- for line in REQUIRED_V3_FIELDS: + for line in stem.descriptor.hidden_service.REQUIRED_V3_FIELDS: desc_text = HiddenServiceDescriptorV3.content(exclude = (line,)) expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
@@ -268,11 +214,14 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
@require_sha3 - @test.require.ed25519_support + def test_address_from_public_key(self): + self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_public_key(HS_PUBKEY)) + + @require_sha3 def test_public_key_from_address(self): - self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS)) - self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom') - self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56) + self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.public_key_from_address(HS_ADDRESS)) + self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.public_key_from_address, 'boom') + self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.public_key_from_address, '5' * 56)
def test_intro_point_parse(self): """ @@ -304,7 +253,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): """
from cryptography.hazmat.backends.openssl.x25519 import X25519PublicKey - from cryptography.hazmat.primitives import serialization
intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]
@@ -314,15 +262,8 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): self.assertTrue(isinstance(intro_point.onion_key(), X25519PublicKey)) self.assertTrue(isinstance(intro_point.enc_key(), X25519PublicKey))
- self.assertEqual(intro_point.onion_key_raw, base64.b64encode(intro_point.onion_key().public_bytes( - encoding = serialization.Encoding.Raw, - format = serialization.PublicFormat.Raw, - ))) - - self.assertEqual(intro_point.enc_key_raw, base64.b64encode(intro_point.enc_key().public_bytes( - encoding = serialization.Encoding.Raw, - format = serialization.PublicFormat.Raw, - ))) + self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key()))) + self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key())))
self.assertEqual(None, intro_point.legacy_key_raw) self.assertEqual(None, intro_point.legacy_key()) @@ -346,21 +287,28 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): making onion service descriptors. """
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey, Ed25519PrivateKey - from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + + onion_key = X25519PrivateKey.generate() + enc_key = X25519PrivateKey.generate() + auth_key = Ed25519PrivateKey.generate() + signing_key = Ed25519PrivateKey.generate() + + expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54)
# Build the service private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32) public_identity_key = private_identity_key.public_key() - pubkey_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + pubkey_bytes = key_bytes(public_identity_key)
- onion_address = _encode_onion_address(pubkey_bytes).decode() + onion_address = HiddenServiceDescriptorV3.address_from_public_key(pubkey_bytes)
- # Build the introduction points - intro1 = _helper_get_intro() - intro2 = _helper_get_intro() - intro3 = _helper_get_intro() - intro_points = [intro1, intro2, intro3] + intro_points = [ + IntroductionPointV3.create('1.1.1.1', 9001, expiration, onion_key, enc_key, auth_key, signing_key), + IntroductionPointV3.create('2.2.2.2', 9001, expiration, onion_key, enc_key, auth_key, signing_key), + IntroductionPointV3.create('3.3.3.3', 9001, expiration, onion_key, enc_key, auth_key, signing_key), + ]
# TODO: replace with bytes.fromhex() when we drop python 2.x support
@@ -368,7 +316,6 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
# Build the descriptor desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key = private_identity_key, intro_points = intro_points, blinding_param = blind_param) - desc_string = desc_string.decode()
# Parse the descriptor desc = HiddenServiceDescriptorV3.from_str(desc_string) @@ -376,17 +323,13 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
self.assertEqual(3, len(inner_layer.introduction_points))
- # Match introduction points of the parsed descriptor and the generated - # descriptor and do some sanity checks between them to make sure that - # parsing was done right! - - for i, desc_intro in enumerate(inner_layer.introduction_points): - original_intro = intro_points[i] - - auth_key_1 = Ed25519PublicKey.from_public_bytes(desc_intro.auth_key_cert.key) - auth_key_2 = Ed25519PublicKey.from_public_bytes(original_intro.auth_key_cert.key) + for i, intro_point in enumerate(inner_layer.introduction_points): + original = intro_points[i]
- self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key())) - self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key())) + self.assertEqual(original.enc_key_raw, intro_point.enc_key_raw) + self.assertEqual(original.onion_key_raw, intro_point.onion_key_raw) + self.assertEqual(original.auth_key_cert.key, intro_point.auth_key_cert.key)
- self.assertTrue(_pubkeys_are_equal(auth_key_1, auth_key_2)) + self.assertEqual(intro_point.enc_key_raw, base64.b64encode(key_bytes(intro_point.enc_key()))) + self.assertEqual(intro_point.onion_key_raw, base64.b64encode(key_bytes(intro_point.onion_key()))) + self.assertEqual(intro_point.auth_key_cert.key, key_bytes(intro_point.auth_key()))
tor-commits@lists.torproject.org