[tor-commits] [stem/master] Drop prototype classes

atagar at torproject.org atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019


commit 5dc1f48975b10c8ebd1712ce3e749576505bfb1f
Author: Damian Johnson <atagar at torproject.org>
Date:   Thu Oct 31 15:40:17 2019 -0700

    Drop prototype classes
    
    We're now at a point where we can drop our prototype introduction point and
    certificate classes. Our test_encode_decode_certificate() test is redundant
    with test_certificate_encoding().
---
 stem/descriptor/certificate.py            | 106 ----------------------
 stem/descriptor/hidden_service.py         | 144 ++----------------------------
 test/unit/descriptor/certificate.py       |  29 ------
 test/unit/descriptor/hidden_service_v3.py |  22 -----
 4 files changed, 7 insertions(+), 294 deletions(-)

diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py
index 973957f1..dca54aee 100644
--- a/stem/descriptor/certificate.py
+++ b/stem/descriptor/certificate.py
@@ -470,109 +470,3 @@ class Ed25519CertificateV1(Ed25519Certificate):
       key.verify(self.signature, base64.b64decode(stem.util.str_tools._to_bytes(self.encoded))[:-ED25519_SIGNATURE_LENGTH])
     except InvalidSignature:
       raise ValueError('Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)')
-
-
-class MyED25519Certificate(object):
-  """
-  This class represents an ed25519 certificate and it's made for encoding it into a string.
-  We should merge this class with the one above.
-  """
-  def __init__(self, cert_type, expiration_date,
-               cert_key_type, certified_pub_key,
-               signing_priv_key, include_signing_key,
-               version=1):
-    """
-    :var int version
-    :var stem.client.datatype.CertType cert_type
-    :var int cert_type_int
-    :var datetime expiration_date
-    :var int cert_key_type
-    :var ED25519PublicKey certified_pub_key
-    :var ED25519PrivateKey signing_priv_key
-    :var bool include_signing_key
-    """
-    self.version = version
-    self.cert_type, self.cert_type_int = ClientCertType.get(cert_type)
-    self.expiration_date = expiration_date
-    self.cert_key_type = cert_key_type
-    self.certified_pub_key = certified_pub_key
-
-    self.signing_priv_key = signing_priv_key
-    self.signing_pub_key = signing_priv_key.public_key()
-
-    self.include_signing_key = include_signing_key
-    # XXX validate params
-
-  def _get_certificate_signature(self, msg_body):
-    return self.signing_priv_key.sign(msg_body)
-
-  def _get_cert_extensions_bytes(self):
-    """
-    Build the cert extensions part of the certificate
-    """
-
-    from cryptography.hazmat.primitives import serialization
-    n_extensions = 0
-
-    # If we need to include the signing key, let's create the extension body
-    #         ExtLength [2 bytes]
-    #         ExtType   [1 byte]
-    #         ExtFlags  [1 byte]
-    #         ExtData   [ExtLength bytes]
-    if self.include_signing_key:
-      n_extensions += 1
-
-      signing_pubkey_bytes = self.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw,
-                                                               format=serialization.PublicFormat.Raw)
-
-      ext_length = len(signing_pubkey_bytes)
-      ext_type = 4
-      ext_flags = 0
-      ext_data = signing_pubkey_bytes
-
-    # Now build the actual byte representation of any extensions
-    ext_obj = bytearray()
-    ext_obj += Size.CHAR.pack(n_extensions)
-
-    if self.include_signing_key:
-      ext_obj += Size.SHORT.pack(ext_length)
-      ext_obj += Size.CHAR.pack(ext_type)
-      ext_obj += Size.CHAR.pack(ext_flags)
-      ext_obj += ext_data
-
-    return bytes(ext_obj)
-
-  def encode(self):
-    """Return a bytes representation of this certificate."""
-    from cryptography.hazmat.primitives import serialization
-    obj = bytearray()
-
-    obj += Size.CHAR.pack(self.version)
-    obj += Size.CHAR.pack(self.cert_type_int)
-
-    # Encode EXPIRATION_DATE
-    expiration_seconds_since_epoch = stem.util.datetime_to_unix(self.expiration_date)
-    expiration_hours_since_epoch = int(expiration_seconds_since_epoch) // 3600
-    obj += Size.LONG.pack(expiration_hours_since_epoch)
-
-    # Encode CERT_KEY_TYPE
-    obj += Size.CHAR.pack(self.cert_key_type)
-
-    # Encode CERTIFIED_KEY
-    certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-    assert(len(certified_pub_key_bytes) == 32)
-    obj += certified_pub_key_bytes
-
-    # Encode N_EXTENSIONS and EXTENSIONS
-    obj += self._get_cert_extensions_bytes()
-
-    # Do the signature on the body we have so far
-    obj += self._get_certificate_signature(bytes(obj))
-
-    return bytes(obj)
-
-  def encode_for_descriptor(self):
-    cert_bytes = self.encode()
-    cert_b64 = base64.b64encode(cert_bytes)
-    cert_b64 = b'\n'.join(stem.util.str_tools._split_by_length(cert_b64, 64))
-    return b'-----BEGIN ED25519 CERT-----\n%s\n-----END ED25519 CERT-----' % cert_b64
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 68d150b1..bb8937d8 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -45,7 +45,6 @@ import stem.util.connection
 import stem.util.str_tools
 import stem.util.tor_tools
 
