commit cf869a877683254ca7b262a989551bc9c6b76204 Author: Damian Johnson atagar@torproject.org Date: Mon May 1 12:56:36 2017 -0700
HiddenServiceDescriptor creation --- stem/descriptor/hidden_service_descriptor.py | 31 ++++++++++- test/mocking.py | 51 ------------------ test/unit/descriptor/hidden_service_descriptor.py | 65 ++++++++--------------- 3 files changed, 52 insertions(+), 95 deletions(-)
diff --git a/stem/descriptor/hidden_service_descriptor.py b/stem/descriptor/hidden_service_descriptor.py index 129bc83..a3b12b5 100644 --- a/stem/descriptor/hidden_service_descriptor.py +++ b/stem/descriptor/hidden_service_descriptor.py @@ -31,8 +31,10 @@ import stem.util.connection import stem.util.str_tools
from stem.descriptor import ( + CRYPTO_BLOB, PGP_BLOCK_END, Descriptor, + _descriptor_content, _descriptor_components, _read_until_keywords, _bytes_for_block, @@ -80,6 +82,20 @@ SINGLE_INTRODUCTION_POINT_FIELDS = [ BASIC_AUTH = 1 STEALTH_AUTH = 2
+HIDDEN_SERVICE_HEADER = ( + ('rendezvous-service-descriptor', 'y3olqqblqw2gbh6phimfuiroechjjafa'), + ('version', '2'), + ('permanent-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB), + ('secret-id-part', 'e24kgecavwsznj7gpbktqsiwgvngsf4e'), + ('publication-time', '2015-02-23 20:00:00'), + ('protocol-versions', '2,3'), + ('introduction-points', '\n-----BEGIN MESSAGE-----\n-----END MESSAGE-----'), +) + +HIDDEN_SERVICE_FOOTER = ( + ('signature', '\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB), +) +
class IntroductionPoints(collections.namedtuple('IntroductionPoints', INTRODUCTION_POINTS_ATTR.keys())): """ @@ -204,6 +220,9 @@ class HiddenServiceDescriptor(Descriptor): Moved from the deprecated `pycrypto https://www.dlitz.net/software/pycrypto/`_ module to `cryptography https://pypi.python.org/pypi/cryptography`_ for validating signatures. + + .. versionchanged:: 1.6.0 + Added the **skip_crypto_validation** constructor argument. """
ATTRIBUTES = { @@ -230,7 +249,15 @@ class HiddenServiceDescriptor(Descriptor): 'signature': _parse_signature_line, }
- def __init__(self, raw_contents, validate = False): + @classmethod + def content(cls, attr = None, exclude = ()): + return _descriptor_content(attr, exclude, HIDDEN_SERVICE_HEADER, HIDDEN_SERVICE_FOOTER) + + @classmethod + def create(cls, attr = None, exclude = (), validate = True): + return cls(cls.content(attr, exclude), validate = validate, skip_crypto_validation = True) + + def __init__(self, raw_contents, validate = False, skip_crypto_validation = False): super(HiddenServiceDescriptor, self).__init__(raw_contents, lazy_load = not validate) entries = _descriptor_components(raw_contents, validate, non_ascii_fields = ('introduction-points'))
@@ -248,7 +275,7 @@ class HiddenServiceDescriptor(Descriptor):
self._parse(entries, validate)
- if stem.prereq.is_crypto_available(): + if not skip_crypto_validation and stem.prereq.is_crypto_available(): signed_digest = self._digest_for_signature(self.permanent_key, self.signature) content_digest = self._digest_for_content(b'rendezvous-service-descriptor ', b'\nsignature\n')
diff --git a/test/mocking.py b/test/mocking.py index f1d859e..09d3631 100644 --- a/test/mocking.py +++ b/test/mocking.py @@ -23,17 +23,12 @@ Helper functions for creating mock objects. get_router_status_entry_v2 - RouterStatusEntryV2 get_router_status_entry_v3 - RouterStatusEntryV3 get_router_status_entry_micro_v3 - RouterStatusEntryMicroV3 - - stem.descriptor.hidden-service_descriptor - get_hidden_service_descriptor - HiddenServiceDescriptor """
-import base64 import hashlib import itertools import os import re -import textwrap
import stem.descriptor.extrainfo_descriptor import stem.descriptor.hidden_service_descriptor @@ -46,12 +41,6 @@ import stem.response import stem.util.str_tools
try: - # added in python 3.3 - from unittest.mock import Mock, patch -except ImportError: - from mock import Mock, patch - -try: # added in python 2.7 from collections import OrderedDict except ImportError: @@ -138,20 +127,6 @@ NETWORK_STATUS_DOCUMENT_FOOTER = ( ('directory-signature', '%s %s\n%s' % (DOC_SIG.identity, DOC_SIG.key_digest, DOC_SIG.signature)), )
-HIDDEN_SERVICE_HEADER = ( - ('rendezvous-service-descriptor', 'y3olqqblqw2gbh6phimfuiroechjjafa'), - ('version', '2'), - ('permanent-key', '\n-----BEGIN RSA PUBLIC KEY-----%s-----END RSA PUBLIC KEY-----' % CRYPTO_BLOB), - ('secret-id-part', 'e24kgecavwsznj7gpbktqsiwgvngsf4e'), - ('publication-time', '2015-02-23 20:00:00'), - ('protocol-versions', '2,3'), - ('introduction-points', '\n-----BEGIN MESSAGE-----\n-----END MESSAGE-----'), -) - -HIDDEN_SERVICE_FOOTER = ( - ('signature', '\n-----BEGIN SIGNATURE-----%s-----END SIGNATURE-----' % CRYPTO_BLOB), -) -
def get_all_combinations(attr, include_empty = False): """ @@ -372,32 +347,6 @@ def get_router_status_entry_micro_v3(attr = None, exclude = (), content = False) return stem.descriptor.router_status_entry.RouterStatusEntryMicroV3(desc_content, validate = True)
-def get_hidden_service_descriptor(attr = None, exclude = (), content = False, introduction_points_lines = None): - """ - Provides the descriptor content for... - stem.descriptor.hidden_service_descriptor.HidenServiceDescriptor - - :param dict attr: keyword/value mappings to be included in the descriptor - :param list exclude: mandatory keywords to exclude from the descriptor - :param bool content: provides the str content of the descriptor rather than the class if True - :param list introduction_points_lines: lines to be included in the introduction-points field - - :returns: HidenServiceDescriptor for the requested descriptor content - """ - - if (not attr or 'introduction-points' not in attr) and introduction_points_lines is not None: - encoded = base64.b64encode(introduction_points_lines('\n')) - attr['introduction-points'] = '\n-----BEGIN MESSAGE-----\n%s\n-----END MESSAGE-----' % '\n'.join(textwrap.wrap(encoded, 64)) - - desc_content = _get_descriptor_content(attr, exclude, HIDDEN_SERVICE_HEADER, HIDDEN_SERVICE_FOOTER) - - if content: - return desc_content - else: - with patch('stem.prereq.is_crypto_available', Mock(return_value = False)): - return stem.descriptor.hidden_service_descriptor.HiddenServiceDescriptor(desc_content, validate = True) - - def get_directory_authority(attr = None, exclude = (), is_vote = False, content = False): """ Provides the descriptor content for... diff --git a/test/unit/descriptor/hidden_service_descriptor.py b/test/unit/descriptor/hidden_service_descriptor.py index 6f9fb26..48f1d29 100644 --- a/test/unit/descriptor/hidden_service_descriptor.py +++ b/test/unit/descriptor/hidden_service_descriptor.py @@ -3,13 +3,12 @@ Unit tests for stem.descriptor.hidden_service_descriptor. """
import datetime +import functools import unittest
import stem.descriptor import stem.prereq
-from test.mocking import CRYPTO_BLOB, get_hidden_service_descriptor -from test.unit.descriptor import get_resource from test.util import require_cryptography
from stem.descriptor.hidden_service_descriptor import ( @@ -18,6 +17,12 @@ from stem.descriptor.hidden_service_descriptor import ( HiddenServiceDescriptor, )
+from test.unit.descriptor import ( + get_resource, + base_expect_invalid_attr, + base_expect_invalid_attr_for_text, +) + MESSAGE_BLOCK = """ -----BEGIN MESSAGE----- %s @@ -232,6 +237,9 @@ lj/7xMZWDrfyw5H86L0QiaZnkmD+nig1+S+Rn39mmuEgl2iwZO/ihlncUJQTEULb -----END MESSAGE-----\ """
+expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptor, 'descriptor_id', 'y3olqqblqw2gbh6phimfuiroechjjafa') +expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptor, 'descriptor_id', 'y3olqqblqw2gbh6phimfuiroechjjafa') +
class TestHiddenServiceDescriptor(unittest.TestCase): def test_for_duckduckgo_with_validation(self): @@ -403,18 +411,18 @@ class TestHiddenServiceDescriptor(unittest.TestCase): Basic sanity check that we can parse a hidden service descriptor with minimal attributes. """
- desc = get_hidden_service_descriptor() + desc = HiddenServiceDescriptor.create()
self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id) self.assertEqual(2, desc.version) - self.assertTrue(CRYPTO_BLOB in desc.permanent_key) + self.assertTrue(stem.descriptor.CRYPTO_BLOB in desc.permanent_key) self.assertEqual('e24kgecavwsznj7gpbktqsiwgvngsf4e', desc.secret_id_part) self.assertEqual(datetime.datetime(2015, 2, 23, 20, 0, 0), desc.published) self.assertEqual([2, 3], desc.protocol_versions) self.assertEqual('-----BEGIN MESSAGE-----\n-----END MESSAGE-----', desc.introduction_points_encoded) self.assertEqual([], desc.introduction_points_auth) self.assertEqual(b'', desc.introduction_points_content) - self.assertTrue(CRYPTO_BLOB in desc.signature) + self.assertTrue(stem.descriptor.CRYPTO_BLOB in desc.signature) self.assertEqual([], desc.introduction_points())
def test_unrecognized_line(self): @@ -422,7 +430,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase): Includes unrecognized content in the descriptor. """
- desc = get_hidden_service_descriptor({'pepperjack': 'is oh so tasty!'}) + desc = HiddenServiceDescriptor.create({'pepperjack': 'is oh so tasty!'}) self.assertEqual(['pepperjack is oh so tasty!'], desc.get_unrecognized_lines())
def test_proceeding_line(self): @@ -430,16 +438,14 @@ class TestHiddenServiceDescriptor(unittest.TestCase): Includes a line prior to the 'rendezvous-service-descriptor' entry. """
- desc_text = b'hibernate 1\n' + get_hidden_service_descriptor(content = True) - self._expect_invalid_attr(desc_text) + expect_invalid_attr_for_text(self, b'hibernate 1\n' + HiddenServiceDescriptor.content())
def test_trailing_line(self): """ Includes a line after the 'router-signature' entry. """
- desc_text = get_hidden_service_descriptor(content = True) + b'\nhibernate 1' - self._expect_invalid_attr(desc_text) + expect_invalid_attr_for_text(self, HiddenServiceDescriptor.content() + b'\nhibernate 1')
def test_required_fields(self): """ @@ -458,10 +464,10 @@ class TestHiddenServiceDescriptor(unittest.TestCase): }
for line in REQUIRED_FIELDS: - desc_text = get_hidden_service_descriptor(content = True, exclude = (line,)) + desc_text = HiddenServiceDescriptor.content(exclude = (line,))
expected = [] if line == 'protocol-versions' else None - self._expect_invalid_attr(desc_text, line_to_attr[line], expected) + expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], expected)
def test_invalid_version(self): """ @@ -475,8 +481,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase): )
for test_value in test_values: - desc_text = get_hidden_service_descriptor({'version': test_value}, content = True) - self._expect_invalid_attr(desc_text, 'version') + expect_invalid_attr(self, {'version': test_value}, 'version')
def test_invalid_protocol_versions(self): """ @@ -495,8 +500,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase): )
for test_value in test_values: - desc_text = get_hidden_service_descriptor({'protocol-versions': test_value}, content = True) - self._expect_invalid_attr(desc_text, 'protocol_versions', []) + expect_invalid_attr(self, {'protocol-versions': test_value}, 'protocol_versions', [])
def test_introduction_points_when_empty(self): """ @@ -505,14 +509,14 @@ class TestHiddenServiceDescriptor(unittest.TestCase): are valid according to the spec. """
- missing_field_desc = get_hidden_service_descriptor(exclude = ('introduction-points',)) + missing_field_desc = HiddenServiceDescriptor.create(exclude = ('introduction-points',))
self.assertEqual(None, missing_field_desc.introduction_points_encoded) self.assertEqual([], missing_field_desc.introduction_points_auth) self.assertEqual(None, missing_field_desc.introduction_points_content) self.assertEqual([], missing_field_desc.introduction_points())
- empty_field_desc = get_hidden_service_descriptor({'introduction-points': MESSAGE_BLOCK % ''}) + empty_field_desc = HiddenServiceDescriptor.create({'introduction-points': MESSAGE_BLOCK % ''})
self.assertEqual((MESSAGE_BLOCK % '').strip(), empty_field_desc.introduction_points_encoded) self.assertEqual([], empty_field_desc.introduction_points_auth) @@ -530,30 +534,7 @@ class TestHiddenServiceDescriptor(unittest.TestCase): )
for test_value in test_values: - desc_text = get_hidden_service_descriptor({'introduction-points': test_value}, content = True) - - desc = self._expect_invalid_attr(desc_text, 'introduction_points_encoded', test_value.strip()) + desc = expect_invalid_attr(self, {'introduction-points': test_value}, 'introduction_points_encoded', test_value.strip()) self.assertEqual([], desc.introduction_points_auth) self.assertEqual(None, desc.introduction_points_content) self.assertEqual([], desc.introduction_points()) - - def _expect_invalid_attr(self, desc_text, attr = None, expected_value = None): - """ - Asserts that construction will fail due to desc_text having a malformed - attribute. If an attr is provided then we check that it matches an expected - value when we're constructed without validation. - """ - - self.assertRaises(ValueError, HiddenServiceDescriptor, desc_text, True) - desc = HiddenServiceDescriptor(desc_text, validate = False) - - if attr: - # check that the invalid attribute matches the expected value when - # constructed without validation - - self.assertEqual(expected_value, getattr(desc, attr)) - else: - # check a default attribute - self.assertEqual('y3olqqblqw2gbh6phimfuiroechjjafa', desc.descriptor_id) - - return desc
tor-commits@lists.torproject.org