commit 33a96a6a754ca7e65f1d5741c6f813c447849ef8 Author: Damian Johnson atagar@torproject.org Date: Fri Nov 8 16:48:25 2019 -0800
Refactor and move layer encryption
We already had a _decrypt_layer() helper from the last branch. While doing the same for encryption it became evedent that these helpers are mostly identical so refactoring the common crypto into a third function.
Still not perfectly happy, but closer. :P --- stem/descriptor/hidden_service.py | 54 ++++++++++++++-------- stem/descriptor/hsv3_crypto.py | 95 --------------------------------------- 2 files changed, 36 insertions(+), 113 deletions(-)
diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index 5fb7a189..f0baf678 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -445,12 +445,6 @@ def _parse_file(descriptor_file, desc_type = None, validate = False, **kwargs):
def _decrypt_layer(encrypted_block, constant, revision_counter, subcredential, blinded_key): - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - from cryptography.hazmat.backends import default_backend - - def pack(val): - return struct.pack('>Q', val) - if encrypted_block.startswith('-----BEGIN MESSAGE-----\n') and encrypted_block.endswith('\n-----END MESSAGE-----'): encrypted_block = encrypted_block[24:-22]
@@ -466,22 +460,42 @@ def _decrypt_layer(encrypted_block, constant, revision_counter, subcredential, b ciphertext = encrypted[SALT_LEN:-MAC_LEN] expected_mac = encrypted[-MAC_LEN:]
- kdf = hashlib.shake_256(blinded_key + subcredential + pack(revision_counter) + salt + constant) + cipher, mac_for = _layer_cipher(constant, revision_counter, subcredential, blinded_key, salt) + + if expected_mac != mac_for(ciphertext): + raise ValueError('Malformed mac (expected %s, but was %s)' % (expected_mac, mac_for(ciphertext))) + + decryptor = cipher.decryptor() + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + + return stem.util.str_tools._to_unicode(plaintext) + + +def _encrypt_layer(plaintext, constant, revision_counter, subcredential, blinded_key): + salt = os.urandom(16) + cipher, mac_for = _layer_cipher(constant, revision_counter, subcredential, blinded_key, salt) + + encryptor = cipher.encryptor() + ciphertext = encryptor.update(plaintext) + encryptor.finalize() + + return salt + ciphertext + mac_for(ciphertext) + + +def _layer_cipher(constant, revision_counter, subcredential, blinded_key, salt): + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from cryptography.hazmat.backends import default_backend + + kdf = hashlib.shake_256(blinded_key + subcredential + struct.pack('>Q', revision_counter) + salt + constant) keys = kdf.digest(S_KEY_LEN + S_IV_LEN + MAC_LEN)
secret_key = keys[:S_KEY_LEN] secret_iv = keys[S_KEY_LEN:S_KEY_LEN + S_IV_LEN] mac_key = keys[S_KEY_LEN + S_IV_LEN:]
- mac = hashlib.sha3_256(pack(len(mac_key)) + mac_key + pack(len(salt)) + salt + ciphertext).digest() - - if mac != expected_mac: - raise ValueError('Malformed mac (expected %s, but was %s)' % (expected_mac, mac)) - cipher = Cipher(algorithms.AES(secret_key), modes.CTR(secret_iv), default_backend()) - decryptor = cipher.decryptor() + mac_prefix = struct.pack('>Q', len(mac_key)) + mac_key + struct.pack('>Q', len(salt)) + salt
- return stem.util.str_tools._to_unicode(decryptor.update(ciphertext) + decryptor.finalize()) + return cipher, lambda ciphertext: hashlib.sha3_256(mac_prefix + ciphertext).digest()
def _parse_protocol_versions_line(descriptor, entries): @@ -917,18 +931,22 @@ def _get_middle_descriptor_layer_body(encrypted): b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
-def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key_bytes, subcredential): +def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key, subcredential): """ Get the superencrypted blob (which also includes the encrypted blob) that should be attached to the descriptor """
inner_descriptor_layer = stem.util.str_tools._to_bytes('create2-formats 2\n' + '\n'.join(map(IntroductionPointV3.encode, intro_points)) + '\n') - inner_ciphertext = stem.descriptor.hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) + inner_ciphertext = _encrypt_layer(inner_descriptor_layer, b'hsdir-encrypted-data', revision_counter, subcredential, blinded_key) inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64) - outter_ciphertext = stem.descriptor.hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) + + padding_bytes_needed = stem.descriptor.hsv3_crypto._get_padding_needed(len(middle_descriptor_layer)) + middle_descriptor_layer = middle_descriptor_layer + b'\x00' * padding_bytes_needed + + outter_ciphertext = _encrypt_layer(middle_descriptor_layer, b'hsdir-superencrypted-data', revision_counter, subcredential, blinded_key)
return b64_and_wrap_desc_layer(outter_ciphertext)
@@ -1279,7 +1297,7 @@ class InnerLayer(Descriptor): super(InnerLayer, self).__init__(content, lazy_load = not validate) self.outer = outer_layer
- # inner layer begins with a few header fields, followed by multiple any + # inner layer begins with a few header fields, followed by any # number of introduction-points
div = content.find('\nintroduction-point ') diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py index 80759aa2..b762c5ee 100644 --- a/stem/descriptor/hsv3_crypto.py +++ b/stem/descriptor/hsv3_crypto.py @@ -1,7 +1,3 @@ -import hashlib -import struct -import os - from stem.descriptor import slow_ed25519
@@ -109,82 +105,6 @@ Descriptor encryption """
-def pack(val): - return struct.pack('>Q', val) - - -def get_desc_keys(secret_data, string_constant, subcredential, revision_counter, salt): - """ - secret_input = SECRET_DATA | subcredential | INT_8(revision_counter) - - keys = KDF(secret_input | salt | STRING_CONSTANT, S_KEY_LEN + S_IV_LEN + MAC_KEY_LEN) - - SECRET_KEY = first S_KEY_LEN bytes of keys - SECRET_IV = next S_IV_LEN bytes of keys - MAC_KEY = last MAC_KEY_LEN bytes of keys - - where - - 2.5.1.1. First layer encryption logic - SECRET_DATA = blinded-public-key - STRING_CONSTANT = "hsdir-superencrypted-data" - - 2.5.2.1. Second layer encryption keys - SECRET_DATA = blinded-public-key | descriptor_cookie - STRING_CONSTANT = "hsdir-encrypted-data" - """ - - secret_input = b'%s%s%s' % (secret_data, subcredential, pack(revision_counter)) - - kdf = hashlib.shake_256(secret_input + salt + string_constant) - - keys = kdf.digest(S_KEY_LEN + S_IV_LEN + MAC_LEN) - - secret_key = keys[:S_KEY_LEN] - secret_iv = keys[S_KEY_LEN:S_KEY_LEN + S_IV_LEN] - mac_key = keys[S_KEY_LEN + S_IV_LEN:] - - return secret_key, secret_iv, mac_key - - -def get_desc_encryption_mac(key, salt, ciphertext): - mac = hashlib.sha3_256(pack(len(key)) + key + pack(len(salt)) + salt + ciphertext).digest() - return mac - - -def _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, secret_data, string_constant): - """ - Encrypt descriptor layer at 'plaintext' - """ - - from cryptography.hazmat.backends import default_backend - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - - salt = os.urandom(16) - - secret_key, secret_iv, mac_key = get_desc_keys(secret_data, string_constant, subcredential, revision_counter, salt) - - # Now time to encrypt descriptor - cipher = Cipher(algorithms.AES(secret_key), modes.CTR(secret_iv), default_backend()) - encryptor = cipher.encryptor() - ciphertext = encryptor.update(plaintext) + encryptor.finalize() - - mac = get_desc_encryption_mac(mac_key, salt, ciphertext) - - return salt + ciphertext + mac - - -def encrypt_inner_layer(plaintext, revision_counter, blinded_key_bytes, subcredential): - """ - Encrypt the inner layer of the descriptor - """ - - secret_data = blinded_key_bytes - string_constant = b'hsdir-encrypted-data' - - return _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, secret_data, string_constant) - - def ceildiv(a, b): """ Like // division but return the ceiling instead of the floor @@ -205,18 +125,3 @@ def _get_padding_needed(plaintext_len):
final_size = ceildiv(plaintext_len, PAD_MULTIPLE_BYTES) * PAD_MULTIPLE_BYTES return final_size - plaintext_len - - -def encrypt_outter_layer(plaintext, revision_counter, blinded_key_bytes, subcredential): - """ - Encrypt the outer layer of the descriptor - """ - - secret_data = blinded_key_bytes - string_constant = b'hsdir-superencrypted-data' - - # In the outter layer we first need to pad the plaintext - padding_bytes_needed = _get_padding_needed(len(plaintext)) - padded_plaintext = plaintext + b'\x00' * padding_bytes_needed - - return _encrypt_descriptor_layer(padded_plaintext, revision_counter, subcredential, secret_data, string_constant)