-from stem.client.datatype import CertType
 from stem.descriptor import hsv3_crypto
 from stem.descriptor.certificate import Ed25519Certificate
 
@@ -310,127 +309,6 @@ class IntroductionPointV3(collections.namedtuple('IntroductionPointV3', ['link_s
     return link_specifiers
 
 
-class AlternateIntroductionPointV3(object):
-  """
-  Introduction point for a v3 hidden service.
-
-  We want this class to satisfy two use cases:
-
-  - Parsing introduction points directly from the HSv3 descriptor and saving
-    their data here.
-
-  - Creating introduction points for inclusion to an HSv3 descriptor at a point
-    where a descriptor signing key is not yet known (because the descriptor is
-    not yet made). In which case, the certificates cannot be created yet and
-    hence need to be created at encoding time.
-
-  .. versionadded:: 1.8.0
-
-  :var list link_specifiers: :class:`~stem.client.datatype.LinkSpecifier` where this service is reachable
-  :var X25519PublicKey onion_key: ntor introduction point public key
-  :var Ed25519PublicKey auth_key: ed25519 authentication key for this intro point
-  :var stem.certificate.Ed25519Certificate auth_key_cert: cross-certifier of the signing key with the auth key
-  :var X25519PublicKey enc_key: introduction request encryption key
-  :var stem.certificate.Ed25519Certificate enc_key_cert: cross-certifier of the signing key by the encryption key
-  :var XXX legacy_key: legacy introduction point RSA public key
-  :var stem.certificate.Ed25519Certificate legacy_key_cert: cross-certifier of the signing key by the legacy key
-
-  :var Ed25519Certificate descriptor_signing_key: hsv3 descriptor signing key (needed to encode the intro point)
-  """
-  def __init__(self, link_specifiers, onion_key, enc_key, auth_key=None, auth_key_cert=None, legacy_key=None, enc_key_cert=None, legacy_key_cert=None):
-    """
-    Initialize this intro point.
-
-    While not all attributes are mandatory, at the very least the link
-    specifiers, the auth key, the onion key and the encryption key need to be
-    provided.
-
-    The certificates can be left out (for example in the case of creating a new
-    intro point), and they will be created at encode time when the
-    descriptor_signing_key is provided.
-    """
-
-    # if not link_specifiers or not onion_key or not enc_key:
-    #   raise ValueError('Introduction point missing essential keys')
-
-    if not auth_key and not auth_key_cert:
-      raise ValueError('Either auth key or auth key cert needs to be provided')
-
-    # If we have an auth key cert but not an auth key, extract the key
-    if auth_key_cert and not auth_key and stem.prereq.is_crypto_available(ed25519 = True):
-      from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey
-      auth_key = Ed25519PublicKey.from_public_bytes(auth_key_cert.key)
-
-    self.link_specifiers = link_specifiers
-    self.onion_key = enc_key
-    self.enc_key = enc_key
-    self.legacy_key = legacy_key
-    self.auth_key = auth_key
-    self.auth_key_cert = auth_key_cert
-    self.enc_key_cert = enc_key_cert
-    self.legacy_key_cert = legacy_key_cert
-
-  def _encode_link_specifier_block(self):
-    """
-    See BUILDING-BLOCKS in rend-spec-v3.txt
-
-         NSPEC      (Number of link specifiers)   [1 byte]
-         NSPEC times:
-           LSTYPE (Link specifier type)           [1 byte]
-           LSLEN  (Link specifier length)         [1 byte]
-           LSPEC  (Link specifier)                [LSLEN bytes]
-    """
-
-    ls_block = b''
-    ls_block += chr(len(self.link_specifiers))
-
-    for ls in self.link_specifiers:
-      ls_block += ls.pack()
-
-    return base64.b64encode(ls_block)
-
-  def encode(self, descriptor_signing_privkey):
-    """
-    Encode this introduction point into bytes
-    """
-
-    if not descriptor_signing_privkey:
-      raise ValueError('Cannot encode: Descriptor signing key not provided')
-
-    from cryptography.hazmat.primitives import serialization
-
-    cert_expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
-
-    body = b''
-
-    body += b'introduction-point %s\n' % (self._encode_link_specifier_block())
-
-    # Onion key
-    onion_key_bytes = self.onion_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-    body += b'onion-key ntor %s\n' % (base64.b64encode(onion_key_bytes))
-
-    # Build auth key certificate
-    auth_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_INTRO_AUTH, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True)
-    auth_key_cert_b64_blob = auth_key_cert.encode_for_descriptor()
-    body += b'auth-key\n%s\n' % (auth_key_cert_b64_blob)
-
-    # Build enc key line
-    enc_key_bytes = self.enc_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
-    body += b'enc-key ntor %s\n' % (base64.b64encode(enc_key_bytes))
-
-    # Build enc key cert (this does not actually need to certify anything because of #29583)
-    enc_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_NTOR_ENC, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True)
-    enc_key_cert_b64_blob = enc_key_cert.encode_for_descriptor()
-    body += b'enc-key-cert\n%s\n' % (enc_key_cert_b64_blob)
-
-    # We are called to encode legacy key, but we don't know how
-    # TODO do legacy keys!
-    if self.legacy_key or self.legacy_key_cert:
-      raise NotImplementedError
-
-    return body
-
-
 class AuthorizedClient(collections.namedtuple('AuthorizedClient', ['id', 'iv', 'cookie'])):
   """
   Client authorized to use a v3 hidden service.
@@ -889,22 +767,17 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
   'blinded_priv_key' key that signs the certificate
   """
 
+  from cryptography.hazmat.primitives import serialization
+
   # 54 hours expiration date like tor does
   expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
 
-  desc_signing_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type='HS_V3_DESC_SIGNING',
-                                                                       expiration_date=expiration_date,
-                                                                       cert_key_type=1,
-                                                                       certified_pub_key=descriptor_signing_public_key,
-                                                                       signing_priv_key=blinded_priv_key,
-                                                                       include_signing_key=True)
-
-  signing_cert_bytes = desc_signing_cert.encode()
+  signing_key = descriptor_signing_public_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
+  extensions = [stem.descriptor.certificate.Ed25519Extension(stem.descriptor.certificate.ExtensionType.HAS_SIGNING_KEY, None, blinded_priv_key.public_key().public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))]
 
