commit b3c42c61ecf883b311af17665abb7213e2361abc Author: Damian Johnson atagar@torproject.org Date: Sat Feb 28 14:35:40 2015 -0800
Unit tests for hidden service descriptor edge cases
Expanding its unit test coverage and a few fixes for issues it surfaced. --- stem/descriptor/hidden_service_descriptor.py | 23 ++-- test/mocking.py | 2 +- test/unit/descriptor/hidden_service_descriptor.py | 149 +++++++++++++++++++++ 3 files changed, 162 insertions(+), 12 deletions(-)
diff --git a/stem/descriptor/hidden_service_descriptor.py b/stem/descriptor/hidden_service_descriptor.py index f247b07..c1970a0 100644 --- a/stem/descriptor/hidden_service_descriptor.py +++ b/stem/descriptor/hidden_service_descriptor.py @@ -16,12 +16,8 @@ the HSDir flag. HiddenServiceDescriptor - Tor hidden service descriptor. """
-# TODO: Add a description for how to retrieve them when tor supports that (#14847). - -# TODO: We should add a '@type hidden-service-descriptor 1.0' annotation to -# CollecTor... -# -# https://collector.torproject.org/formats.html +# TODO: Add a description for how to retrieve them when tor supports that +# (#14847) and then update #15009.
import base64 import collections @@ -141,8 +137,11 @@ def _parse_introduction_points_line(descriptor, entries):
descriptor.introduction_points_encoded = block_contents
- blob = ''.join(block_contents.split('\n')[1:-1]) - decoded_field = base64.b64decode(stem.util.str_tools._to_bytes(blob)) + try: + blob = ''.join(block_contents.split('\n')[1:-1]) + decoded_field = base64.b64decode(stem.util.str_tools._to_bytes(blob)) + except TypeError: + raise ValueError("'introduction-points' isn't base64 encoded content:\n%s" % block_contents)
auth_types = []
@@ -176,11 +175,11 @@ class HiddenServiceDescriptor(Descriptor): values so our descriptor_id can be validated :var datetime published: ***** time in UTC when this descriptor was made :var list protocol_versions: ***** list of **int** versions that are supported when establishing a connection - :var str introduction_points_encoded: ***** raw introduction points blob + :var str introduction_points_encoded: raw introduction points blob :var list introduction_points_auth: ***** tuples of the form (auth_method, auth_data) for our introduction_points_content - :var str introduction_points_content: ***** decoded introduction-points - content without authentication data, if using cookie authentication this is + :var str introduction_points_content: decoded introduction-points content + without authentication data, if using cookie authentication this is encrypted :var str signature: signature of the descriptor content
@@ -196,6 +195,8 @@ class HiddenServiceDescriptor(Descriptor): 'published': (None, _parse_publication_time_line), 'protocol_versions': ([], _parse_protocol_versions_line), 'introduction_points_encoded': (None, _parse_introduction_points_line), + 'introduction_points_auth': ([], _parse_introduction_points_line), + 'introduction_points_content': (None, _parse_introduction_points_line), 'signature': (None, _parse_signature_line), }
diff --git a/test/mocking.py b/test/mocking.py index 604beec..4a3f272 100644 --- a/test/mocking.py +++ b/test/mocking.py @@ -531,7 +531,7 @@ def get_hidden_service_descriptor(attr = None, exclude = (), content = False, in :returns: HidenServiceDescriptor for the requested descriptor content """
- if introduction_points_lines is not None: + if (not attr or 'introduction-points' not in attr) and introduction_points_lines is not None: encoded = base64.b64encode(introduction_points_lines('\n')) attr['introduction-points'] = '\n-----BEGIN MESSAGE-----\n%s\n-----END MESSAGE-----' % '\n'.join(textwrap.wrap(encoded, 64))
diff --git a/test/unit/descriptor/hidden_service_descriptor.py b/test/unit/descriptor/hidden_service_descriptor.py index 1a1c14d..3b9c652 100644 --- a/test/unit/descriptor/hidden_service_descriptor.py +++ b/test/unit/descriptor/hidden_service_descriptor.py @@ -7,9 +7,17 @@ import unittest
import stem.descriptor
+from stem.descriptor.hidden_service_descriptor import REQUIRED_FIELDS, HiddenServiceDescriptor + from test.mocking import CRYPTO_BLOB, get_hidden_service_descriptor from test.unit.descriptor import get_resource
+MESSAGE_BLOCK = """ +-----BEGIN MESSAGE----- +%s +-----END MESSAGE-----\ +""" + EXPECTED_DDG_PERMANENT_KEY = """\ -----BEGIN RSA PUBLIC KEY----- MIGJAoGBAJ/SzzgrXPxTlFrKVhXh3buCWv2QfcNgncUpDpKouLn3AtPH5Ocys0jE @@ -258,3 +266,144 @@ class TestHiddenServiceDescriptor(unittest.TestCase): self.assertEqual('', desc.introduction_points_content) self.assertTrue(CRYPTO_BLOB in desc.signature) self.assertEqual([], desc.introduction_points()) + + def test_unrecognized_line(self): + """ + Includes unrecognized content in the descriptor. + """ + + desc = get_hidden_service_descriptor({'pepperjack': 'is oh so tasty!'}) + self.assertEqual(['pepperjack is oh so tasty!'], desc.get_unrecognized_lines()) + + def test_proceeding_line(self): + """ + Includes a line prior to the 'rendezvous-service-descriptor' entry. + """ + + desc_text = b'hibernate 1\n' + get_hidden_service_descriptor(content = True) + self._expect_invalid_attr(desc_text) + + def test_trailing_line(self): + """ + Includes a line after the 'router-signature' entry. + """ + + desc_text = get_hidden_service_descriptor(content = True) + b'\nhibernate 1' + self._expect_invalid_attr(desc_text) + + def test_required_fields(self): + """ + Check that we require the mandatory fields. + """ + + line_to_attr = { + 'rendezvous-service-descriptor': 'descriptor_id', + 'version': 'version', + 'permanent-key': 'permanent_key', + 'secret-id-part': 'secret_id_part', + 'publication-time': 'published', + 'introduction-points': 'introduction_points_encoded', + 'protocol-versions': 'protocol_versions', + 'signature': 'signature', + } + + for line in REQUIRED_FIELDS: + desc_text = get_hidden_service_descriptor(content = True, exclude = (line,)) + + expected = [] if line == 'protocol-versions' else None + self._expect_invalid_attr(desc_text, line_to_attr[line], expected) + + def test_invalid_version(self): + """ + Checks that our version field expects a numeric value. 
+ """ + + test_values = ( + '', + '-10', + 'hello', + ) + + for test_value in test_values: + desc_text = get_hidden_service_descriptor({'version': test_value}, content = True) + self._expect_invalid_attr(desc_text, 'version') + + def test_invalid_protocol_versions(self): + """ + Checks that our protocol-versions field expects comma separated numeric + values. + """ + + test_values = ( + '', + '-10', + 'hello', + '10,', + ',10', + '10,-10', + '10,hello', + ) + + for test_value in test_values: + desc_text = get_hidden_service_descriptor({'protocol-versions': test_value}, content = True) + self._expect_invalid_attr(desc_text, 'protocol_versions', []) + + def test_introduction_points_when_empty(self): + """ + It's valid to advertise zero introduciton points. I'm not clear if this + would mean an empty protocol-versions field or that it's omitted but either + are valid according to the spec. + """ + + missing_field_desc = get_hidden_service_descriptor(exclude = ('introduction-points',)) + + self.assertEqual(None, missing_field_desc.introduction_points_encoded) + self.assertEqual([], missing_field_desc.introduction_points_auth) + self.assertEqual(None, missing_field_desc.introduction_points_content) + self.assertEqual([], missing_field_desc.introduction_points()) + + empty_field_desc = get_hidden_service_descriptor({'introduction-points': MESSAGE_BLOCK % ''}) + + self.assertEqual((MESSAGE_BLOCK % '').strip(), empty_field_desc.introduction_points_encoded) + self.assertEqual([], empty_field_desc.introduction_points_auth) + self.assertEqual('', empty_field_desc.introduction_points_content) + self.assertEqual([], empty_field_desc.introduction_points()) + + def test_introduction_points_when_not_base64(self): + """ + Checks the introduction-points field when the content isn't base64 encoded. 
+ """ + + test_values = ( + MESSAGE_BLOCK % '12345', + MESSAGE_BLOCK % 'hello', + ) + + for test_value in test_values: + desc_text = get_hidden_service_descriptor({'introduction-points': test_value}, content = True) + + desc = self._expect_invalid_attr(desc_text, 'introduction_points_encoded', test_value.strip()) + self.assertEqual([], desc.introduction_points_auth) + self.assertEqual(None, desc.introduction_points_content) + self.assertEqual([], desc.introduction_points()) + + def _expect_invalid_attr(self, desc_text, attr = None, expected_value = None): + """ + Asserts that construction will fail due to desc_text having a malformed + attribute. If an attr is provided then we check that it matches an expected + value when we're constructed without validation. + """ + + self.assertRaises(ValueError, HiddenServiceDescriptor, desc_text, True) + desc = HiddenServiceDescriptor(desc_text, validate = False) + + if attr: + # check that the invalid attribute matches the expected value when + # constructed without validation + + self.assertEqual(expected_value, getattr(desc, attr)) + else: + # check a default attribute + self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id) + + return desc