[tor-commits] [stem/master] Inner layer creation and encryption

atagar at torproject.org atagar at torproject.org
Sun Nov 17 23:40:39 UTC 2019


commit 41f8a42a9e8735513b72d9ba3f6a5c11acf02943
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Nov 11 15:32:55 2019 -0800

    Inner layer creation and encryption
    
    InnerLayer creation, encryption, and test. While _get_superencrypted_blob()
    provided a great demo, it was limited to just introduction points. Now we'll
    support anything the layer does.
---
 stem/descriptor/__init__.py               |  2 +-
 stem/descriptor/hidden_service.py         | 41 +++++++++++++++++++++++---
 test/unit/descriptor/hidden_service_v3.py | 48 +++++++++++++++++++++++++++++++
 3 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/stem/descriptor/__init__.py b/stem/descriptor/__init__.py
index 3a3d1838..862d5aca 100644
--- a/stem/descriptor/__init__.py
+++ b/stem/descriptor/__init__.py
@@ -975,7 +975,7 @@ class Descriptor(object):
     :returns: **bytes** for the descriptor's contents
     """
 
-    return self._raw_contents
+    return stem.util.str_tools._to_bytes(self._raw_contents)
 
   def get_unrecognized_lines(self):
     """
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 141f54b0..fcd29032 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -920,14 +920,14 @@ def _get_middle_descriptor_layer_body(encrypted):
     b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
 
 
-def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key, subcredential):
+def _get_superencrypted_blob(intro_points, revision_counter, blinded_key, subcredential):
   """
   Get the superencrypted blob (which also includes the encrypted blob) that
   should be attached to the descriptor
   """
 
-  inner_descriptor_layer = stem.util.str_tools._to_bytes('create2-formats 2\n' + '\n'.join(map(IntroductionPointV3.encode, intro_points)) + '\n')
-  inner_ciphertext_b64 = b'encrypted\n' + _encrypt_layer(inner_descriptor_layer, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
+  inner_layer = InnerLayer.create(introduction_points = intro_points)
+  inner_ciphertext_b64 = b'encrypted\n' + inner_layer.encrypt(revision_counter, blinded_key, subcredential)
 
   middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64)
 
@@ -1042,7 +1042,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
     # this descriptor object so that we don't have to carry them around
     # functions and instead we could use e.g. self.descriptor_signing_public_key
     # But because this is a @classmethod this is not possible :/
-    superencrypted_blob = _get_superencrypted_blob(intro_points, signing_key, revision_counter_int, blinded_pubkey_bytes, subcredential)
+    superencrypted_blob = _get_superencrypted_blob(intro_points, revision_counter_int, blinded_pubkey_bytes, subcredential)
 
     desc_content = _descriptor_content(attr, exclude, (
       ('hs-descriptor', '3'),
@@ -1281,6 +1281,24 @@ class InnerLayer(Descriptor):
     plaintext = _decrypt_layer(outer_layer.encrypted, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
     return InnerLayer(plaintext, validate = True, outer_layer = outer_layer)
 
+  @classmethod
+  def content(cls, attr = None, exclude = (), sign = False, introduction_points = None):
+    if sign:
+      raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
+
+    if introduction_points:
+      suffix = '\n' + '\n'.join(map(IntroductionPointV3.encode, introduction_points))
+    else:
+      suffix = ''
+
+    return _descriptor_content(attr, exclude, (
+      ('create2-formats', '2'),
+    )) + suffix
+
+  @classmethod
+  def create(cls, attr = None, exclude = (), validate = True, sign = False, introduction_points = None):
+    return cls(cls.content(attr, exclude, sign, introduction_points), validate = validate)
+
   def __init__(self, content, validate = False, outer_layer = None):
     super(InnerLayer, self).__init__(content, lazy_load = not validate)
     self.outer = outer_layer
@@ -1304,6 +1322,21 @@ class InnerLayer(Descriptor):
     else:
       self._entries = entries
 
+  def encrypt(self, revision_counter, blinded_key, subcredential):
+    """
+    Encrypts this layer into the content contained within the OuterLayer.
+
+    :param int revision_counter: descriptor revision number
+    :param bytes blinded_key: descriptor signing key
+    :param bytes subcredential: public key hash
+
+    :returns: base64 encoded content of the outer layer's 'encrypted' field
+    """
+
+    if not stem.prereq.is_crypto_available(ed25519 = True):
+      raise ImportError('Hidden service descriptor encryption requires cryptography version 2.6')
+
+    return _encrypt_layer(self.get_bytes(), b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key)
 
 # TODO: drop this alias in stem 2.x
 
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 163ea3db..f6173768 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -28,6 +28,12 @@ from test.unit.descriptor import (
 )
 
 try:
+  # added in python 2.7
+  from collections import OrderedDict
+except ImportError:
+  from stem.util.ordereddict import OrderedDict
+
+try:
   # added in python 3.3
   from unittest.mock import patch, Mock
 except ImportError:
@@ -284,6 +290,48 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     reparsed = IntroductionPointV3.parse(intro_point.encode())
     self.assertEqual(intro_point, reparsed)
 
+  def test_inner_layer_creation(self):
+    """
+    Inner layer creation.
+    """
+
+    # minimal layer
+
+    self.assertEqual('create2-formats 2', InnerLayer.content())
+    self.assertEqual([2], InnerLayer.create().formats)
+
+    # specify its only mandatory parameter (formats)
+
+    self.assertEqual('create2-formats 1 2 3', InnerLayer.content({'create2-formats': '1 2 3'}))
+    self.assertEqual([1, 2, 3], InnerLayer.create({'create2-formats': '1 2 3'}).formats)
+
+    # include optional parameters
+
+    desc = InnerLayer.create(OrderedDict((
+      ('intro-auth-required', 'ed25519'),
+      ('single-onion-service', ''),
+    )))
+
+    self.assertEqual([2], desc.formats)
+    self.assertEqual(['ed25519'], desc.intro_auth)
+    self.assertEqual(True, desc.is_single_service)
+    self.assertEqual([], desc.introduction_points)
+
+    # include introduction points
+
+    desc = InnerLayer.create(introduction_points = [
+      IntroductionPointV3.create('1.1.1.1', 9001),
+      IntroductionPointV3.create('2.2.2.2', 9001),
+      IntroductionPointV3.create('3.3.3.3', 9001),
+    ])
+
+    self.assertEqual(3, len(desc.introduction_points))
+    self.assertEqual('1.1.1.1', desc.introduction_points[0].link_specifiers[0].address)
+
+    self.assertTrue(InnerLayer.content(introduction_points = [
+      IntroductionPointV3.create('1.1.1.1', 9001),
+    ]).startswith('create2-formats 2\nintroduction-point AQAGAQEBASMp'))
+
   @test.require.ed25519_support
   def test_encode_decode_descriptor(self):
     """





More information about the tor-commits mailing list