-  cert_base64 = stem.util.str_tools._to_unicode(base64.b64encode(signing_cert_bytes))
-  cert_blob = '\n'.join(stem.util.str_tools._split_by_length(cert_base64, 64))
+  desc_signing_cert = stem.descriptor.certificate.Ed25519CertificateV1(stem.client.datatype.CertType.HS_V3_DESC_SIGNING, expiration_date, 1, signing_key, extensions, signing_key = blinded_priv_key)
 
-  return '\n-----BEGIN %s-----\n%s\n-----END %s-----' % ('ED25519 CERT', cert_blob, 'ED25519 CERT')
+  return '\n' + desc_signing_cert.to_base64(pem = True)
 
 
 def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b''):
@@ -935,10 +808,7 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
 
   # Now encode all the intro points
   for intro_point in intro_points:
-    if isinstance(intro_point, IntroductionPointV3):
-      final_body += intro_point.encode() + b'\n'
-    else:
-      final_body += intro_point.encode(descriptor_signing_privkey)
+    final_body += intro_point.encode() + b'\n'
 
   return final_body
 
diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py
index d2b13f72..74ea0a6e 100644
--- a/test/unit/descriptor/certificate.py
+++ b/test/unit/descriptor/certificate.py
@@ -16,8 +16,6 @@ from stem.client.datatype import Size, CertType
 from stem.descriptor.certificate import ED25519_SIGNATURE_LENGTH, ExtensionType, Ed25519Certificate, Ed25519CertificateV1, Ed25519Extension
 from test.unit.descriptor import get_resource
 
