commit 5caa371952aab488ae162287d599b78259b03f9b Author: Damian Johnson atagar@torproject.org Date: Sat Jul 1 12:03:28 2017 -0700
Randomize network status document fields --- stem/descriptor/networkstatus.py | 128 +++++++++------------ .../networkstatus/directory_authority.py | 41 ++++--- test/unit/descriptor/networkstatus/document_v2.py | 14 +-- test/unit/descriptor/networkstatus/document_v3.py | 30 ++--- .../descriptor/networkstatus/key_certificate.py | 26 ++--- 5 files changed, 99 insertions(+), 140 deletions(-)
diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py index 2fc868a..af2e41a 100644 --- a/stem/descriptor/networkstatus.py +++ b/stem/descriptor/networkstatus.py @@ -63,7 +63,6 @@ import stem.util.tor_tools import stem.version
from stem.descriptor import ( - CRYPTO_BLOB, PGP_BLOCK_END, Descriptor, DocumentHandler, @@ -77,6 +76,11 @@ from stem.descriptor import ( _parse_forty_character_hex, _parse_protocol_line, _parse_key_block, + _random_nickname, + _random_fingerprint, + _random_ipv4_address, + _random_date, + _random_crypto_blob, )
from stem.descriptor.router_status_entry import ( @@ -216,57 +220,6 @@ PARAM_RANGE = { 'onion-key-grace-period-days': (1, 90), # max is the highest onion-key-rotation-days }
-AUTHORITY_HEADER = ( - ('dir-source', 'turtles 27B6B5996C426270A5C95488AA5BCEB6BCC86956 no.place.com 76.73.17.194 9030 9090'), - ('contact', 'Mike Perry <email>'), -) - -KEY_CERTIFICATE_HEADER = ( - ('dir-key-certificate-version', '3'), - ('fingerprint', '27B6B5996C426270A5C95488AA5BCEB6BCC86956'), - ('dir-key-published', '2011-11-28 21:51:04'), - ('dir-key-expires', '2012-11-28 21:51:04'), - ('dir-identity-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB), - ('dir-signing-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB), -) - -KEY_CERTIFICATE_FOOTER = ( - ('dir-key-certification', '\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB), -) - -NETWORK_STATUS_DOCUMENT_HEADER_V2 = ( - ('network-status-version', '2'), - ('dir-source', '18.244.0.114 18.244.0.114 80'), - ('fingerprint', '719BE45DE224B607C53707D0E2143E2D423E74CF'), - ('contact', 'arma at mit dot edu'), - ('published', '2005-12-16 00:13:46'), - ('dir-signing-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB), -) - -NETWORK_STATUS_DOCUMENT_FOOTER_V2 = ( - ('directory-signature', 'moria2\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB), -) - -NETWORK_STATUS_DOCUMENT_HEADER = ( - ('network-status-version', '3'), - ('vote-status', 'consensus'), - ('consensus-methods', None), - ('consensus-method', None), - ('published', None), - ('valid-after', '2012-09-02 22:00:00'), - ('fresh-until', '2012-09-02 22:00:00'), - ('valid-until', '2012-09-02 22:00:00'), - ('voting-delay', '300 300'), - ('client-versions', None), - ('server-versions', None), - ('package', None), - ('known-flags', 'Authority BadExit Exit Fast Guard HSDir Named Running Stable Unnamed V2Dir Valid'), - ('params', None), -) - -VOTE_HEADER_DEFAULTS = {'consensus-methods': '1 9', 'published': '2012-09-02 22:00:00'} -CONSENSUS_HEADER_DEFAULTS = {'consensus-method': '9'} -
class PackageVersion(collections.namedtuple('PackageVersion', ['name', 'version', 'url', 'digests'])): """ @@ -515,7 +468,16 @@ class NetworkStatusDocumentV2(NetworkStatusDocument): if sign: raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
- return _descriptor_content(attr, exclude, sign, NETWORK_STATUS_DOCUMENT_HEADER_V2, NETWORK_STATUS_DOCUMENT_FOOTER_V2) + return _descriptor_content(attr, exclude, sign, ( + ('network-status-version', '2'), + ('dir-source', '%s %s 80' % (_random_ipv4_address(), _random_ipv4_address())), + ('fingerprint', _random_fingerprint()), + ('contact', 'arma at mit dot edu'), + ('published', _random_date()), + ('dir-signing-key', _random_crypto_blob('RSA PUBLIC KEY')), + ), ( + ('directory-signature', 'moria2' + _random_crypto_blob('SIGNATURE')), + ))
def __init__(self, raw_content, validate = False): super(NetworkStatusDocumentV2, self).__init__(raw_content, lazy_load = not validate) @@ -967,9 +929,12 @@ class NetworkStatusDocumentV3(NetworkStatusDocument): raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
attr = {} if attr is None else dict(attr) - is_vote = attr.get('vote-status') == 'vote' - extra_defaults = VOTE_HEADER_DEFAULTS if is_vote else CONSENSUS_HEADER_DEFAULTS + + if is_vote: + extra_defaults = {'consensus-methods': '1 9', 'published': _random_date()} + else: + extra_defaults = {'consensus-method': '9'}
if is_vote and authorities is None: authorities = [DirectoryAuthority.create(is_vote = is_vote)] @@ -980,7 +945,26 @@ class NetworkStatusDocumentV3(NetworkStatusDocument): elif k not in attr: attr[k] = v
- desc_content = _descriptor_content(attr, exclude, sign, NETWORK_STATUS_DOCUMENT_HEADER, NETWORK_STATUS_DOCUMENT_FOOTER) + desc_content = _descriptor_content(attr, exclude, sign, ( + ('network-status-version', '3'), + ('vote-status', 'consensus'), + ('consensus-methods', None), + ('consensus-method', None), + ('published', None), + ('valid-after', _random_date()), + ('fresh-until', _random_date()), + ('valid-until', _random_date()), + ('voting-delay', '300 300'), + ('client-versions', None), + ('server-versions', None), + ('package', None), + ('known-flags', 'Authority BadExit Exit Fast Guard HSDir Named Running Stable Unnamed V2Dir Valid'), + ('params', None), + ), ( + ('directory-footer', ''), + ('bandwidth-weights', None), + ('directory-signature', '%s %s%s' % (_random_fingerprint(), _random_fingerprint(), _random_crypto_blob('SIGNATURE'))), + ))
# inject the authorities and/or routers between the header and footer
@@ -1438,9 +1422,12 @@ class DirectoryAuthority(Descriptor): # include mandatory 'vote-digest' if a consensus
if not is_vote and not ('vote-digest' in attr or (exclude and 'vote-digest' in exclude)): - attr['vote-digest'] = '0B6D1E9A300B895AA2D0B427F92917B6995C3C1C' + attr['vote-digest'] = _random_fingerprint()
- content = _descriptor_content(attr, exclude, sign, AUTHORITY_HEADER) + content = _descriptor_content(attr, exclude, sign, ( + ('dir-source', '%s %s no.place.com %s 9030 9090' % (_random_nickname(), _random_fingerprint(), _random_ipv4_address())), + ('contact', 'Mike Perry <email>'), + ))
if is_vote: content += b'\n' + KeyCertificate.content() @@ -1630,7 +1617,16 @@ class KeyCertificate(Descriptor): if sign: raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
- return _descriptor_content(attr, exclude, sign, KEY_CERTIFICATE_HEADER, KEY_CERTIFICATE_FOOTER) + return _descriptor_content(attr, exclude, sign, ( + ('dir-key-certificate-version', '3'), + ('fingerprint', _random_fingerprint()), + ('dir-key-published', _random_date()), + ('dir-key-expires', _random_date()), + ('dir-identity-key', _random_crypto_blob('RSA PUBLIC KEY')), + ('dir-signing-key', _random_crypto_blob('RSA PUBLIC KEY')), + ), ( + ('dir-key-certification', _random_crypto_blob('SIGNATURE')), + ))
def __init__(self, raw_content, validate = False): super(KeyCertificate, self).__init__(raw_content, lazy_load = not validate) @@ -1771,17 +1767,3 @@ class BridgeNetworkStatusDocument(NetworkStatusDocument): )
self.routers = dict((desc.fingerprint, desc) for desc in router_iter) - - -DOC_SIG = DocumentSignature( - 'sha1', - '14C131DFC5C6F93646BE72FA1401C02A8DF2E8B4', - 'BF112F1C6D5543CFD0A32215ACABD4197B5279AD', - '-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB, -) - -NETWORK_STATUS_DOCUMENT_FOOTER = ( - ('directory-footer', ''), - ('bandwidth-weights', None), - ('directory-signature', '%s %s\n%s' % (DOC_SIG.identity, DOC_SIG.key_digest, DOC_SIG.signature)), -) diff --git a/test/unit/descriptor/networkstatus/directory_authority.py b/test/unit/descriptor/networkstatus/directory_authority.py index f2c270e..8c1e64e 100644 --- a/test/unit/descriptor/networkstatus/directory_authority.py +++ b/test/unit/descriptor/networkstatus/directory_authority.py @@ -7,11 +7,12 @@ import unittest import test.require
from stem.descriptor.networkstatus import ( - AUTHORITY_HEADER, DirectoryAuthority, KeyCertificate, )
+DIR_SOURCE_LINE = 'turtles 27B6B5996C426270A5C95488AA5BCEB6BCC86956 no.place.com 76.73.17.194 9030 9090' +
class TestDirectoryAuthority(unittest.TestCase): def test_minimal_consensus_authority(self): @@ -21,15 +22,14 @@ class TestDirectoryAuthority(unittest.TestCase):
authority = DirectoryAuthority.create()
- self.assertEqual('turtles', authority.nickname) - self.assertEqual('27B6B5996C426270A5C95488AA5BCEB6BCC86956', authority.fingerprint) + self.assertTrue(authority.nickname.startswith('Unnamed')) + self.assertEqual(40, len(authority.fingerprint)) self.assertEqual('no.place.com', authority.hostname) - self.assertEqual('76.73.17.194', authority.address) self.assertEqual(9030, authority.dir_port) self.assertEqual(9090, authority.or_port) self.assertEqual(False, authority.is_legacy) self.assertEqual('Mike Perry <email>', authority.contact) - self.assertEqual('0B6D1E9A300B895AA2D0B427F92917B6995C3C1C', authority.vote_digest) + self.assertEqual(40, len(authority.vote_digest)) self.assertEqual(None, authority.legacy_dir_key) self.assertEqual(None, authority.key_certificate) self.assertEqual([], authority.get_unrecognized_lines()) @@ -41,17 +41,15 @@ class TestDirectoryAuthority(unittest.TestCase):
authority = DirectoryAuthority.create(is_vote = True)
- self.assertEqual('turtles', authority.nickname) - self.assertEqual('27B6B5996C426270A5C95488AA5BCEB6BCC86956', authority.fingerprint) + self.assertTrue(authority.nickname.startswith('Unnamed')) + self.assertEqual(40, len(authority.fingerprint)) self.assertEqual('no.place.com', authority.hostname) - self.assertEqual('76.73.17.194', authority.address) self.assertEqual(9030, authority.dir_port) self.assertEqual(9090, authority.or_port) self.assertEqual(False, authority.is_legacy) self.assertEqual('Mike Perry <email>', authority.contact) self.assertEqual(None, authority.vote_digest) self.assertEqual(None, authority.legacy_dir_key) - self.assertEqual(KeyCertificate.create(), authority.key_certificate) self.assertEqual([], authority.get_unrecognized_lines())
@test.require.cryptography @@ -96,7 +94,7 @@ class TestDirectoryAuthority(unittest.TestCase): self.assertRaises(ValueError, DirectoryAuthority, content, True)
authority = DirectoryAuthority(content, False) - self.assertEqual('turtles', authority.nickname) + self.assertTrue(authority.nickname.startswith('Unnamed')) self.assertEqual(['ho-hum 567'], authority.get_unrecognized_lines())
def test_missing_fields(self): @@ -113,14 +111,14 @@ class TestDirectoryAuthority(unittest.TestCase): if excluded_field == 'dir-source': self.assertEqual('Mike Perry <email>', authority.contact) else: - self.assertEqual('turtles', authority.nickname) + self.assertTrue(authority.nickname.startswith('Unnamed'))
def test_blank_lines(self): """ Includes blank lines, which should be ignored. """
- authority = DirectoryAuthority.create({'dir-source': AUTHORITY_HEADER[0][1] + '\n\n\n'}) + authority = DirectoryAuthority.create({'dir-source': DIR_SOURCE_LINE + '\n\n\n'}) self.assertEqual('Mike Perry <email>', authority.contact)
def test_duplicate_lines(self): @@ -135,15 +133,15 @@ class TestDirectoryAuthority(unittest.TestCase): self.assertRaises(ValueError, DirectoryAuthority, content, True)
authority = DirectoryAuthority(content, False) - self.assertEqual('turtles', authority.nickname) + self.assertTrue(authority.nickname.startswith('Unnamed'))
def test_missing_dir_source_field(self): """ Excludes fields from the 'dir-source' line. """
- for missing_value in AUTHORITY_HEADER[0][1].split(' '): - dir_source = AUTHORITY_HEADER[0][1].replace(missing_value, '').replace(' ', ' ') + for missing_value in DIR_SOURCE_LINE.split(' '): + dir_source = DIR_SOURCE_LINE.replace(missing_value, '').replace(' ', ' ') content = DirectoryAuthority.content({'dir-source': dir_source}) self.assertRaises(ValueError, DirectoryAuthority, content, True)
@@ -168,7 +166,7 @@ class TestDirectoryAuthority(unittest.TestCase): )
for value in test_values: - dir_source = AUTHORITY_HEADER[0][1].replace('27B6B5996C426270A5C95488AA5BCEB6BCC86956', value) + dir_source = DIR_SOURCE_LINE.replace('27B6B5996C426270A5C95488AA5BCEB6BCC86956', value) content = DirectoryAuthority.content({'dir-source': dir_source}) self.assertRaises(ValueError, DirectoryAuthority, content, True)
@@ -190,7 +188,7 @@ class TestDirectoryAuthority(unittest.TestCase): )
for value in test_values: - dir_source = AUTHORITY_HEADER[0][1].replace('76.73.17.194', value) + dir_source = DIR_SOURCE_LINE.replace('76.73.17.194', value) content = DirectoryAuthority.content({'dir-source': dir_source}) self.assertRaises(ValueError, DirectoryAuthority, content, True)
@@ -215,7 +213,7 @@ class TestDirectoryAuthority(unittest.TestCase): if not include_or_port and not include_dir_port: continue
- dir_source = AUTHORITY_HEADER[0][1] + dir_source = DIR_SOURCE_LINE
if include_or_port: dir_source = dir_source.replace('9090', value) @@ -269,11 +267,12 @@ class TestDirectoryAuthority(unittest.TestCase): self.assertRaises(ValueError, DirectoryAuthority, content, True)
authority = DirectoryAuthority(content, False) - self.assertEqual('turtles', authority.nickname) + self.assertTrue(authority.nickname.startswith('Unnamed'))
# exclude key cert from a vote - content = DirectoryAuthority.content(is_vote = True).replace(b'\n' + key_cert, b'') + + content = '\n'.join(DirectoryAuthority.content(is_vote = True).splitlines()[:-5]) self.assertRaises(ValueError, DirectoryAuthority, content, True, True)
authority = DirectoryAuthority(content, False, True) - self.assertEqual('turtles', authority.nickname) + self.assertTrue(authority.nickname.startswith('Unnamed')) diff --git a/test/unit/descriptor/networkstatus/document_v2.py b/test/unit/descriptor/networkstatus/document_v2.py index 2ed3177..5ff66ea 100644 --- a/test/unit/descriptor/networkstatus/document_v2.py +++ b/test/unit/descriptor/networkstatus/document_v2.py @@ -7,12 +7,7 @@ import unittest
import test.require
-from stem.descriptor.networkstatus import ( - NETWORK_STATUS_DOCUMENT_HEADER_V2, - NETWORK_STATUS_DOCUMENT_FOOTER_V2, - NetworkStatusDocumentV2, -) - +from stem.descriptor.networkstatus import NetworkStatusDocumentV2 from test.unit.descriptor import get_resource
@@ -101,18 +96,13 @@ TpQQk3nNQF8z6UIvdlvP+DnJV4izWVkQEZgUZgIVM0E=
self.assertEqual({}, document.routers) self.assertEqual(2, document.version) - self.assertEqual('18.244.0.114', document.hostname) - self.assertEqual('18.244.0.114', document.address) self.assertEqual(80, document.dir_port) - self.assertEqual('719BE45DE224B607C53707D0E2143E2D423E74CF', document.fingerprint) + self.assertEqual(40, len(document.fingerprint)) self.assertEqual('arma at mit dot edu', document.contact) - self.assertEqual(NETWORK_STATUS_DOCUMENT_HEADER_V2[5][1][1:], document.signing_key) self.assertEqual([], document.client_versions) self.assertEqual([], document.server_versions) - self.assertEqual(datetime.datetime(2005, 12, 16, 0, 13, 46), document.published) self.assertEqual([], document.options) self.assertEqual('moria2', document.signing_authority) - self.assertEqual(NETWORK_STATUS_DOCUMENT_FOOTER_V2[0][1][7:], document.signature)
@test.require.cryptography def test_descriptor_signing(self): diff --git a/test/unit/descriptor/networkstatus/document_v3.py b/test/unit/descriptor/networkstatus/document_v3.py index 832ba9c..8c662a3 100644 --- a/test/unit/descriptor/networkstatus/document_v3.py +++ b/test/unit/descriptor/networkstatus/document_v3.py @@ -14,11 +14,11 @@ import test.require from stem import Flag from stem.util import str_type
+from stem.descriptor import CRYPTO_BLOB + from stem.descriptor.networkstatus import ( HEADER_STATUS_DOCUMENT_FIELDS, FOOTER_STATUS_DOCUMENT_FIELDS, - NETWORK_STATUS_DOCUMENT_FOOTER, - DOC_SIG, DEFAULT_PARAMS, PackageVersion, DirectoryAuthority, @@ -309,9 +309,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= self.assertEqual(9, document.consensus_method) self.assertEqual([], document.consensus_methods) self.assertEqual(None, document.published) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.valid_after) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.fresh_until) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.valid_until) self.assertEqual(300, document.vote_delay) self.assertEqual(300, document.dist_delay) self.assertEqual([], document.client_versions) @@ -332,7 +329,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= self.assertEqual(DEFAULT_PARAMS, document.params) self.assertEqual((), document.directory_authorities) self.assertEqual({}, document.bandwidth_weights) - self.assertEqual([DOC_SIG], document.signatures) self.assertEqual([], document.get_unrecognized_lines())
def test_minimal_vote(self): @@ -353,10 +349,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= self.assertEqual(True, document.is_vote) self.assertEqual(None, document.consensus_method) self.assertEqual([1, 9], document.consensus_methods) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.published) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.valid_after) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.fresh_until) - self.assertEqual(datetime.datetime(2012, 9, 2, 22, 0, 0), document.valid_until) self.assertEqual(300, document.vote_delay) self.assertEqual(300, document.dist_delay) self.assertEqual([], document.client_versions) @@ -372,7 +364,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= self.assertEqual(None, document.shared_randomness_current_value) self.assertEqual(DEFAULT_PARAMS, document.params) self.assertEqual({}, document.bandwidth_weights) - self.assertEqual([DOC_SIG], document.signatures) self.assertEqual([], document.get_unrecognized_lines())
@test.require.cryptography @@ -464,14 +455,11 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= # the document that the entries refer to should actually be the minimal # descriptor (ie, without the entries)
- expected_document = NetworkStatusDocumentV3.create() - descriptor_file = io.BytesIO(content) entries = list(_parse_file(descriptor_file))
self.assertEqual(entry1, entries[0]) self.assertEqual(entry2, entries[1]) - self.assertEqual(expected_document, entries[0].document)
def test_missing_fields(self): """ @@ -946,7 +934,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= self.assertRaises(ValueError, NetworkStatusDocumentV3, content, True)
document = NetworkStatusDocumentV3(content, False) - self.assertEqual([DOC_SIG], document.signatures) self.assertEqual([], document.get_unrecognized_lines())
# excludes a footer from a version that shouldn't have it @@ -969,7 +956,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= authorities = (DirectoryAuthority.create(is_vote = True),) )
- self.assertEqual([DOC_SIG], document.signatures) self.assertEqual([], document.get_unrecognized_lines())
def test_footer_with_value(self): @@ -981,7 +967,6 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w= self.assertRaises(ValueError, NetworkStatusDocumentV3, content, True)
document = NetworkStatusDocumentV3(content, False) - self.assertEqual([DOC_SIG], document.signatures) self.assertEqual([], document.get_unrecognized_lines())
def test_bandwidth_wights_ok(self): @@ -1059,7 +1044,7 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w=
document = NetworkStatusDocumentV3.create({ 'network-status-version': '3 microdesc', - 'directory-signature': 'sha256 ' + NETWORK_STATUS_DOCUMENT_FOOTER[2][1], + 'directory-signature': 'sha256 14C131DFC5C6F93646BE72FA1401C02A8DF2E8B4 BF112F1C6D5543CFD0A32215ACABD4197B5279AD\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB, })
self.assertEqual('sha256', document.signatures[0].method) @@ -1085,7 +1070,12 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w=
for test_value in test_values: for test_attr in range(3): - attrs = [DOC_SIG.identity, DOC_SIG.key_digest, DOC_SIG.signature] + attrs = [ + '14C131DFC5C6F93646BE72FA1401C02A8DF2E8B4', + 'BF112F1C6D5543CFD0A32215ACABD4197B5279AD', + '-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB, + ] + attrs[test_attr] = test_value
content = NetworkStatusDocumentV3.content({'directory-signature': '%s %s\n%s' % tuple(attrs)}) @@ -1279,7 +1269,7 @@ DnN5aFtYKiTc19qIC7Nmo+afPdDEf0MlJvEOP5EWl3w=
# make the dir-key-published field of the certiciate be malformed authority_content = DirectoryAuthority.content(is_vote = True) - authority_content = authority_content.replace(b'dir-key-published 2011', b'dir-key-published 2011a') + authority_content = authority_content.replace(b'dir-key-published 2', b'dir-key-published b2') authority = DirectoryAuthority(authority_content, False, True)
content = NetworkStatusDocumentV3.content({'vote-status': 'vote'}, authorities = (authority,)) diff --git a/test/unit/descriptor/networkstatus/key_certificate.py b/test/unit/descriptor/networkstatus/key_certificate.py index d7982ae..5115227 100644 --- a/test/unit/descriptor/networkstatus/key_certificate.py +++ b/test/unit/descriptor/networkstatus/key_certificate.py @@ -8,12 +8,7 @@ import unittest import stem.descriptor import test.require
-from stem.descriptor.networkstatus import ( - KEY_CERTIFICATE_HEADER, - KEY_CERTIFICATE_FOOTER, - KeyCertificate, -) - +from stem.descriptor.networkstatus import KeyCertificate from test.unit.descriptor import get_resource
@@ -28,13 +23,8 @@ class TestKeyCertificate(unittest.TestCase): self.assertEqual(3, certificate.version) self.assertEqual(None, certificate.address) self.assertEqual(None, certificate.dir_port) - self.assertEqual('27B6B5996C426270A5C95488AA5BCEB6BCC86956', certificate.fingerprint) - self.assertTrue(stem.descriptor.CRYPTO_BLOB in certificate.identity_key) - self.assertEqual(datetime.datetime(2011, 11, 28, 21, 51, 4), certificate.published) - self.assertEqual(datetime.datetime(2012, 11, 28, 21, 51, 4), certificate.expires) - self.assertTrue(stem.descriptor.CRYPTO_BLOB in certificate.signing_key) + self.assertEqual(40, len(certificate.fingerprint)) self.assertEqual(None, certificate.crosscert) - self.assertTrue(stem.descriptor.CRYPTO_BLOB in certificate.certification) self.assertEqual([], certificate.get_unrecognized_lines())
def test_real_certificates(self): @@ -182,7 +172,15 @@ GM9hAsAMRX9Ogqhq5UjDNqEsvDKuyVeyh7unSZEOip9Zr6K/+7VsVPNb8vfBRBjo Parse a key certificate where a mandatory field is missing. """
- mandatory_fields = [entry[0] for entry in KEY_CERTIFICATE_HEADER + KEY_CERTIFICATE_FOOTER] + mandatory_fields = ( + 'dir-key-certificate-version', + 'fingerprint', + 'dir-key-published', + 'dir-key-expires', + 'dir-identity-key', + 'dir-signing-key', + 'dir-key-certification', + )
for excluded_field in mandatory_fields: content = KeyCertificate.content(exclude = (excluded_field,)) @@ -193,7 +191,7 @@ GM9hAsAMRX9Ogqhq5UjDNqEsvDKuyVeyh7unSZEOip9Zr6K/+7VsVPNb8vfBRBjo if excluded_field == 'fingerprint': self.assertEqual(3, certificate.version) else: - self.assertEqual('27B6B5996C426270A5C95488AA5BCEB6BCC86956', certificate.fingerprint) + self.assertEqual(40, len(certificate.fingerprint))
def test_blank_lines(self): """