[tor-commits] [stem/master] Public decryption method

atagar at torproject.org atagar at torproject.org
Sun Oct 6 02:07:35 UTC 2019


commit 3498ebedccdcd59cfc9072e2f24f5c9eb0c3daae
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Oct 5 15:06:27 2019 -0700

    Public decryption method
    
    After considering a few APIs, decided to keep this simple, and simply return
    the InnerLayer which references the OuterLayer.
---
 stem/descriptor/hidden_service.py         | 58 ++++++++++++++++++++-----------
 test/unit/descriptor/hidden_service_v3.py | 36 ++++++++++++++-----
 2 files changed, 64 insertions(+), 30 deletions(-)

diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py
index 6911fa3c..e4ee13e3 100644
--- a/stem/descriptor/hidden_service.py
+++ b/stem/descriptor/hidden_service.py
@@ -671,8 +671,10 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
   def create(cls, attr = None, exclude = (), validate = True, sign = False):
     return cls(cls.content(attr, exclude, sign), validate = validate, skip_crypto_validation = not sign)
 
-  def __init__(self, raw_contents, validate = False, skip_crypto_validation = False):
+  def __init__(self, raw_contents, validate = False):
     super(HiddenServiceDescriptorV3, self).__init__(raw_contents, lazy_load = not validate)
+
+    self._inner_layer = None
     entries = _descriptor_components(raw_contents, validate)
 
     if validate:
@@ -691,39 +693,49 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
     else:
       self._entries = entries
 
-  # TODO: The following is only marked as private because it is a work in
-  # progress. This will probably become something like "body()" which decrypts
-  # and parses the internal descriptor content.
+  def decrypt(self, onion_address, validate = False):
+    """
+    Decrypt this descriptor. Hidden service descriptors contain two encryption
+    layers (:class:`~stem.descriptor.hidden_service.OuterLayer` and
+    :class:`~stem.descriptor.hidden_service.InnerLayer`).
+
+    :param str onion_address: hidden service address this descriptor is from
+    :param bool validate: perform validation checks on decrypted content
+
+    :returns: :class:`~stem.descriptor.hidden_service.InnerLayer` with our
+      decrypted content
+
+    :raises:
+      * **ImportError** if required cryptography or sha3 module is unavailable
+      * **ValueError** if unable to decrypt or validation fails
+    """
 
-  def _decrypt(self, onion_address, outer_layer = False):
     if not stem.prereq.is_crypto_available(ed25519 = True):
       raise ImportError('Hidden service descriptor decryption requires cryptography version 2.6')
     elif not stem.prereq._is_sha3_available():
       raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
 
-    blinded_key = self.signing_cert.signing_key()
-
-    if not blinded_key:
-      raise ValueError('No signing key is present')
+    if self._inner_layer is None:
+      blinded_key = self.signing_cert.signing_key()
 
-    identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
+      if not blinded_key:
+        raise ValueError('No signing key is present')
 
-    # credential = H('credential' | public-identity-key)
-    # subcredential = H('subcredential' | credential | blinded-public-key)
+      identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address)
 
-    credential = hashlib.sha3_256(b'credential%s' % (identity_public_key)).digest()
-    subcredential = hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
+      # credential = H('credential' | public-identity-key)
+      # subcredential = H('subcredential' | credential | blinded-public-key)
 
-    outter_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, blinded_key, subcredential)
+      credential = hashlib.sha3_256(b'credential%s' % (identity_public_key)).digest()
+      subcredential = hashlib.sha3_256(b'subcredential%s%s' % (credential, blinded_key)).digest()
 
-    if outer_layer:
-      return outter_layer_plaintext
+      outer_layer = OuterLayer(stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, blinded_key, subcredential), validate)
 
-    inner_layer_ciphertext = OuterLayer(outter_layer_plaintext).encrypted
+      inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(outer_layer.encrypted, self.revision_counter, blinded_key, subcredential)
 
-    inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(inner_layer_ciphertext, self.revision_counter, blinded_key, subcredential)
+      self._inner_layer = InnerLayer(inner_layer_plaintext, validate, outer_layer)
 
-    return inner_layer_plaintext
+    return self._inner_layer
 
   @staticmethod
   def _public_key_from_address(onion_address):
