[tor-commits] [stem/master] Unit tests for DirectoryAuthority fields

atagar at torproject.org atagar at torproject.org
Sat Oct 13 18:35:45 UTC 2012


commit dc89b293bb3f8a282766971597ada90723f17fa7
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Oct 6 13:51:29 2012 -0700

    Unit tests for DirectoryAuthority fields
    
    Tests for the DirectoryAuthority's individual fields, and fixes for a couple
    issues they uncovered.
---
 stem/descriptor/networkstatus.py                   |   33 +++--
 .../networkstatus/directory_authority.py           |  142 ++++++++++++++++++++
 2 files changed, 164 insertions(+), 11 deletions(-)

diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py
index 039c90c..80eb17b 100644
--- a/stem/descriptor/networkstatus.py
+++ b/stem/descriptor/networkstatus.py
@@ -726,14 +726,13 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
     """
     
     # separate the directory authority entry from its key certificate
-    key_cert_content = None
+    key_div = content.find('\ndir-key-certificate-version')
     
-    if is_vote:
-      key_div = content.find('\ndir-key-certificate-version')
-      
-      if key_div != -1:
-        key_cert_content = content[key_div + 1:]
-        content = content[:key_div + 1]
+    if key_div != -1:
+      key_cert_content = content[key_div + 1:]
+      content = content[:key_div + 1]
+    else:
+      key_cert_content = None
     
     entries, first_keyword, _, _ = stem.descriptor._get_descriptor_components(content, validate)
     
@@ -743,16 +742,28 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
     # check that we have mandatory fields
     
     if validate:
-      required_fields = ["dir-source", "contact"]
+      required_fields, excluded_fields = ["dir-source", "contact"], []
       
-      if is_vote and not key_cert_content:
-        raise ValueError("Authority votes must have a key certificate:\n%s" % content)
+      if is_vote:
+        if not key_cert_content:
+          raise ValueError("Authority votes must have a key certificate:\n%s" % content)
+        
+        excluded_fields += ["vote-digest"]
       elif not is_vote:
+        if key_cert_content:
+          raise ValueError("Authority consensus entries shouldn't have a key certificate:\n%s" % content)
+        
         required_fields += ["vote-digest"]
+        excluded_fields += ["legacy-dir-key"]
       
       for keyword in required_fields:
         if not keyword in entries:
           raise ValueError("Authority entries must have a '%s' line:\n%s" % (keyword, content))
+      
+      for keyword in entries:
+        if keyword in excluded_fields:
+          type_label = "votes" if is_vote else "consensus entries"
+          raise ValueError("Authority %s shouldn't have a '%s' line:\n%s" % (type_label, keyword, content))
     
     for keyword, values in entries.items():
       value, block_contents = values[0]
@@ -814,7 +825,7 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
         self._unrecognized_lines.append(line)
     
     if key_cert_content:
-      self.key_certificate = KeyCertificate(key_cert_content)
+      self.key_certificate = KeyCertificate(key_cert_content, validate)
   
   def get_unrecognized_lines(self):
     """
diff --git a/test/unit/descriptor/networkstatus/directory_authority.py b/test/unit/descriptor/networkstatus/directory_authority.py
index 1405d12..b032b48 100644
--- a/test/unit/descriptor/networkstatus/directory_authority.py
+++ b/test/unit/descriptor/networkstatus/directory_authority.py
@@ -106,6 +106,25 @@ class TestDirectoryAuthority(unittest.TestCase):
       authority = DirectoryAuthority(content, False)
       self.assertEqual("turtles", authority.nickname)
   
