commit 382fdadb81c42d5ddfda83ada3ba93a8c1aea677 Author: Damian Johnson atagar@torproject.org Date: Wed Oct 2 13:31:15 2019 -0700
Adjust and test decode_address()
Ok, I really love George's decode_address() function. Clear, elegant, and self-contained. Not doing anything important to it - just some minor adjustments and added test coverage. --- stem/descriptor/hidden_service.py | 56 ++++++++++++++++++++++--------- stem/descriptor/hsv3_crypto.py | 35 ------------------- test/unit/descriptor/hidden_service_v3.py | 12 +++++++ 3 files changed, 52 insertions(+), 51 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 38c7a34b..b1644f81 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -101,6 +101,7 @@ SINGLE_INTRODUCTION_POINT_FIELDS = [
BASIC_AUTH = 1 STEALTH_AUTH = 2 +CHECKSUM_CONSTANT = b'.onion checksum'
class IntroductionPoints(collections.namedtuple('IntroductionPoints', INTRODUCTION_POINTS_ATTR.keys())): @@ -556,23 +557,14 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): # and parses the internal descriptor content.
def _decrypt(self, onion_address, outer_layer = False): - if onion_address.endswith('.onion'): - onion_address = onion_address[:-6] - if not stem.prereq.is_crypto_available(ed25519 = True): raise ImportError('Hidden service descriptor decryption requires cryptography version 2.6') elif not stem.prereq._is_sha3_available(): raise ImportError('Hidden service descriptor decryption requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)') - elif not stem.util.tor_tools.is_valid_hidden_service_address(onion_address, version = 3): - raise ValueError("'%s.onion' isn't a valid hidden service v3 address" % onion_address)
cert_lines = self.signing_cert.split('\n') desc_signing_cert = stem.descriptor.certificate.Ed25519Certificate.parse(''.join(cert_lines[1:-1]))
- # Get crypto material. - # ASN XXX Extract to its own function and assign them to class variables - from cryptography.hazmat.primitives import serialization - for extension in desc_signing_cert.extensions: if extension.type == ExtensionType.HAS_SIGNING_KEY: blinded_key_bytes = extension.data @@ -581,23 +573,55 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): if not blinded_key_bytes: raise ValueError('No signing key extension present')
- identity_public_key = stem.descriptor.hsv3_crypto.decode_address(onion_address) - identity_public_key_bytes = identity_public_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) - subcredential_bytes = stem.descriptor.hsv3_crypto.get_subcredential(identity_public_key_bytes, blinded_key_bytes) + identity_public_key = HiddenServiceDescriptorV3._public_key_from_address(onion_address) + subcredential_bytes = stem.descriptor.hsv3_crypto.get_subcredential(identity_public_key, blinded_key_bytes)
- outter_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, identity_public_key_bytes, blinded_key_bytes, subcredential_bytes) + outter_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_outter_layer(self.superencrypted, self.revision_counter, identity_public_key, blinded_key_bytes, subcredential_bytes)
if outer_layer: return outter_layer_plaintext
- # ATAGAR XXX this parsing function is a hack. need to replace it with some stem parsing. inner_layer_ciphertext = stem.descriptor.hsv3_crypto.parse_superencrypted_plaintext(outter_layer_plaintext)
- inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(inner_layer_ciphertext, self.revision_counter, identity_public_key_bytes, blinded_key_bytes, subcredential_bytes) + inner_layer_plaintext = stem.descriptor.hsv3_crypto.decrypt_inner_layer(inner_layer_ciphertext, self.revision_counter, identity_public_key, blinded_key_bytes, subcredential_bytes)
return inner_layer_plaintext
+ @staticmethod + def _public_key_from_address(onion_address): + # provides our hidden service ed25519 public key + + if onion_address.endswith('.onion'): + onion_address = onion_address[:-6] + + if not stem.util.tor_tools.is_valid_hidden_service_address(onion_address, version = 3): + raise ValueError("'%s.onion' isn't a valid hidden service v3 address" % onion_address) + + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey + + # onion_address = base32(PUBKEY | CHECKSUM | VERSION) + '.onion' + # CHECKSUM = H('.onion checksum' | PUBKEY | VERSION)[:2] + + decoded_address = base64.b32decode(onion_address.upper()) + + pubkey = decoded_address[:32] + checksum = decoded_address[32:34] + version = decoded_address[34:35] + + # validate our address checksum + + my_checksum_body = b'%s%s%s' % (CHECKSUM_CONSTANT, pubkey, version) + my_checksum = hashlib.sha3_256(my_checksum_body).digest()[:2] + + if (checksum != my_checksum): + raise ValueError('Bad checksum (expected %s but was %s)' % (binascii.hexlify(checksum), binascii.hexlify(my_checksum))) + + return Ed25519PublicKey.from_public_bytes(pubkey).public_bytes( + encoding = serialization.Encoding.Raw, + format = serialization.PublicFormat.Raw + ) +
# TODO: drop this alias in stem 2.x
diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py index 7b651418..e6e8c03e 100644 --- a/stem/descriptor/hsv3_crypto.py +++ b/stem/descriptor/hsv3_crypto.py @@ -1,5 +1,4 @@ import base64 -import binascii import hashlib import struct
@@ -15,40 +14,6 @@ Onion addresses - CHECKSUM is truncated to two bytes before inserting it in onion_address """
-CHECKSUM_CONSTANT = b'.onion checksum' - - -def decode_address(onion_address): - """ - Parse onion_address_str and return the pubkey. - - onion_address = base32(PUBKEY | CHECKSUM | VERSION) + '.onion' - CHECKSUM = H('.onion checksum' | PUBKEY | VERSION)[:2] - - :return: Ed25519PublicKey - - :raises: ValueError - """ - - from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey - - # base32 decode the addr (convert to uppercase since that's what python expects) - onion_address = base64.b32decode(onion_address.upper()) - - # extract pieces of information - pubkey = onion_address[:32] - checksum = onion_address[32:34] - version = onion_address[34:35] - - # Do checksum validation - my_checksum_body = b'%s%s%s' % (CHECKSUM_CONSTANT, pubkey, version) - my_checksum = hashlib.sha3_256(my_checksum_body).digest() - - if (checksum != my_checksum[:2]): - raise ValueError('Bad checksum (expected %s but was %s)' % (binascii.hexlify(checksum), binascii.hexlify(my_checksum[:2]))) - - return Ed25519PublicKey.from_public_bytes(pubkey) -
""" Blinded key stuff diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index 60cc06b8..2242d617 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -117,3 +117,15 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
for test_value in test_values: expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter') + + def test_public_key_from_address(self): + if not stem.prereq.is_crypto_available(ed25519 = True): + self.skipTest('(requires cryptography ed25519 support)') + return + elif not stem.prereq._is_sha3_available(): + self.skipTest('(requires sha3 support)') + return + + self.assertEqual(b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1', HiddenServiceDescriptorV3._public_key_from_address(HS_ADDRESS)) + self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3._public_key_from_address, 'boom') + self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3._public_key_from_address, '5' * 56)