commit 492e3cf3f24be483aeb409f3ee46e9f54bef4c9d
Author: Damian Johnson <atagar(a)torproject.org>
Date: Wed Oct 23 14:59:06 2019 -0700
Parse Ed25519CertificateV1 using Size class
Our stem.client.datatype's packing/unpacking is considerably more readable than
what I implemented here. We need packing support anyway, so this change models
unpacking after the datatype module's pattern.
---
stem/descriptor/certificate.py | 65 ++++++++++++++++++-------------------
test/unit/descriptor/certificate.py | 30 ++++++++---------
2 files changed, 46 insertions(+), 49 deletions(-)
diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py
index ba2d1d55..92f75719 100644
--- a/stem/descriptor/certificate.py
+++ b/stem/descriptor/certificate.py
@@ -83,13 +83,14 @@ import stem.prereq
import stem.util.enum
import stem.util.str_tools
-from stem.client.datatype import Size
+from stem.client.datatype import Size, split
# TODO: Importing under an alternate name until we can deprecate our redundant
# CertType enum in Stem 2.x.
from stem.client.datatype import CertType as ClientCertType
+ED25519_KEY_LENGTH = 32
ED25519_HEADER_LENGTH = 40
ED25519_SIGNATURE_LENGTH = 64
@@ -223,6 +224,13 @@ class Ed25519CertificateV1(Ed25519Certificate):
self.extensions = extensions
self.signature = signature
+ if self.type in (ClientCertType.LINK, ClientCertType.IDENTITY, ClientCertType.AUTHENTICATE):
+ raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved for CERTS cells.' % self.type_int)
+ elif self.type == ClientCertType.ED25519_IDENTITY:
+ raise ValueError('Ed25519 certificate cannot have a type of 7. This is reserved for RSA identity cross-certification.')
+ elif self.type == ClientCertType.UNKNOWN:
+ raise ValueError('Ed25519 certificate type %i is unrecognized' % self.type_int)
+
def to_base64(self, pem = False):
if pem:
return '-----BEGIN ED25519 CERT-----\n%s\n-----END ED25519 CERT-----' % self.encoded
@@ -247,40 +255,33 @@ class Ed25519CertificateV1(Ed25519Certificate):
if len(decoded) < ED25519_HEADER_LENGTH + ED25519_SIGNATURE_LENGTH:
raise ValueError('Ed25519 certificate was %i bytes, but should be at least %i' % (len(decoded), ED25519_HEADER_LENGTH + ED25519_SIGNATURE_LENGTH))
- type_enum, type_int = ClientCertType.get(stem.util.str_tools._to_int(decoded[1:2]))
-
- if type_enum in (ClientCertType.LINK, ClientCertType.IDENTITY, ClientCertType.AUTHENTICATE):
- raise ValueError('Ed25519 certificate cannot have a type of %i. This is reserved for CERTS cells.' % type_int)
- elif type_enum == ClientCertType.ED25519_IDENTITY:
- raise ValueError('Ed25519 certificate cannot have a type of 7. This is reserved for RSA identity cross-certification.')
- elif type_enum == ClientCertType.UNKNOWN:
- raise ValueError('Ed25519 certificate type %i is unrecognized' % type_int)
+ header, signature = split(decoded, len(decoded) - ED25519_SIGNATURE_LENGTH)
- # expiration time is in hours since epoch
- try:
- expiration = datetime.datetime.utcfromtimestamp(stem.util.str_tools._to_int(decoded[2:6]) * 3600)
- except ValueError as exc:
- raise ValueError('Invalid expiration timestamp (%s): %s' % (exc, stem.util.str_tools._to_int(decoded[2:6]) * 3600))
+ version, header = Size.CHAR.pop(header)
+ cert_type, header = Size.CHAR.pop(header)
+ expiration_hours, header = Size.LONG.pop(header)
+ key_type, header = Size.CHAR.pop(header)
+ key, header = split(header, ED25519_KEY_LENGTH)
+ extension_count, extension_data = Size.CHAR.pop(header)
- key_type = stem.util.str_tools._to_int(decoded[6:7])
- key = decoded[7:39]
- signature = decoded[-ED25519_SIGNATURE_LENGTH:]
+ if version != 1:
+ raise ValueError('Ed25519 v1 parser cannot read version %i certificates' % version)
extensions = []
- extension_count = stem.util.str_tools._to_int(decoded[39:40])
- remaining_data = decoded[40:-ED25519_SIGNATURE_LENGTH]
for i in range(extension_count):
- if len(remaining_data) < 4:
+ if len(extension_data) < 4:
raise ValueError('Ed25519 extension is missing header field data')
- extension_length = stem.util.str_tools._to_int(remaining_data[:2])
- extension_type = stem.util.str_tools._to_int(remaining_data[2:3])
- extension_flags = stem.util.str_tools._to_int(remaining_data[3:4])
- extension_data = remaining_data[4:4 + extension_length]
+ extension_length, extension_data = Size.SHORT.pop(extension_data)
+ extension_type, extension_data = Size.CHAR.pop(extension_data)
+ extension_flags, extension_data = Size.CHAR.pop(extension_data)
+ extension_value, extension_data = split(extension_data, extension_length)
- if extension_length != len(extension_data):
- raise ValueError("Ed25519 extension is truncated. It should have %i bytes of data but there's only %i." % (extension_length, len(extension_data)))
+ if extension_length != len(extension_value):
+ raise ValueError("Ed25519 extension is truncated. It should have %i bytes of data but there's only %i." % (extension_length, len(extension_value)))
+ elif extension_type == ExtensionType.HAS_SIGNING_KEY and len(extension_value) != 32:
+ raise ValueError('Ed25519 HAS_SIGNING_KEY extension must be 32 bytes, but was %i.' % len(extension_value))
flags, remaining_flags = [], extension_flags
@@ -291,16 +292,12 @@ class Ed25519CertificateV1(Ed25519Certificate):
if remaining_flags:
flags.append(ExtensionFlag.UNKNOWN)
- if extension_type == ExtensionType.HAS_SIGNING_KEY and len(extension_data) != 32:
- raise ValueError('Ed25519 HAS_SIGNING_KEY extension must be 32 bytes, but was %i.' % len(extension_data))
-
- extensions.append(Ed25519Extension(extension_type, flags, extension_flags, extension_data))
- remaining_data = remaining_data[4 + extension_length:]
+ extensions.append(Ed25519Extension(extension_type, flags, extension_flags, extension_value))
- if remaining_data:
- raise ValueError('Ed25519 certificate had %i bytes of unused extension data' % len(remaining_data))
+ if extension_data:
+ raise ValueError('Ed25519 certificate had %i bytes of unused extension data' % len(extension_data))
- instance = Ed25519CertificateV1(type_int, expiration, key_type, key, extensions, signature)
+ instance = Ed25519CertificateV1(cert_type, datetime.datetime.utcfromtimestamp(expiration_hours * 3600), key_type, key, extensions, signature)
instance.encoded = content
return instance
diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py
index 26e091c1..7fd5e731 100644
--- a/test/unit/descriptor/certificate.py
+++ b/test/unit/descriptor/certificate.py
@@ -57,7 +57,7 @@ class TestEd25519Certificate(unittest.TestCase):
signing_key = b'\x11' * 32
cert_bytes = certificate(extension_data = [b'\x00\x20\x04\x07' + signing_key, b'\x00\x00\x05\x04'])
- cert = Ed25519Certificate.parse(cert_bytes)
+ cert = Ed25519Certificate.from_base64(cert_bytes)
self.assertEqual(Ed25519CertificateV1, type(cert))
self.assertEqual(1, cert.version)
@@ -81,7 +81,7 @@ class TestEd25519Certificate(unittest.TestCase):
Parse a certificate from a real server descriptor.
"""
- cert = Ed25519Certificate.parse(ED25519_CERT)
+ cert = Ed25519Certificate.from_base64(ED25519_CERT)
self.assertEqual(Ed25519CertificateV1, type(cert))
self.assertEqual(1, cert.version)
@@ -99,7 +99,7 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = re.escape("Ed25519 certificate wasn't propoerly base64 encoded (Incorrect padding):")
- self.assertRaisesRegexp(ValueError, exc_msg, Ed25519Certificate.parse, '\x02\x0323\x04')
+ self.assertRaisesRegexp(ValueError, exc_msg, Ed25519Certificate.from_base64, '\x02\x0323\x04')
def test_too_short(self):
"""
@@ -107,10 +107,10 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = "Ed25519 certificate wasn't propoerly base64 encoded (empty):"
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, '')
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, '')
exc_msg = 'Ed25519 certificate was 18 bytes, but should be at least 104'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, 'AQQABhtZAaW2GoBED1IjY3A6')
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, 'AQQABhtZAaW2GoBED1IjY3A6')
def test_with_invalid_version(self):
"""
@@ -119,7 +119,7 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = 'Ed25519 certificate is version 2. Parser presently only supports version 1.'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(version = 2))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(version = 2))
def test_with_invalid_cert_type(self):
"""
@@ -128,13 +128,13 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = 'Ed25519 certificate type 0 is unrecognized'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(cert_type = 0))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(cert_type = 0))
exc_msg = 'Ed25519 certificate cannot have a type of 1. This is reserved for CERTS cells.'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(cert_type = 1))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(cert_type = 1))
exc_msg = 'Ed25519 certificate cannot have a type of 7. This is reserved for RSA identity cross-certification.'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(cert_type = 7))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(cert_type = 7))
def test_truncated_extension(self):
"""
@@ -142,10 +142,10 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = 'Ed25519 extension is missing header field data'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(extension_data = [b'']))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(extension_data = [b'']))
exc_msg = "Ed25519 extension is truncated. It should have 20480 bytes of data but there's only 2."
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(extension_data = [b'\x50\x00\x00\x00\x15\x12']))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(extension_data = [b'\x50\x00\x00\x00\x15\x12']))
def test_extra_extension_data(self):
"""
@@ -153,7 +153,7 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = 'Ed25519 certificate had 1 bytes of unused extension data'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(extension_data = [b'\x00\x01\x00\x00\x15\x12']))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(extension_data = [b'\x00\x01\x00\x00\x15\x12']))
def test_truncated_signing_key(self):
"""
@@ -161,7 +161,7 @@ class TestEd25519Certificate(unittest.TestCase):
"""
exc_msg = 'Ed25519 HAS_SIGNING_KEY extension must be 32 bytes, but was 2.'
- self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.parse, certificate(extension_data = [b'\x00\x02\x04\x07\11\12']))
+ self.assertRaisesWith(ValueError, exc_msg, Ed25519Certificate.from_base64, certificate(extension_data = [b'\x00\x02\x04\x07\11\12']))
@test.require.ed25519_support
def test_validation_with_descriptor_key(self):
@@ -197,7 +197,7 @@ class TestEd25519Certificate(unittest.TestCase):
with open(get_resource('server_descriptor_with_ed25519'), 'rb') as descriptor_file:
desc = next(stem.descriptor.parse_file(descriptor_file, validate = False))
- cert = Ed25519Certificate.parse(certificate())
+ cert = Ed25519Certificate.from_base64(certificate())
self.assertRaisesWith(ValueError, 'Ed25519KeyCertificate signing key is invalid (signature forged or corrupt)', cert.validate, desc)
@test.require.ed25519_support
@@ -219,7 +219,7 @@ class TestEd25519Certificate(unittest.TestCase):
# base64 the cert since that's what the parsing func expects
ed_cert_bytes_b64 = base64.b64encode(ed_cert_bytes)
- ed_cert_parsed = stem.descriptor.certificate.Ed25519Certificate.parse(ed_cert_bytes_b64)
+ ed_cert_parsed = Ed25519Certificate.from_base64(ed_cert_bytes_b64)
self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type)
self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date)