commit a67425403fd90dd0b0e7b55161c8345091dfd36b Author: Damian Johnson atagar@torproject.org Date: Thu Oct 31 14:50:08 2019 -0700
Test using new IntroductionPoint class
Adjusting our test_encode_decode_descriptor() test to use our present IntroductionPoint class rather than the prototype. --- stem/descriptor/certificate.py | 29 +++++++++++++++++++++----- stem/descriptor/hidden_service.py | 9 +++++--- test/unit/descriptor/hidden_service_v3.py | 34 +++++++++++++++++++++++++++---- 3 files changed, 60 insertions(+), 12 deletions(-)
diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py index b48a0899..973957f1 100644 --- a/stem/descriptor/certificate.py +++ b/stem/descriptor/certificate.py @@ -123,10 +123,10 @@ class Ed25519Extension(Field): def __init__(self, ext_type, flag_val, data): self.type = ext_type self.flags = [] - self.flag_int = flag_val + self.flag_int = flag_val if flag_val else 0 self.data = data
- if flag_val % 2 == 1: + if flag_val and flag_val % 2 == 1: self.flags.append(ExtensionFlag.AFFECTS_VALIDATION) flag_val -= 1
@@ -284,18 +284,34 @@ class Ed25519CertificateV1(Ed25519Certificate): :var bytes key: key content :var list extensions: :class:`~stem.descriptor.certificate.Ed25519Extension` in this certificate :var bytes signature: certificate signature + + :param bytes signature: pre-calculated certificate signature + :param cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey signing_key: certificate signing key """
- def __init__(self, type_int, expiration, key_type, key, extensions, signature): + def __init__(self, cert_type, expiration, key_type, key, extensions, signature = None, signing_key = None): super(Ed25519CertificateV1, self).__init__(1)
- self.type, self.type_int = ClientCertType.get(type_int) + if not signature and not signing_key: + raise ValueError('Certificate signature or signing key is required') + + self.type, self.type_int = ClientCertType.get(cert_type) self.expiration = expiration self.key_type = key_type self.key = key self.extensions = extensions self.signature = signature
+ if signing_key: + calculated_sig = signing_key.sign(self.pack()) + + # if caller provides both signing key *and* signature then ensure they match + + if self.signature and self.signature != calculated_sig: + raise ValueError("Signature calculated from its key (%s) mismatches '%s'" % (calculated_sig, self.signature)) + + self.signature = calculated_sig + if self.type in (ClientCertType.LINK, ClientCertType.IDENTITY, ClientCertType.AUTHENTICATE): raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved for CERTS cells.' % self.type_int) elif self.type == ClientCertType.ED25519_IDENTITY: @@ -315,7 +331,10 @@ class Ed25519CertificateV1(Ed25519Certificate): for extension in self.extensions: encoded += extension.pack()
- return bytes(encoded + self.signature) + if self.signature: + encoded += self.signature + + return bytes(encoded)
@staticmethod def unpack(content): diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 09030ca5..68d150b1 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -935,7 +935,10 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
# Now encode all the intro points for intro_point in intro_points: - final_body += intro_point.encode(descriptor_signing_privkey) + if isinstance(intro_point, IntroductionPointV3): + final_body += intro_point.encode() + b'\n' + else: + final_body += intro_point.encode(descriptor_signing_privkey)
return final_body
@@ -984,7 +987,7 @@ def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_ should be attached to the descriptor """
- inner_descriptor_layer = _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey) + inner_descriptor_layer = stem.util.str_tools._to_bytes(_get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey)) inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
@@ -1179,7 +1182,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
if self._inner_layer is None: - blinded_key = self.signing_cert.signing_key() + blinded_key = self.signing_cert.signing_key() if self.signing_cert else None
if not blinded_key: raise ValueError('No signing key is present') diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index fb190211..060fa4dc 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -3,6 +3,7 @@ Unit tests for stem.descriptor.hidden_service for version 3. """
import base64 +import datetime import functools import hashlib import unittest @@ -13,6 +14,9 @@ import stem.prereq
import test.require
+from stem.client.datatype import CertType, LinkByIPv4 +from stem.descriptor.certificate import ExtensionType, Ed25519Extension, Ed25519CertificateV1 + from stem.descriptor.hidden_service import ( CHECKSUM_CONSTANT, REQUIRED_V3_FIELDS, @@ -100,6 +104,28 @@ def _encode_onion_address(ed25519_pub_key_bytes): def _helper_get_intro(): from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + from cryptography.hazmat.primitives import serialization + + def public_key(key): + return key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + + onion_key = public_key(X25519PrivateKey.generate()) + enc_key = public_key(X25519PrivateKey.generate()) + auth_key = public_key(Ed25519PrivateKey.generate()) + signing_key = Ed25519PrivateKey.generate() + + expiration = datetime.datetime.utcnow() + datetime.timedelta(hours = 54) + extensions = [Ed25519Extension(ExtensionType.HAS_SIGNING_KEY, None, public_key(signing_key))] + + auth_key_cert = Ed25519CertificateV1(CertType.HS_V3_INTRO_AUTH, expiration, 1, auth_key, extensions, signing_key = signing_key) + enc_key_cert = Ed25519CertificateV1(CertType.HS_V3_NTOR_ENC, expiration, 1, auth_key, extensions, signing_key = signing_key) + + return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None) + + +def _helper_get_intro_old(): + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey + from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
link_specifiers = []
@@ -370,7 +396,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): desc = HiddenServiceDescriptorV3.from_str(desc_string) inner_layer = desc.decrypt(onion_address)
- self.assertEqual(len(inner_layer.introduction_points), 3) + self.assertEqual(3, len(inner_layer.introduction_points))
# Match introduction points of the parsed descriptor and the generated # descriptor and do some sanity checks between them to make sure that @@ -380,9 +406,9 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): original_intro = intro_points[i]
auth_key_1 = Ed25519PublicKey.from_public_bytes(desc_intro.auth_key_cert.key) - auth_key_2 = original_intro.auth_key + auth_key_2 = Ed25519PublicKey.from_public_bytes(original_intro.auth_key_cert.key)
- self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key)) - self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key)) + self.assertTrue(_pubkeys_are_equal(desc_intro.enc_key(), original_intro.enc_key())) + self.assertTrue(_pubkeys_are_equal(desc_intro.onion_key(), original_intro.onion_key()))
self.assertTrue(_pubkeys_are_equal(auth_key_1, auth_key_2))