commit 5dc1f48975b10c8ebd1712ce3e749576505bfb1f
Author: Damian Johnson <atagar(a)torproject.org>
Date: Thu Oct 31 15:40:17 2019 -0700
Drop prototype classes
We're now at a point where we can drop our prototype introduction point and
certificate classes (AlternateIntroductionPointV3 and MyED25519Certificate).
Our test_encode_decode_certificate() test is dropped as well, since it is
redundant with test_certificate_encoding().
---
stem/descriptor/certificate.py | 106 ----------------------
stem/descriptor/hidden_service.py | 144 ++----------------------------
test/unit/descriptor/certificate.py | 29 ------
test/unit/descriptor/hidden_service_v3.py | 22 -----
4 files changed, 7 insertions(+), 294 deletions(-)
diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py
index 973957f1..dca54aee 100644
--- a/stem/descriptor/certificate.py
+++ b/stem/descriptor/certificate.py
@@ -470,109 +470,3 @@ class Ed25519CertificateV1(Ed25519Certificate):
key.verify(self.signature, base64.b64decode(stem.util.str_tools._to_bytes(self.encoded))[:-ED25519_SIGNATURE_LENGTH])
except InvalidSignature:
raise ValueError('Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)')
-
-
-class MyED25519Certificate(object):
- """
- This class represents an ed25519 certificate and it's made for encoding it into a string.
- We should merge this class with the one above.
- """
- def __init__(self, cert_type, expiration_date,
- cert_key_type, certified_pub_key,
- signing_priv_key, include_signing_key,
- version=1):
- """
- :var int version
- :var stem.client.datatype.CertType cert_type
- :var int cert_type_int
- :var datetime expiration_date
- :var int cert_key_type
- :var ED25519PublicKey certified_pub_key
- :var ED25519PrivateKey signing_priv_key
- :var bool include_signing_key
- """
- self.version = version
- self.cert_type, self.cert_type_int = ClientCertType.get(cert_type)
- self.expiration_date = expiration_date
- self.cert_key_type = cert_key_type
- self.certified_pub_key = certified_pub_key
-
- self.signing_priv_key = signing_priv_key
- self.signing_pub_key = signing_priv_key.public_key()
-
- self.include_signing_key = include_signing_key
- # XXX validate params
-
- def _get_certificate_signature(self, msg_body):
- return self.signing_priv_key.sign(msg_body)
-
- def _get_cert_extensions_bytes(self):
- """
- Build the cert extensions part of the certificate
- """
-
- from cryptography.hazmat.primitives import serialization
- n_extensions = 0
-
- # If we need to include the signing key, let's create the extension body
- # ExtLength [2 bytes]
- # ExtType [1 byte]
- # ExtFlags [1 byte]
- # ExtData [ExtLength bytes]
- if self.include_signing_key:
- n_extensions += 1
-
- signing_pubkey_bytes = self.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
- format=serialization.PublicFormat.Raw)
-
- ext_length = len(signing_pubkey_bytes)
- ext_type = 4
- ext_flags = 0
- ext_data = signing_pubkey_bytes
-
- # Now build the actual byte representation of any extensions
- ext_obj = bytearray()
- ext_obj += Size.CHAR.pack(n_extensions)
-
- if self.include_signing_key:
- ext_obj += Size.SHORT.pack(ext_length)
- ext_obj += Size.CHAR.pack(ext_type)
- ext_obj += Size.CHAR.pack(ext_flags)
- ext_obj += ext_data
-
- return bytes(ext_obj)
-
- def encode(self):
- """Return a bytes representation of this certificate."""
- from cryptography.hazmat.primitives import serialization
- obj = bytearray()
-
- obj += Size.CHAR.pack(self.version)
- obj += Size.CHAR.pack(self.cert_type_int)
-
- # Encode EXPIRATION_DATE
- expiration_seconds_since_epoch = stem.util.datetime_to_unix(self.expiration_date)
- expiration_hours_since_epoch = int(expiration_seconds_since_epoch) // 3600
- obj += Size.LONG.pack(expiration_hours_since_epoch)
-
- # Encode CERT_KEY_TYPE
- obj += Size.CHAR.pack(self.cert_key_type)
-
- # Encode CERTIFIED_KEY
- certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
- assert(len(certified_pub_key_bytes) == 32)
- obj += certified_pub_key_bytes
-
- # Encode N_EXTENSIONS and EXTENSIONS
- obj += self._get_cert_extensions_bytes()
-
- # Do the signature on the body we have so far
- obj += self._get_certificate_signature(bytes(obj))
-
- return bytes(obj)
-
- def encode_for_descriptor(self):
- cert_bytes = self.encode()
- cert_b64 = base64.b64encode(cert_bytes)
- cert_b64 = b'\n'.join(stem.util.str_tools._split_by_length(cert_b64, 64))
- return b'-----BEGIN ED25519 CERT-----\n%s\n-----END ED25519 CERT-----' % cert_b64
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 68d150b1..bb8937d8 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -45,7 +45,6 @@ import stem.util.connection
import stem.util.str_tools
import stem.util.tor_tools
-from stem.client.datatype import CertType
from stem.descriptor import hsv3_crypto
from stem.descriptor.certificate import Ed25519Certificate
@@ -310,127 +309,6 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
return link_specifiers
-class AlternateIntroductionPointV3(object):
- """
- Introduction point for a v3 hidden service.
-
- We want this class to satisfy two use cases:
-
- - Parsing introduction points directly from the HSv3 descriptor and saving
- their data here.
-
- - Creating introduction points for inclusion to an HSv3 descriptor at a point
- where a descriptor signing key is not yet known (because the descriptor is
- not yet made). In which case, the certificates cannot be created yet and
- hence need to be created at encoding time.
-
- .. versionadded:: 1.8.0
-
- :var list link_specifiers: :class:`~stem.client.datatype.LinkSpecifier` where this service is reachable
- :var X25519PublicKey onion_key: ntor introduction point public key
- :var Ed25519PublicKey auth_key: ed25519 authentication key for this intro point
- :var stem.certificate.Ed25519Certificate auth_key_cert: cross-certifier of the signing key with the auth key
- :var X25519PublicKey enc_key: introduction request encryption key
- :var stem.certificate.Ed25519Certificate enc_key_cert: cross-certifier of the signing key by the encryption key
- :var XXX legacy_key: legacy introduction point RSA public key
- :var stem.certificate.Ed25519Certificate legacy_key_cert: cross-certifier of the signing key by the legacy key
-
- :var Ed25519Certificate descriptor_signing_key: hsv3 descriptor signing key (needed to encode the intro point)
- """
- def __init__(self, link_specifiers, onion_key, enc_key, auth_key=None, auth_key_cert=None, legacy_key=None, enc_key_cert=None, legacy_key_cert=None):
- """
- Initialize this intro point.
-
- While not all attributes are mandatory, at the very least the link
- specifiers, the auth key, the onion key and the encryption key need to be
- provided.
-
- The certificates can be left out (for example in the case of creating a new
- intro point), and they will be created at encode time when the
- descriptor_signing_key is provided.
- """
-
- # if not link_specifiers or not onion_key or not enc_key:
- # raise ValueError('Introduction point missing essential keys')
-
- if not auth_key and not auth_key_cert:
- raise ValueError('Either auth key or auth key cert needs to be provided')
-
- # If we have an auth key cert but not an auth key, extract the key
- if auth_key_cert and not auth_key and stem.prereq.is_crypto_available(ed25519 = True):
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
- auth_key = Ed25519PublicKey.from_public_bytes(auth_key_cert.key)
-
- self.link_specifiers = link_specifiers
- self.onion_key = enc_key
- self.enc_key = enc_key
- self.legacy_key = legacy_key
- self.auth_key = auth_key
- self.auth_key_cert = auth_key_cert
- self.enc_key_cert = enc_key_cert
- self.legacy_key_cert = legacy_key_cert
-
- def _encode_link_specifier_block(self):
- """
- See BUILDING-BLOCKS in rend-spec-v3.txt
-
- NSPEC (Number of link specifiers) [1 byte]
- NSPEC times:
- LSTYPE (Link specifier type) [1 byte]
- LSLEN (Link specifier length) [1 byte]
- LSPEC (Link specifier) [LSLEN bytes]
- """
-
- ls_block = b''
- ls_block += chr(len(self.link_specifiers))
-
- for ls in self.link_specifiers:
- ls_block += ls.pack()
-
- return base64.b64encode(ls_block)
-
- def encode(self, descriptor_signing_privkey):
- """
- Encode this introduction point into bytes
- """
-
- if not descriptor_signing_privkey:
- raise ValueError('Cannot encode: Descriptor signing key not provided')
-
- from cryptography.hazmat.primitives import serialization
-
- cert_expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
-
- body = b''
-
- body += b'introduction-point %s\n' % (self._encode_link_specifier_block())
-
- # Onion key
- onion_key_bytes = self.onion_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
- body += b'onion-key ntor %s\n' % (base64.b64encode(onion_key_bytes))
-
- # Build auth key certificate
- auth_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_INTRO_AUTH, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True)
- auth_key_cert_b64_blob = auth_key_cert.encode_for_descriptor()
- body += b'auth-key\n%s\n' % (auth_key_cert_b64_blob)
-
- # Build enc key line
- enc_key_bytes = self.enc_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
- body += b'enc-key ntor %s\n' % (base64.b64encode(enc_key_bytes))
-
- # Build enc key cert (this does not actually need to certify anything because of #29583)
- enc_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_NTOR_ENC, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True)
- enc_key_cert_b64_blob = enc_key_cert.encode_for_descriptor()
- body += b'enc-key-cert\n%s\n' % (enc_key_cert_b64_blob)
-
- # We are called to encode legacy key, but we don't know how
- # TODO do legacy keys!
- if self.legacy_key or self.legacy_key_cert:
- raise NotImplementedError
-
- return body
-
-
class AuthorizedClient(collections.namedtuple('AuthorizedClient', ['id', 'iv', 'cookie'])):
"""
Client authorized to use a v3 hidden service.
@@ -889,22 +767,17 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
'blinded_priv_key' key that signs the certificate
"""
+ from cryptography.hazmat.primitives import serialization
+
# 54 hours expiration date like tor does
expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
- desc_signing_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type='HS_V3_DESC_SIGNING',
- expiration_date=expiration_date,
- cert_key_type=1,
- certified_pub_key=descriptor_signing_public_key,
- signing_priv_key=blinded_priv_key,
- include_signing_key=True)
-
- signing_cert_bytes = desc_signing_cert.encode()
+ signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+ extensions = [stem.descriptor.certificate.Ed25519Extension(stem.descriptor.certificate.ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
- cert_base64 = stem.util.str_tools._to_unicode(base64.b64encode(signing_cert_bytes))
- cert_blob = '\n'.join(stem.util.str_tools._split_by_length(cert_base64, 64))
+ desc_signing_cert = stem.descriptor.certificate.Ed25519CertificateV1(stem.client.datatype.CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
- return '\n-----BEGIN %s-----\n%s\n-----END %s-----' % ('ED25519 CERT', cert_blob, 'ED25519 CERT')
+ return '\n' + desc_signing_cert.to_base64(pem = True)
def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b''):
@@ -935,10 +808,7 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
# Now encode all the intro points
for intro_point in intro_points:
- if isinstance(intro_point, IntroductionPointV3):
- final_body += intro_point.encode() + b'\n'
- else:
- final_body += intro_point.encode(descriptor_signing_privkey)
+ final_body += intro_point.encode() + b'\n'
return final_body
diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py
index d2b13f72..74ea0a6e 100644
--- a/test/unit/descriptor/certificate.py
+++ b/test/unit/descriptor/certificate.py
@@ -16,8 +16,6 @@ from stem.client.datatype import Size, CertType
from stem.descriptor.certificate import ED25519_SIGNATURE_LENGTH, ExtensionType, Ed25519Certificate, Ed25519CertificateV1, Ed25519Extension
from test.unit.descriptor import get_resource
-from cryptography.hazmat.primitives import serialization
-
ED25519_CERT = """
AQQABhtZAaW2GoBED1IjY3A6f6GNqBEl5A83fD2Za9upGke51JGqAQAgBABnprVR
ptIr43bWPo2fIzo3uOywfoMrryprpbm4HhCkZMaO064LP+1KNuLvlc8sGG8lTjx1
@@ -221,30 +219,3 @@ class TestEd25519Certificate(unittest.TestCase):
cert = Ed25519Certificate.from_base64(certificate())
self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)', cert.validate, desc)
-
- @test.require.ed25519_support
- def test_encode_decode_certificate(self):
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
-
- certified_priv_key = Ed25519PrivateKey.generate()
- certified_pub_key = certified_priv_key.public_key()
-
- signing_priv_key = Ed25519PrivateKey.generate()
-
- expiration_date = datetime.datetime(2037, 8, 28, 17, 0)
-
- my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_DESC_SIGNING, expiration_date = expiration_date, cert_key_type = 1, certified_pub_key = certified_pub_key, signing_priv_key = signing_priv_key, include_signing_key = True)
-
- ed_cert_bytes = my_ed_cert.encode()
- self.assertTrue(my_ed_cert)
-
- # base64 the cert since that's what the parsing func expects
- ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes)
-
- ed_cert_parsed = Ed25519Certificate.from_base64(ed_cert_bytes_b64)
-
- self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type)
- self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date)
- self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type)
- self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))
- self.assertEqual(ed_cert_parsed.signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 060fa4dc..14df30ec 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -22,7 +22,6 @@ from stem.descriptor.hidden_service import (
REQUIRED_V3_FIELDS,
X25519_AVAILABLE,
IntroductionPointV3,
- AlternateIntroductionPointV3,
HiddenServiceDescriptorV3,
OuterLayer,
InnerLayer,
@@ -123,27 +122,6 @@ def _helper_get_intro():
return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
-def _helper_get_intro_old():
- from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
- from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
-
- link_specifiers = []
-
- link1, _ = stem.client.datatype.LinkSpecifier.pop(b'\x03\x20CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC')
- link_specifiers.append(link1)
-
- onion_privkey = X25519PrivateKey.generate()
- onion_pubkey = onion_privkey.public_key()
-
- auth_privkey = Ed25519PrivateKey.generate()
- auth_pubkey = auth_privkey.public_key()
-
- enc_privkey = X25519PrivateKey.generate()
- enc_pubkey = enc_privkey.public_key()
-
- return AlternateIntroductionPointV3(link_specifiers, onion_key=onion_pubkey, enc_key=enc_pubkey, auth_key=auth_pubkey)
-
-
class TestHiddenServiceDescriptorV3(unittest.TestCase):
def test_real_descriptor(self):
"""