commit c723cb5c3eba172b414b61c8e9560ddc89b39303 Author: George Kadianakis desnacked@riseup.net Date: Thu Oct 10 20:43:19 2019 +0300
Implement ed25519 certificate encoding.
I had trouble adapting the existing certificate.py code to do encoding: the Ed25519CertificateV1 class was written with parsing in mind, whereas here I need to supply the certificate's raw attributes (keys, etc.) and have them encoded into an actual certificate.
For now I wrote a separate class that is meant only for encoding, but we should likely merge the two classes into one! --- stem/descriptor/certificate.py | 150 ++++++++++++++++++++++++++++++++++------- 1 file changed, 126 insertions(+), 24 deletions(-)
diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py index 6ba92ee7..ac6d455f 100644 --- a/stem/descriptor/certificate.py +++ b/stem/descriptor/certificate.py @@ -78,19 +78,19 @@ import stem.prereq import stem.descriptor.server_descriptor import stem.util.enum import stem.util.str_tools +import stem.util + +from cryptography.hazmat.primitives import serialization
ED25519_HEADER_LENGTH = 40 ED25519_SIGNATURE_LENGTH = 64 ED25519_ROUTER_SIGNATURE_PREFIX = b'Tor router descriptor signature v1'
CertType = stem.util.enum.UppercaseEnum( - 'SIGNING', - 'LINK_CERT', - 'AUTH', - 'HS_V3_DESC_SIGNING', - 'HS_V3_INTRO_AUTH', - 'HS_V3_INTRO_ENCRYPT', -) + 'RESERVED_0', 'RESERVED_1', 'RESERVED_2', 'RESERVED_3', + 'SIGNING', 'LINK_CERT', 'AUTH', 'RESERVED_RSA', + 'HS_V3_DESC_SIGNING', 'HS_V3_INTRO_AUTH', 'RESERVED_0A', + 'HS_V3_INTRO_ENC', 'HS_V3_NTOR_ENC')
ExtensionType = stem.util.enum.Enum(('HAS_SIGNING_KEY', 4),) ExtensionFlag = stem.util.enum.UppercaseEnum('AFFECTS_VALIDATION', 'UNKNOWN') @@ -157,6 +157,7 @@ class Ed25519Certificate(object): def _parse(descriptor, entries): value, block_type, block_contents = entries[keyword][0]
+ # XXX ATAGAR This ValueError never actually surfaces if it triggers... if not block_contents or block_type != 'ED25519 CERT': raise ValueError("'%s' should be followed by a ED25519 CERT block, but was a %s" % (keyword, block_type))
@@ -188,26 +189,16 @@ class Ed25519CertificateV1(Ed25519Certificate): raise ValueError('Ed25519 certificate was %i bytes, but should be at least %i' % (len(decoded), ED25519_HEADER_LENGTH + ED25519_SIGNATURE_LENGTH))
cert_type = stem.util.str_tools._to_int(decoded[1:2]) + try: + self.type = CertType.keys()[cert_type] + except IndexError: + raise ValueError('Certificate has wrong cert type')
- if cert_type in (0, 1, 2, 3): + # Catch some invalid cert types + if self.type in ('RESERVED_0', 'RESERVED_1', 'RESERVED_2', 'RESERVED_3'): raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved to avoid conflicts with tor CERTS cells.' % cert_type) - elif cert_type == 4: - self.type = CertType.SIGNING - elif cert_type == 5: - self.type = CertType.LINK_CERT - elif cert_type == 6: - self.type = CertType.AUTH - elif cert_type == 7: + elif self.type == ('RESERVED_RSA'): raise ValueError('Ed25519 certificate cannot have a type of 7. This is reserved for RSA identity cross-certification.') - elif cert_type == 8: - # see rend-spec-v3.txt appendix E for these definitions - self.type = CertType.HS_V3_DESC_SIGNING - elif cert_type == 9: - self.type = CertType.HS_V3_INTRO_AUTH - elif cert_type == 0x0B: - self.type = CertType.HS_V3_INTRO_ENCRYPT - else: - raise ValueError('Ed25519 certificate type %i is unrecognized' % cert_type)
# expiration time is in hours since epoch try: @@ -333,3 +324,114 @@ class Ed25519CertificateV1(Ed25519Certificate): verify_key.verify(signature_bytes, descriptor_sha256_digest) except InvalidSignature: raise ValueError('Descriptor Ed25519 certificate signature invalid (Signature was forged or corrupt)') + +class MyED25519Certificate(object): + """ + This class represents an ed25519 certificate and it's made for encoding it into a string. + We should merge this class with the one above. + """ + def __init__(self, cert_type, expiration_date, + cert_key_type, certified_pub_key, + signing_priv_key, include_signing_key, + version=1): + """ + :var int version + :var int cert_type + :var CertType cert_type + :var datetime expiration_date + :var int cert_key_type + :var ED25519PublicKey certified_pub_key + :var ED25519PrivateKey signing_priv_key + :var bool include_signing_key + """ + self.version = version + self.cert_type = cert_type + self.expiration_date = expiration_date + self.cert_key_type = cert_key_type + self.certified_pub_key = certified_pub_key + + self.signing_priv_key = signing_priv_key + self.signing_pub_key = signing_priv_key.public_key() + + self.include_signing_key = include_signing_key + # XXX validate params + + def _get_certificate_signature(self, msg_body): + return self.signing_priv_key.sign(msg_body) + + def _get_cert_extensions_bytes(self): + """ + Build the cert extensions part of the certificate + """ + n_extensions = 0 + + # If we need to include the signing key, let's create the extension body + # ExtLength [2 bytes] + # ExtType [1 byte] + # ExtFlags [1 byte] + # ExtData [ExtLength bytes] + if self.include_signing_key: + n_extensions += 1 + + signing_pubkey_bytes = self.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw) + + ext_length = len(signing_pubkey_bytes) + ext_type = 4 + ext_flags = 0 + ext_data = signing_pubkey_bytes + + # Now build the actual byte representation of any 
extensions + ext_obj = bytes() + ext_obj += n_extensions.to_bytes(1, 'big') + + if self.include_signing_key: + ext_obj += ext_length.to_bytes(2, 'big') + ext_obj += ext_type.to_bytes(1, 'big') + ext_obj += ext_flags.to_bytes(1, 'big') + ext_obj += ext_data + + return ext_obj + + def encode(self): + """Return a bytes representation of this certificate.""" + obj = bytes() + + # Encode VERSION + obj += self.version.to_bytes(1, 'big') + + # Encode CERT_TYPE + try: + cert_type_int = CertType.index_of(self.cert_type) + except ValueError: + raise ValueError("Bad cert type %s" % self.cert_type) + + obj += cert_type_int.to_bytes(1, 'big') + + # Encode EXPIRATION_DATE + expiration_seconds_since_epoch = stem.util.datetime_to_unix(self.expiration_date) + expiration_hours_since_epoch = int(expiration_seconds_since_epoch) // 3600 + obj += expiration_hours_since_epoch.to_bytes(4, 'big') + + # Encode CERT_KEY_TYPE + obj += self.cert_key_type.to_bytes(1, 'big') + + # Encode CERTIFIED_KEY + certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw) + assert(len(certified_pub_key_bytes) == 32) + obj += certified_pub_key_bytes + + # Encode N_EXTENSIONS and EXTENSIONS + obj += self._get_cert_extensions_bytes() + + # Do the signature on the body we have so far + obj += self._get_certificate_signature(obj) + + return obj + + def encode_for_descriptor(self): + cert_bytes = self.encode() + cert_b64 = base64.b64encode(cert_bytes) + cert_b64 = b'\n'.join(stem.util.str_tools._split_by_length(cert_b64, 64)) + return b'-----BEGIN ED25519 CERT-----\n%s\n-----END ED25519 CERT-----' % cert_b64