commit dc89b293bb3f8a282766971597ada90723f17fa7 Author: Damian Johnson atagar@torproject.org Date: Sat Oct 6 13:51:29 2012 -0700
Unit tests for DirectoryAuthority fields
Tests for the DirectoryAuthority's individual fields, and fixes for a couple of issues they uncovered. --- stem/descriptor/networkstatus.py | 33 +++-- .../networkstatus/directory_authority.py | 142 ++++++++++++++++++++ 2 files changed, 164 insertions(+), 11 deletions(-)
diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py index 039c90c..80eb17b 100644 --- a/stem/descriptor/networkstatus.py +++ b/stem/descriptor/networkstatus.py @@ -726,14 +726,13 @@ class DirectoryAuthority(stem.descriptor.Descriptor): """
# separate the directory authority entry from its key certificate - key_cert_content = None + key_div = content.find('\ndir-key-certificate-version')
- if is_vote: - key_div = content.find('\ndir-key-certificate-version') - - if key_div != -1: - key_cert_content = content[key_div + 1:] - content = content[:key_div + 1] + if key_div != -1: + key_cert_content = content[key_div + 1:] + content = content[:key_div + 1] + else: + key_cert_content = None
entries, first_keyword, _, _ = stem.descriptor._get_descriptor_components(content, validate)
@@ -743,16 +742,28 @@ class DirectoryAuthority(stem.descriptor.Descriptor): # check that we have mandatory fields
if validate: - required_fields = ["dir-source", "contact"] + required_fields, excluded_fields = ["dir-source", "contact"], []
- if is_vote and not key_cert_content: - raise ValueError("Authority votes must have a key certificate:\n%s" % content) + if is_vote: + if not key_cert_content: + raise ValueError("Authority votes must have a key certificate:\n%s" % content) + + excluded_fields += ["vote-digest"] elif not is_vote: + if key_cert_content: + raise ValueError("Authority consensus entries shouldn't have a key certificate:\n%s" % content) + required_fields += ["vote-digest"] + excluded_fields += ["legacy-dir-key"]
for keyword in required_fields: if not keyword in entries: raise ValueError("Authority entries must have a '%s' line:\n%s" % (keyword, content)) + + for keyword in entries: + if keyword in excluded_fields: + type_label = "votes" if is_vote else "consensus entries" + raise ValueError("Authority %s shouldn't have a '%s' line:\n%s" % (type_label, keyword, content))
for keyword, values in entries.items(): value, block_contents = values[0] @@ -814,7 +825,7 @@ class DirectoryAuthority(stem.descriptor.Descriptor): self._unrecognized_lines.append(line)
if key_cert_content: - self.key_certificate = KeyCertificate(key_cert_content) + self.key_certificate = KeyCertificate(key_cert_content, validate)
def get_unrecognized_lines(self): """ diff --git a/test/unit/descriptor/networkstatus/directory_authority.py b/test/unit/descriptor/networkstatus/directory_authority.py index 1405d12..b032b48 100644 --- a/test/unit/descriptor/networkstatus/directory_authority.py +++ b/test/unit/descriptor/networkstatus/directory_authority.py @@ -106,6 +106,25 @@ class TestDirectoryAuthority(unittest.TestCase): authority = DirectoryAuthority(content, False) self.assertEqual("turtles", authority.nickname)
+ def test_missing_dir_source_field(self): + """ + Excludes fields from the 'dir-source' line. + """ + + for missing_value in AUTHORITY_HEADER[0][1].split(' '): + dir_source = AUTHORITY_HEADER[0][1].replace(missing_value, '').replace(' ', ' ') + content = get_directory_authority({"dir-source": dir_source}, content = True) + self.assertRaises(ValueError, DirectoryAuthority, content) + + authority = DirectoryAuthority(content, False) + + self.assertEqual(None, authority.nickname) + self.assertEqual(None, authority.fingerprint) + self.assertEqual(None, authority.hostname) + self.assertEqual(None, authority.address) + self.assertEqual(None, authority.dir_port) + self.assertEqual(None, authority.or_port) + def test_empty_values(self): """ The 'dir-source' line has a couple string values where anything (without @@ -137,4 +156,127 @@ class TestDirectoryAuthority(unittest.TestCase): dir_source = AUTHORITY_HEADER[0][1].replace('no.place.com', '') authority = get_directory_authority({"dir-source": dir_source}) self.assertEqual('', authority.hostname) + + def test_malformed_fingerprint(self): + """ + Includes a malformed fingerprint on the 'dir-source' line. + """ + + test_values = ( + "", + "zzzzz", + "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + ) + + for value in test_values: + dir_source = AUTHORITY_HEADER[0][1].replace('27B6B5996C426270A5C95488AA5BCEB6BCC86956', value) + content = get_directory_authority({"dir-source": dir_source}, content = True) + self.assertRaises(ValueError, DirectoryAuthority, content) + + authority = DirectoryAuthority(content, False) + self.assertEqual(value, authority.fingerprint) + + def test_malformed_address(self): + """ + Includes a malformed ip address on the 'dir-source' line. 
+ """ + + test_values = ( + "", + "71.35.150.", + "71.35..29", + "71.35.150", + "71.35.150.256", + "[fd9f:2e19:3bcf::02:9970]", + ) + + for value in test_values: + dir_source = AUTHORITY_HEADER[0][1].replace('76.73.17.194', value) + content = get_directory_authority({"dir-source": dir_source}, content = True) + self.assertRaises(ValueError, DirectoryAuthority, content) + + authority = DirectoryAuthority(content, False) + self.assertEqual(value, authority.address) + + def test_malformed_port(self): + """ + Includes a malformed orport or dirport on the 'dir-source' line. + """ + + test_values = ( + "", + "-1", + "399482", + "blarg", + ) + + for value in test_values: + for include_or_port in (False, True): + for include_dir_port in (False, True): + if not include_or_port and not include_dir_port: + continue + + dir_source = AUTHORITY_HEADER[0][1] + + if include_or_port: + dir_source = dir_source.replace('9090', value) + + if include_dir_port: + dir_source = dir_source.replace('9030', value) + + content = get_directory_authority({"dir-source": dir_source}, content = True) + self.assertRaises(ValueError, DirectoryAuthority, content) + + authority = DirectoryAuthority(content, False) + + expected_value = 399482 if value == "399482" else None + actual_value = authority.or_port if include_or_port else authority.dir_port + self.assertEqual(expected_value, actual_value) + + def test_legacy_dir_key(self): + """ + Includes a 'legacy-dir-key' line with both valid and invalid content. 
+ """ + + test_value = "65968CCB6BECB5AA88459C5A072624C6995B6B72" + authority = get_directory_authority({"legacy-dir-key": test_value}, is_vote = True) + self.assertEqual(test_value, authority.legacy_dir_key) + + # check that we'll fail if legacy-dir-key appears in a consensus + content = get_directory_authority({"legacy-dir-key": test_value}, content = True) + self.assertRaises(ValueError, DirectoryAuthority, content) + + test_values = ( + "", + "zzzzz", + "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz", + ) + + for value in test_values: + content = get_directory_authority({"legacy-dir-key": value}, content = True) + self.assertRaises(ValueError, DirectoryAuthority, content) + + authority = DirectoryAuthority(content, False) + self.assertEqual(value, authority.legacy_dir_key) + + def test_key_certificate(self): + """ + Includes or exclude a key certificate from the directory entry. + """ + + key_cert = get_key_certificate() + + # include a key cert with a consensus + content = get_directory_authority(content = True) + "\n" + str(key_cert) + self.assertRaises(ValueError, DirectoryAuthority, content) + + authority = DirectoryAuthority(content, False) + self.assertEqual('turtles', authority.nickname) + + # exclude key cert from a vote + content = get_directory_authority(content = True, is_vote = True).replace("\n" + str(key_cert), '') + self.assertRaises(ValueError, DirectoryAuthority, content, True, True) + + authority = DirectoryAuthority(content, False, True) + self.assertEqual('turtles', authority.nickname)