+  def test_missing_dir_source_field(self):
+    """
+    Excludes fields from the 'dir-source' line.
+    """
+    
+    for missing_value in AUTHORITY_HEADER[0][1].split(' '):
+      dir_source = AUTHORITY_HEADER[0][1].replace(missing_value, '').replace('  ', ' ')
+      content = get_directory_authority({"dir-source": dir_source}, content = True)
+      self.assertRaises(ValueError, DirectoryAuthority, content)
+      
+      authority = DirectoryAuthority(content, False)
+      
+      self.assertEqual(None, authority.nickname)
+      self.assertEqual(None, authority.fingerprint)
+      self.assertEqual(None, authority.hostname)
+      self.assertEqual(None, authority.address)
+      self.assertEqual(None, authority.dir_port)
+      self.assertEqual(None, authority.or_port)
+  
   def test_empty_values(self):
     """
     The 'dir-source' line has a couple string values where anything (without
@@ -137,4 +156,127 @@ class TestDirectoryAuthority(unittest.TestCase):
     dir_source = AUTHORITY_HEADER[0][1].replace('no.place.com', '')
     authority = get_directory_authority({"dir-source": dir_source})
     self.assertEqual('', authority.hostname)
+  
+  def test_malformed_fingerprint(self):
+    """
+    Includes a malformed fingerprint on the 'dir-source' line.
+    """
+    
+    test_values = (
+      "",
+      "zzzzz",
+      "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
+    )
+    
+    for value in test_values:
+      dir_source = AUTHORITY_HEADER[0][1].replace('27B6B5996C426270A5C95488AA5BCEB6BCC86956', value)
+      content = get_directory_authority({"dir-source": dir_source}, content = True)
+      self.assertRaises(ValueError, DirectoryAuthority, content)
+      
+      authority = DirectoryAuthority(content, False)
+      self.assertEqual(value, authority.fingerprint)
+  
+  def test_malformed_address(self):
+    """
+    Includes a malformed ip address on the 'dir-source' line.
+    """
+    
+    test_values = (
+      "",
+      "71.35.150.",
+      "71.35..29",
+      "71.35.150",
+      "71.35.150.256",
+      "[fd9f:2e19:3bcf::02:9970]",
+    )
+    
+    for value in test_values:
+      dir_source = AUTHORITY_HEADER[0][1].replace('76.73.17.194', value)
+      content = get_directory_authority({"dir-source": dir_source}, content = True)
+      self.assertRaises(ValueError, DirectoryAuthority, content)
+      
+      authority = DirectoryAuthority(content, False)
+      self.assertEqual(value, authority.address)
+  
+  def test_malformed_port(self):
+    """
+    Includes a malformed orport or dirport on the 'dir-source' line.
+    """
+    
+    test_values = (
+      "",
+      "-1",
+      "399482",
+      "blarg",
+    )
+    
+    for value in test_values:
+      for include_or_port in (False, True):
+        for include_dir_port in (False, True):
+          if not include_or_port and not include_dir_port:
+            continue
+          
+          dir_source = AUTHORITY_HEADER[0][1]
+          
+          if include_or_port:
+            dir_source = dir_source.replace('9090', value)
+          
+          if include_dir_port:
+            dir_source = dir_source.replace('9030', value)
+          
+          content = get_directory_authority({"dir-source": dir_source}, content = True)
+          self.assertRaises(ValueError, DirectoryAuthority, content)
+          
+          authority = DirectoryAuthority(content, False)
+          
+          expected_value = 399482 if value == "399482" else None
+          actual_value = authority.or_port if include_or_port else authority.dir_port
+          self.assertEqual(expected_value, actual_value)
+  
+  def test_legacy_dir_key(self):
+    """
+    Includes a 'legacy-dir-key' line with both valid and invalid content.
+    """
+    
+    test_value = "65968CCB6BECB5AA88459C5A072624C6995B6B72"
+    authority = get_directory_authority({"legacy-dir-key": test_value}, is_vote = True)
+    self.assertEqual(test_value, authority.legacy_dir_key)
+    
+    # check that we'll fail if legacy-dir-key appears in a consensus
+    content = get_directory_authority({"legacy-dir-key": test_value}, content = True)
+    self.assertRaises(ValueError, DirectoryAuthority, content)
+    
+    test_values = (
+      "",
+      "zzzzz",
+      "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
+    )
+    
+    for value in test_values:
+      content = get_directory_authority({"legacy-dir-key": value}, content = True)
+      self.assertRaises(ValueError, DirectoryAuthority, content)
+      
+      authority = DirectoryAuthority(content, False)
+      self.assertEqual(value, authority.legacy_dir_key)
+  
+  def test_key_certificate(self):
+    """
+    Includes or exclude a key certificate from the directory entry.
+    """
+    
+    key_cert = get_key_certificate()
+    
+    # include a key cert with a consensus
+    content = get_directory_authority(content = True) + "\n" + str(key_cert)
+    self.assertRaises(ValueError, DirectoryAuthority, content)
+    
+    authority = DirectoryAuthority(content, False)
+    self.assertEqual('turtles', authority.nickname)
+    
+    # exclude  key cert from a vote
+    content = get_directory_authority(content = True, is_vote = True).replace("\n" + str(key_cert), '')
+    self.assertRaises(ValueError, DirectoryAuthority, content, True, True)
+    
+    authority = DirectoryAuthority(content, False, True)
+    self.assertEqual('turtles', authority.nickname)
 





More information about the tor-commits mailing list