commit 6ee5abbe5c1137b4a311d5b7ef5003874982008e Author: Damian Johnson atagar@torproject.org Date: Sun Sep 23 14:20:49 2012 -0700
Initial KeyCertificate implementation
First stab at parsing the authority section's key certificates. This is completely untested, next step is to write some unit tests for it. --- stem/descriptor/networkstatus.py | 170 ++++++++++++++++++++++++++++++++++++++ 1 files changed, 170 insertions(+), 0 deletions(-)
diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py index 17a9877..46adbfe 100644 --- a/stem/descriptor/networkstatus.py +++ b/stem/descriptor/networkstatus.py @@ -106,6 +106,21 @@ DEFAULT_PARAMS = { "cbtinitialtimeout": 60000, }
+# KeyCertificate fields, tuple is of the form... +# (keyword, is_mandatory) + +KEY_CERTIFICATE_PARAMS = ( + ('dir-key-certificate-version', True), + ('dir-address', False), + ('fingerprint', True), + ('dir-identity-key', True), + ('dir-key-published', True), + ('dir-key-expires', True), + ('dir-signing-key', True), + ('dir-key-crosscert', False), + ('dir-key-certification', True), +) + BANDWIDTH_WEIGHT_ENTRIES = ( "Wbd", "Wbe", "Wbg", "Wbm", "Wdb", @@ -768,6 +783,161 @@ class DirectoryAuthority(stem.descriptor.Descriptor):
return self.unrecognized_lines
+class KeyCertificate(stem.descriptor.Descriptor): + """ + Directory key certificate for a v3 network status document. + + :var int version: ***** version of the key certificate + :var str address: authority's IP address + :var int dir_port: authority's DirPort + :var str fingerprint: ***** authority's fingerprint + :var str identity_key: ***** long term authority identity key + :var datetime published: ***** time when this key was generated + :var datetime expires: ***** time after which this key becomes invalid + :var str signing_key: ***** directory server's public signing key + :var str crosscert: signature made using certificate's signing key + :var str certification: ***** signature of this key certificate signed with the identity key + + ***** mandatory attribute + """ + + def __init__(self, raw_content, validate): + super(KeyCertificate, self).__init__(raw_content) + + self.version = None + self.address = None + self.dir_port = None + self.fingerprint = None + self.identity_key = None + self.published = None + self.expires = None + self.signing_key = None + self.crosscert = None + self.certification = None + + self._unrecognized_lines = [] + + self._parse(raw_contents, validate) + + def _parse(self, content, validate): + """ + Parses the given content and applies the attributes. + + :param str content: descriptor content + :param bool validate: checks validity if True + + :raises: ValueError if a validity check fails + """ + + entries, first_keyword, last_keyword, _ = stem.descriptor._get_descriptor_components(content, validate) + + if validate: + if first_keyword != 'dir-key-certificate-version': + raise ValueError("Key certificates must start with a 'dir-key-certificate-version' line:\n%s" % (content)) + elif last_keyword != 'dir-key-certification': + raise ValueError("Key certificates must end with a 'dir-key-certification' line:\n%s" % (content)) + + # check that we have mandatory fields and that our known fields only + # appear once + + for keyword, is_mandatory in KEY_CERTIFICATE_PARAMS: + if is_mandatory and not keyword in entries: + raise ValueError("Key certificates must have a '%s' line:\n%s" % (keyword, content)) + + entry_count = len(entries.get(keyword, [])) + if entry_count > 1: + raise ValueError("Key certificates can only have a single '%s' line, got %i:\n%s" % (keyword, entry_count, content)) + + # Check that our field's order matches the spec. This isn't explicitely + # stated in the spec, but the network status document requires a specific + # order so it stands to reason that the key certificate (which is in it) + # needs ot match a prescribed order too. + + fields = [attr[0] for attr in KEY_CERTIFICATE_PARAMS] + _check_for_misordered_fields(entries, fields) + + for keyword, values in entries.items(): + value, block_contents = values[0] + line = "%s %s" % (keyword, value) + + if keyword == 'dir-key-certificate-version': + # "dir-key-certificate-version" version + + if not value.isdigit(): + if not validate: continue + raise ValueError("Key certificate has a non-integer version: %s" % line) + + self.version = int(value) + + if validate and self.version != 3: + raise ValueError("Expected a version 3 key certificate, got version '%i' instead" % self.version) + elif keyword == 'dir-address': + # "dir-address" IPPort + + if not ':' in value: + if not validate: continue + raise ValueError("Key certificate's 'dir-address' is expected to be of the form ADDRESS:PORT: %s" % line) + + address, dirport = value.split(':', 1) + + if validate: + if not stem.util.connection.is_valid_ip_address(address): + raise ValueError("Key certificate's address isn't a valid IPv4 address: %s" % line) + elif not stem.util.connection.is_valid_port(dirport): + raise ValueError("Key certificate's dirport is invalid: %s" % line) + + self.address = address + self.dir_port = dirport + elif keyword == 'fingerprint': + # "fingerprint" fingerprint + + if validate and not stem.util.tor_tools.is_valid_fingerprint(value): + raise ValueError("Key certificate's fingerprint is malformed: %s" % line) + + self.fingerprint = value + elif keyword in ('dir-key-published', 'dir-key-expires'): + # "dir-key-published" YYYY-MM-DD HH:MM:SS + # "dir-key-expires" YYYY-MM-DD HH:MM:SS + + try: + date_value = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S") + + if keyword == 'dir-key-published': + self.published = date_value + elif keyword == 'dir-key-expires': + self.expires = date_value + except ValueError: + if validate: + raise ValueError("Key certificate's '%s' time wasn't parseable: %s" % (keyword, value)) + elif keyword in ('dir-identity-key', 'dir-signing-key', 'dir-key-crosscert', 'dir-key-certification'): + # "dir-identity-key" NL a public key in PEM format + # "dir-signing-key" NL a key in PEM format + # "dir-key-crosscert" NL CrossSignature + # "dir-key-certification" NL Signature + + if validate and not block_contents: + raise ValueError("Key certificate's '%s' line must be followed by a key block: %s" % (keyword, line)) + + if keyword == 'dir-identity-key': + self.identity_key = block_contents + elif keyword == 'dir-signing-key': + self.signing_key = block_contents + elif keyword == 'dir-key-crosscert': + self.crosscert = block_contents + elif keyword == 'dir-key-certification': + self.certification = block_contents + else: + self._unrecognized_lines.append(line) + + def get_unrecognized_lines(self): + """ + Returns any unrecognized lines. + + :returns: a list of unrecognized lines + """ + + return self.unrecognized_lines + # TODO: microdescriptors have a slightly different format (including a # 'method') - should probably be a subclass class DocumentSignature(object):
tor-commits@lists.torproject.org