commit 5dc1f48975b10c8ebd1712ce3e749576505bfb1f Author: Damian Johnson <atagar@torproject.org> Date: Thu Oct 31 15:40:17 2019 -0700
Drop prototype classes
We're now at a point where we can drop our prototype introduction point and certificate classes. Our test_encode_decode_certificate() test is redundant with test_certificate_encoding(). --- stem/descriptor/certificate.py | 106 ---------------------- stem/descriptor/hidden_service.py | 144 ++---------------------------- test/unit/descriptor/certificate.py | 29 ------ test/unit/descriptor/hidden_service_v3.py | 22 ----- 4 files changed, 7 insertions(+), 294 deletions(-)
diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py index 973957f1..dca54aee 100644 --- a/stem/descriptor/certificate.py +++ b/stem/descriptor/certificate.py @@ -470,109 +470,3 @@ class Ed25519CertificateV1(Ed25519Certificate): key.verify(self.signature, base64.b64decode(stem.util.str_tools._to_bytes(self.encoded))[:-ED25519_SIGNATURE_LENGTH]) except InvalidSignature: raise ValueError('Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)') - - -class MyED25519Certificate(object): - """ - This class represents an ed25519 certificate and it's made for encoding it into a string. - We should merge this class with the one above. - """ - def __init__(self, cert_type, expiration_date, - cert_key_type, certified_pub_key, - signing_priv_key, include_signing_key, - version=1): - """ - :var int version - :var stem.client.datatype.CertType cert_type - :var int cert_type_int - :var datetime expiration_date - :var int cert_key_type - :var ED25519PublicKey certified_pub_key - :var ED25519PrivateKey signing_priv_key - :var bool include_signing_key - """ - self.version = version - self.cert_type, self.cert_type_int = ClientCertType.get(cert_type) - self.expiration_date = expiration_date - self.cert_key_type = cert_key_type - self.certified_pub_key = certified_pub_key - - self.signing_priv_key = signing_priv_key - self.signing_pub_key = signing_priv_key.public_key() - - self.include_signing_key = include_signing_key - # XXX validate params - - def _get_certificate_signature(self, msg_body): - return self.signing_priv_key.sign(msg_body) - - def _get_cert_extensions_bytes(self): - """ - Build the cert extensions part of the certificate - """ - - from cryptography.hazmat.primitives import serialization - n_extensions = 0 - - # If we need to include the signing key, let's create the extension body - # ExtLength [2 bytes] - # ExtType [1 byte] - # ExtFlags [1 byte] - # ExtData [ExtLength bytes] - if self.include_signing_key: - n_extensions += 
1 - - signing_pubkey_bytes = self.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) - - ext_length = len(signing_pubkey_bytes) - ext_type = 4 - ext_flags = 0 - ext_data = signing_pubkey_bytes - - # Now build the actual byte representation of any extensions - ext_obj = bytearray() - ext_obj += Size.CHAR.pack(n_extensions) - - if self.include_signing_key: - ext_obj += Size.SHORT.pack(ext_length) - ext_obj += Size.CHAR.pack(ext_type) - ext_obj += Size.CHAR.pack(ext_flags) - ext_obj += ext_data - - return bytes(ext_obj) - - def encode(self): - """Return a bytes representation of this certificate.""" - from cryptography.hazmat.primitives import serialization - obj = bytearray() - - obj += Size.CHAR.pack(self.version) - obj += Size.CHAR.pack(self.cert_type_int) - - # Encode EXPIRATION_DATE - expiration_seconds_since_epoch = stem.util.datetime_to_unix(self.expiration_date) - expiration_hours_since_epoch = int(expiration_seconds_since_epoch) // 3600 - obj += Size.LONG.pack(expiration_hours_since_epoch) - - # Encode CERT_KEY_TYPE - obj += Size.CHAR.pack(self.cert_key_type) - - # Encode CERTIFIED_KEY - certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - assert(len(certified_pub_key_bytes) == 32) - obj += certified_pub_key_bytes - - # Encode N_EXTENSIONS and EXTENSIONS - obj += self._get_cert_extensions_bytes() - - # Do the signature on the body we have so far - obj += self._get_certificate_signature(bytes(obj)) - - return bytes(obj) - - def encode_for_descriptor(self): - cert_bytes = self.encode() - cert_b64 = base64.b64encode(cert_bytes) - cert_b64 = b'\n'.join(stem.util.str_tools._split_by_length(cert_b64, 64)) - return b'-----BEGIN ED25519 CERT-----\n%s\n-----END ED25519 CERT-----' % cert_b64 diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 68d150b1..bb8937d8 100644 --- 
a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -45,7 +45,6 @@ import stem.util.connection import stem.util.str_tools import stem.util.tor_tools
-from stem.client.datatype import CertType from stem.descriptor import hsv3_crypto from stem.descriptor.certificate import Ed25519Certificate
@@ -310,127 +309,6 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s return link_specifiers
-class AlternateIntroductionPointV3(object): - """ - Introduction point for a v3 hidden service. - - We want this class to satisfy two use cases: - - - Parsing introduction points directly from the HSv3 descriptor and saving - their data here. - - - Creating introduction points for inclusion to an HSv3 descriptor at a point - where a descriptor signing key is not yet known (because the descriptor is - not yet made). In which case, the certificates cannot be created yet and - hence need to be created at encoding time. - - .. versionadded:: 1.8.0 - - :var list link_specifiers: :class:`~stem.client.datatype.LinkSpecifier` where this service is reachable - :var X25519PublicKey onion_key: ntor introduction point public key - :var Ed25519PublicKey auth_key: ed25519 authentication key for this intro point - :var stem.certificate.Ed25519Certificate auth_key_cert: cross-certifier of the signing key with the auth key - :var X25519PublicKey enc_key: introduction request encryption key - :var stem.certificate.Ed25519Certificate enc_key_cert: cross-certifier of the signing key by the encryption key - :var XXX legacy_key: legacy introduction point RSA public key - :var stem.certificate.Ed25519Certificate legacy_key_cert: cross-certifier of the signing key by the legacy key - - :var Ed25519Certificate descriptor_signing_key: hsv3 descriptor signing key (needed to encode the intro point) - """ - def __init__(self, link_specifiers, onion_key, enc_key, auth_key=None, auth_key_cert=None, legacy_key=None, enc_key_cert=None, legacy_key_cert=None): - """ - Initialize this intro point. - - While not all attributes are mandatory, at the very least the link - specifiers, the auth key, the onion key and the encryption key need to be - provided. - - The certificates can be left out (for example in the case of creating a new - intro point), and they will be created at encode time when the - descriptor_signing_key is provided. 
- """ - - # if not link_specifiers or not onion_key or not enc_key: - # raise ValueError('Introduction point missing essential keys') - - if not auth_key and not auth_key_cert: - raise ValueError('Either auth key or auth key cert needs to be provided') - - # If we have an auth key cert but not an auth key, extract the key - if auth_key_cert and not auth_key and stem.prereq.is_crypto_available(ed25519 = True): - from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey - auth_key = Ed25519PublicKey.from_public_bytes(auth_key_cert.key) - - self.link_specifiers = link_specifiers - self.onion_key = enc_key - self.enc_key = enc_key - self.legacy_key = legacy_key - self.auth_key = auth_key - self.auth_key_cert = auth_key_cert - self.enc_key_cert = enc_key_cert - self.legacy_key_cert = legacy_key_cert - - def _encode_link_specifier_block(self): - """ - See BUILDING-BLOCKS in rend-spec-v3.txt - - NSPEC (Number of link specifiers) [1 byte] - NSPEC times: - LSTYPE (Link specifier type) [1 byte] - LSLEN (Link specifier length) [1 byte] - LSPEC (Link specifier) [LSLEN bytes] - """ - - ls_block = b'' - ls_block += chr(len(self.link_specifiers)) - - for ls in self.link_specifiers: - ls_block += ls.pack() - - return base64.b64encode(ls_block) - - def encode(self, descriptor_signing_privkey): - """ - Encode this introduction point into bytes - """ - - if not descriptor_signing_privkey: - raise ValueError('Cannot encode: Descriptor signing key not provided') - - from cryptography.hazmat.primitives import serialization - - cert_expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54) - - body = b'' - - body += b'introduction-point %s\n' % (self._encode_link_specifier_block()) - - # Onion key - onion_key_bytes = self.onion_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - body += b'onion-key ntor %s\n' % (base64.b64encode(onion_key_bytes)) - - # Build auth key certificate - auth_key_cert = 
stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_INTRO_AUTH, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True) - auth_key_cert_b64_blob = auth_key_cert.encode_for_descriptor() - body += b'auth-key\n%s\n' % (auth_key_cert_b64_blob) - - # Build enc key line - enc_key_bytes = self.enc_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) - body += b'enc-key ntor %s\n' % (base64.b64encode(enc_key_bytes)) - - # Build enc key cert (this does not actually need to certify anything because of #29583) - enc_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_NTOR_ENC, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True) - enc_key_cert_b64_blob = enc_key_cert.encode_for_descriptor() - body += b'enc-key-cert\n%s\n' % (enc_key_cert_b64_blob) - - # We are called to encode legacy key, but we don't know how - # TODO do legacy keys! - if self.legacy_key or self.legacy_key_cert: - raise NotImplementedError - - return body - - class AuthorizedClient(collections.namedtuple('AuthorizedClient', ['id', 'iv', 'cookie'])): """ Client authorized to use a v3 hidden service. @@ -889,22 +767,17 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key 'blinded_priv_key' key that signs the certificate """
+ from cryptography.hazmat.primitives import serialization + # 54 hours expiration date like tor does expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
- desc_signing_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type='HS_V3_DESC_SIGNING', - expiration_date=expiration_date, - cert_key_type=1, - certified_pub_key=descriptor_signing_public_key, - signing_priv_key=blinded_priv_key, - include_signing_key=True) - - signing_cert_bytes = desc_signing_cert.encode() + signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + extensions = [stem.descriptor.certificate.Ed25519Extension(stem.descriptor.certificate.ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
- cert_base64 = stem.util.str_tools._to_unicode(base64.b64encode(signing_cert_bytes)) - cert_blob = '\n'.join(stem.util.str_tools._split_by_length(cert_base64, 64)) + desc_signing_cert = stem.descriptor.certificate.Ed25519CertificateV1(stem.client.datatype.CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
- return '\n-----BEGIN %s-----\n%s\n-----END %s-----' % ('ED25519 CERT', cert_blob, 'ED25519 CERT') + return '\n' + desc_signing_cert.to_base64(pem = True)
def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b''): @@ -935,10 +808,7 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
# Now encode all the intro points for intro_point in intro_points: - if isinstance(intro_point, IntroductionPointV3): - final_body += intro_point.encode() + b'\n' - else: - final_body += intro_point.encode(descriptor_signing_privkey) + final_body += intro_point.encode() + b'\n'
return final_body
diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py index d2b13f72..74ea0a6e 100644 --- a/test/unit/descriptor/certificate.py +++ b/test/unit/descriptor/certificate.py @@ -16,8 +16,6 @@ from stem.client.datatype import Size, CertType from stem.descriptor.certificate import ED25519_SIGNATURE_LENGTH, ExtensionType, Ed25519Certificate, Ed25519CertificateV1, Ed25519Extension from test.unit.descriptor import get_resource
-from cryptography.hazmat.primitives import serialization - ED25519_CERT = """ AQQABhtZAaW2GoBED1IjY3A6f6GNqBEl5A83fD2Za9upGke51JGqAQAgBABnprVR ptIr43bWPo2fIzo3uOywfoMrryprpbm4HhCkZMaO064LP+1KNuLvlc8sGG8lTjx1 @@ -221,30 +219,3 @@ class TestEd25519Certificate(unittest.TestCase):
cert = Ed25519Certificate.from_base64(certificate()) self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)', cert.validate, desc) - - @test.require.ed25519_support - def test_encode_decode_certificate(self): - from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey - - certified_priv_key = Ed25519PrivateKey.generate() - certified_pub_key = certified_priv_key.public_key() - - signing_priv_key = Ed25519PrivateKey.generate() - - expiration_date = datetime.datetime(2037, 8, 28, 17, 0) - - my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_DESC_SIGNING, expiration_date = expiration_date, cert_key_type = 1, certified_pub_key = certified_pub_key, signing_priv_key = signing_priv_key, include_signing_key = True) - - ed_cert_bytes = my_ed_cert.encode() - self.assertTrue(my_ed_cert) - - # base64 the cert since that's what the parsing func expects - ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes) - - ed_cert_parsed = Ed25519Certificate.from_base64(ed_cert_bytes_b64) - - self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type) - self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date) - self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type) - self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)) - self.assertEqual(ed_cert_parsed.signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)) diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 060fa4dc..14df30ec 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -22,7 +22,6 @@ from stem.descriptor.hidden_service import ( REQUIRED_V3_FIELDS, X25519_AVAILABLE, IntroductionPointV3, - 
AlternateIntroductionPointV3, HiddenServiceDescriptorV3, OuterLayer, InnerLayer, @@ -123,27 +122,6 @@ def _helper_get_intro(): return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
-def _helper_get_intro_old(): - from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey - from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey - - link_specifiers = [] - - link1, _ = stem.client.datatype.LinkSpecifier.pop(b'\x03\x20CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC') - link_specifiers.append(link1) - - onion_privkey = X25519PrivateKey.generate() - onion_pubkey = onion_privkey.public_key() - - auth_privkey = Ed25519PrivateKey.generate() - auth_pubkey = auth_privkey.public_key() - - enc_privkey = X25519PrivateKey.generate() - enc_pubkey = enc_privkey.public_key() - - return AlternateIntroductionPointV3(link_specifiers, onion_key=onion_pubkey, enc_key=enc_pubkey, auth_key=auth_pubkey) - - class TestHiddenServiceDescriptorV3(unittest.TestCase): def test_real_descriptor(self): """