@@ -805,6 +817,8 @@ class InnerLayer(Descriptor):
 
   .. versionadded:: 1.8.0
 
+  :var stem.descriptor.hidden_service.OuterLayer outer: enclosing encryption layer
+
   :var list formats: **\\*** recognized CREATE2 cell formats
   :var list intro_auth: **\\*** introduction-layer authentication types
   :var bool is_single_service: **\\*** **True** if this is a `single onion service <https://gitweb.torproject.org/torspec.git/tree/proposals/260-rend-single-onion.txt>`_, **False** otherwise
@@ -827,9 +841,11 @@ class InnerLayer(Descriptor):
     'single-onion-service': _parse_v3_inner_single_service,
   }
 
-  def __init__(self, content, validate = False):
+  def __init__(self, content, validate = False, outer_layer = None):
     super(InnerLayer, self).__init__(content, lazy_load = not validate)
 
+    self.outer = outer_layer
+
     # inner layer begins with a few header fields, followed by multiple any
     # number of introduction-points
 
diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py
index 36b85f7c..093d5cb6 100644
--- a/test/unit/descriptor/hidden_service_v3.py
+++ b/test/unit/descriptor/hidden_service_v3.py
@@ -35,6 +35,15 @@ BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk=
 -----END ED25519 CERT-----\
 """
 
+with open(get_resource('hidden_service_v3'), 'rb') as descriptor_file:
+  HS_DESC_STR = descriptor_file.read()
+
+with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as outer_layer_file:
+  OUTER_LAYER_STR = outer_layer_file.read()
+
+with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as inner_layer_file:
+  INNER_LAYER_STR = inner_layer_file.read()
+
 
 class TestHiddenServiceDescriptorV3(unittest.TestCase):
   def test_real_descriptor(self):
@@ -54,20 +63,30 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     self.assertTrue('eaH8VdaTKS' in desc.superencrypted)
     self.assertEqual('aglChCQF+lbzKgyxJJTpYGVShV/GMDRJ4+cRGCp+a2y/yX/tLSh7hzqI7rVZrUoGj74Xr1CLMYO3fXYCS+DPDQ', desc.signature)
 
-    if stem.prereq.is_crypto_available(ed25519 = True) and stem.prereq._is_sha3_available():
-      with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as outer_layer_file:
-        self.assertEqual(outer_layer_file.read(), desc._decrypt(HS_ADDRESS, outer_layer = True))
+  def test_decryption(self):
+    """
+    Decrypt our descriptor and validate its content.
+    """
+
+    if not stem.prereq.is_crypto_available(ed25519 = True):
+      self.skipTest('(requires cryptography ed25519 support)')
+      return
+    elif not stem.prereq._is_sha3_available():
+      self.skipTest('(requires sha3 support)')
+      return
+
+    desc = HiddenServiceDescriptorV3.from_str(HS_DESC_STR)
+    inner_layer = desc.decrypt(HS_ADDRESS)
 
-      with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as outer_layer_file:
-        self.assertEqual(outer_layer_file.read(), desc._decrypt(HS_ADDRESS, outer_layer = False))
+    self.assertEqual(INNER_LAYER_STR, str(inner_layer))
+    self.assertEqual(OUTER_LAYER_STR.rstrip(b'\x00'), str(inner_layer.outer))
 
   def test_outer_layer(self):
     """
     Parse the outer layer of our test descriptor.
     """
 
-    with open(get_resource('hidden_service_v3_outer_layer'), 'rb') as descriptor_file:
-      desc = OuterLayer(descriptor_file.read())
+    desc = OuterLayer(OUTER_LAYER_STR)
 
     self.assertEqual('x25519', desc.auth_type)
     self.assertEqual('WjZCU9sV1oxkxaPcd7/YozeZgq0lEs6DhWyrdYRNJR4=', desc.ephemeral_key)
@@ -85,8 +104,7 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
     Parse the inner layer of our test descriptor.
     """
 
-    with open(get_resource('hidden_service_v3_inner_layer'), 'rb') as descriptor_file:
-      desc = InnerLayer(descriptor_file.read())
+    desc = InnerLayer(INNER_LAYER_STR)
 
     self.assertEqual([2], desc.formats)
     self.assertEqual(['ed25519'], desc.intro_auth)





More information about the tor-commits mailing list