-from cryptography.hazmat.primitives import serialization
-
 ED25519_CERT = """
 AQQABhtZAaW2GoBED1IjY3A6f6GNqBEl5A83fD2Za9upGke51JGqAQAgBABnprVR
 ptIr43bWPo2fIzo3uOywfoMrryprpbm4HhCkZMaO064LP+1KNuLvlc8sGG8lTjx1
@@ -221,30 +219,3 @@ class TestEd25519Certificate(unittest.TestCase):
 
     cert = Ed25519Certificate.from_base64(certificate())
     self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)', cert.validate, desc)
-
-  @test.require.ed25519_support
-  def test_encode_decode_certificate(self):
-    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
-
-    certified_priv_key = Ed25519PrivateKey.generate()
-    certified_pub_key = certified_priv_key.public_key()
-
-    signing_priv_key = Ed25519PrivateKey.generate()
-
-    expiration_date = datetime.datetime(2037, 8, 28, 17, 0)
-
-    my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_DESC_SIGNING, expiration_date = expiration_date, cert_key_type = 1, certified_pub_key = certified_pub_key, signing_priv_key = signing_priv_key, include_signing_key = True)
-
-    ed_cert_bytes = my_ed_cert.encode()
-    self.assertTrue(my_ed_cert)
-
-    # base64 the cert since that's what the parsing func expects
-    ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes)
-
-    ed_cert_parsed = Ed25519Certificate.from_base64(ed_cert_bytes_b64)
-
-    self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type)
-    self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date)
-    self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type)
-    self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))
-    self.assertEqual(ed_cert_parsed.signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw))
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 060fa4dc..14df30ec 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -22,7 +22,6 @@ from stem.descriptor.hidden_service import (
   REQUIRED_V3_FIELDS,
   X25519_AVAILABLE,
   IntroductionPointV3,
-  AlternateIntroductionPointV3,
   HiddenServiceDescriptorV3,
   OuterLayer,
   InnerLayer,
@@ -123,27 +122,6 @@ def _helper_get_intro():
   return IntroductionPointV3([LinkByIPv4('1.2.3.4', 9001)], base64.b64encode(onion_key), auth_key_cert, base64.b64encode(enc_key), enc_key_cert, None, None)
 
 
-def _helper_get_intro_old():
-  from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
-  from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey
-
-  link_specifiers = []
-
-  link1, _ = stem.client.datatype.LinkSpecifier.pop(b'\x03\x20CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC')
-  link_specifiers.append(link1)
-
-  onion_privkey = X25519PrivateKey.generate()
-  onion_pubkey = onion_privkey.public_key()
-
-  auth_privkey = Ed25519PrivateKey.generate()
-  auth_pubkey = auth_privkey.public_key()
-
-  enc_privkey = X25519PrivateKey.generate()
-  enc_pubkey = enc_privkey.public_key()
-
-  return AlternateIntroductionPointV3(link_specifiers, onion_key=onion_pubkey, enc_key=enc_pubkey, auth_key=auth_pubkey)
-
-
 class TestHiddenServiceDescriptorV3(unittest.TestCase):
   def test_real_descriptor(self):
     """





More information about the tor-commits mailing list