commit 6f5d91666562e562fd773636ffcd292abff0d939 Author: Damian Johnson atagar@torproject.org Date: Fri Sep 28 08:03:56 2012 -0700
Moving key certificate mocking into util --- test/mocking.py | 33 +++++++++++ .../descriptor/networkstatus/key_certificate.py | 59 ++------------------ 2 files changed, 38 insertions(+), 54 deletions(-)
diff --git a/test/mocking.py b/test/mocking.py index 2dd3319..3d4a615 100644 --- a/test/mocking.py +++ b/test/mocking.py @@ -28,6 +28,7 @@ calling :func:`test.mocking.revert_mocking`. get_relay_extrainfo_descriptor - stem.descriptor.extrainfo_descriptor.RelayExtraInfoDescriptor get_bridge_extrainfo_descriptor - stem.descriptor.extrainfo_descriptor.BridgeExtraInfoDescriptor get_router_status_entry - stem.descriptor.networkstatus.RouterStatusEntry + get_key_certificate - stem.descriptor.networkstatus.KeyCertificate """
import inspect @@ -102,6 +103,19 @@ ROUTER_STATUS_ENTRY_HEADER = ( ("s", "Fast Named Running Stable Valid"), )
+KEY_CERTIFICATE_HEADER = ( + ("dir-key-certificate-version", "3"), + ("fingerprint", "27B6B5996C426270A5C95488AA5BCEB6BCC86956"), + ("dir-key-published", "2011-11-28 21:51:04"), + ("dir-key-expires", "2012-11-28 21:51:04"), + ("dir-identity-key", "\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----" % CRYPTO_BLOB), + ("dir-signing-key", "\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----" % CRYPTO_BLOB), +) + +KEY_CERTIFICATE_FOOTER = ( + ("dir-key-certification", "\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----" % CRYPTO_BLOB), +) + def no_op(): def _no_op(*args): pass return _no_op @@ -483,3 +497,22 @@ def get_router_status_entry(attr = None, exclude = (), content = False): else: return stem.descriptor.networkstatus.RouterStatusEntry(desc_content, validate = True)
+def get_key_certificate(attr = None, exclude = (), content = False): + """ + Provides the descriptor content for... + stem.descriptor.networkstatus.KeyCertificate + + :param dict attr: keyword/value mappings to be included in the descriptor + :param list exclude: mandatory keywords to exclude from the descriptor + :param bool content: provides the str content of the descriptor rather than the class if True + + :returns: KeyCertificate for the requested descriptor content + """ + + desc_content = _get_descriptor_content(attr, exclude, KEY_CERTIFICATE_HEADER, KEY_CERTIFICATE_FOOTER) + + if content: + return desc_content + else: + return stem.descriptor.networkstatus.KeyCertificate(desc_content, validate = True) + diff --git a/test/unit/descriptor/networkstatus/key_certificate.py b/test/unit/descriptor/networkstatus/key_certificate.py index 2d779e7..703b61d 100644 --- a/test/unit/descriptor/networkstatus/key_certificate.py +++ b/test/unit/descriptor/networkstatus/key_certificate.py @@ -6,56 +6,7 @@ import datetime import unittest
from stem.descriptor.networkstatus import KeyCertificate - -sig_block = """\ ------BEGIN %s----- -MIGJAoGBAJ5itcJRYNEM3Qf1OVWLRkwjqf84oXPc2ZusaJ5zOe7TVvBMra9GNyc0 -NM9y6zVkHCAePAjr4KbW/8P1olA6FUE2LV9bozaU1jFf6K8B2OELKs5FUEW+n+ic -GM0x6MhngyXonWOcKt5Gj+mAu5lrno9tpNbPkz2Utr/Pi0nsDhWlAgMBAAE= ------END %s-----\ -""" - -RSA_SIG = sig_block % ("RSA PUBLIC KEY", "RSA PUBLIC KEY") -KEY_SIG = sig_block % ("SIGNATURE", "SIGNATURE") - -KEY_CERTIFICATE_ATTR = ( - ("dir-key-certificate-version", "3"), - ("fingerprint", "27B6B5996C426270A5C95488AA5BCEB6BCC86956"), - ("dir-key-published", "2011-11-28 21:51:04"), - ("dir-key-expires", "2012-11-28 21:51:04"), - ("dir-identity-key", "\n" + RSA_SIG), - ("dir-signing-key", "\n" + RSA_SIG), - ("dir-key-certification", "\n" + KEY_SIG), -) - -def get_key_certificate(attr = None, exclude = None): - """ - Constructs a minimal key certificate with the given attributes. - - :param dict attr: keyword/value mappings to be included in the entry - :param list exclude: mandatory keywords to exclude from the entry - - :returns: str with customized key certificate content - """ - - descriptor_lines = [] - if attr is None: attr = {} - if exclude is None: exclude = [] - attr = dict(attr) # shallow copy since we're destructive - - for keyword, value in KEY_CERTIFICATE_ATTR: - if keyword in exclude: continue - elif keyword in attr: - value = attr[keyword] - del attr[keyword] - - descriptor_lines.append("%s %s" % (keyword, value)) - - # dump in any unused attributes - for attr_keyword, attr_value in attr.items(): - descriptor_lines.append("%s %s" % (attr_keyword, attr_value)) - - return "\n".join(descriptor_lines) +from test.mocking import get_key_certificate, CRYPTO_BLOB
class TestKeyCertificate(unittest.TestCase): def test_minimal(self): @@ -63,17 +14,17 @@ class TestKeyCertificate(unittest.TestCase): Parses a minimal key certificate. """
- certificate = KeyCertificate(get_key_certificate()) + certificate = get_key_certificate()
self.assertEqual(3, certificate.version) self.assertEqual(None, certificate.address) self.assertEqual(None, certificate.dir_port) self.assertEqual("27B6B5996C426270A5C95488AA5BCEB6BCC86956", certificate.fingerprint) - self.assertEqual(RSA_SIG, certificate.identity_key) + self.assertTrue(CRYPTO_BLOB in certificate.identity_key) self.assertEqual(datetime.datetime(2011, 11, 28, 21, 51, 4), certificate.published) self.assertEqual(datetime.datetime(2012, 11, 28, 21, 51, 4), certificate.expires) - self.assertEqual(RSA_SIG, certificate.signing_key) + self.assertTrue(CRYPTO_BLOB in certificate.signing_key) self.assertEqual(None, certificate.crosscert) - self.assertEqual(KEY_SIG, certificate.certification) + self.assertTrue(CRYPTO_BLOB in certificate.certification) self.assertEqual([], certificate.get_unrecognized_lines())