commit 0f7ed61b9980fb1c2994045ac9aefde8ed7194c4 Author: Damian Johnson atagar@torproject.org Date: Sun Oct 13 17:22:19 2019 -0700
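Pycodestyle and pyflakes corrections
Resolves the static check failures listed below. Most are whitespace, blank-line and quoting fixes, but this also drops unused imports, replaces the star import in ed25519_exts_ref.py with qualified names, and fixes an undefined name ('key_type' rather than 'self.key_type') in stem/descriptor/certificate.py.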
STATIC CHECKS * /home/atagar/Desktop/stem/stem/descriptor/slow_ed25519.py line 15 - E741 ambiguous variable name 'l' | l = 2**252 + 27742317777372353535851937790883648493 line 17 - E302 expected 2 blank lines, found 1 | def H(m): line 20 - E231 missing whitespace after ',' | def expmod(b,e,m): line 20 - E302 expected 2 blank lines, found 1 | def expmod(b,e,m): line 21 - E701 multiple statements on one line (colon) | if e == 0: return 1 line 22 - E226 missing whitespace around arithmetic operator | t = expmod(b,e//2,m)**2 % m line 22 - E231 missing whitespace after ',' | t = expmod(b,e//2,m)**2 % m line 23 - E226 missing whitespace around arithmetic operator | if e & 1: t = (t*b) % m line 23 - E701 multiple statements on one line (colon) | if e & 1: t = (t*b) % m line 26 - E302 expected 2 blank lines, found 1 | def inv(x): line 27 - E231 missing whitespace after ',' | return expmod(x,q-2,q) line 27 - E226 missing whitespace around arithmetic operator | return expmod(x,q-2,q) line 29 - E305 expected 2 blank lines after class or function definition, found 1 | d = -121665 * inv(121666) line 30 - E741 ambiguous variable name 'I' | I = expmod(2,(q-1)//4,q) line 30 - E231 missing whitespace after ',' | I = expmod(2,(q-1)//4,q) line 30 - E226 missing whitespace around arithmetic operator | I = expmod(2,(q-1)//4,q) line 32 - E302 expected 2 blank lines, found 1 | def xrecover(y): line 33 - E226 missing whitespace around arithmetic operator | xx = (y*y-1) * inv(d*y*y+1) line 34 - E226 missing whitespace around arithmetic operator | x = expmod(xx,(q+3)//8,q) line 34 - E231 missing whitespace after ',' | x = expmod(xx,(q+3)//8,q) line 35 - E226 missing whitespace around arithmetic operator | if (x*x - xx) % q != 0: x = (x*I) % q line 35 - E701 multiple statements on one line (colon) | if (x*x - xx) % q != 0: x = (x*I) % q line 36 - E226 missing whitespace around arithmetic operator | if x % 2 != 0: x = q-x line 36 - E701 multiple statements on one line (colon) | if x % 2 != 0: 
x = q-x line 39 - E305 expected 2 blank lines after class or function definition, found 1 | By = 4 * inv(5) line 41 - E231 missing whitespace after ',' | B = [Bx % q,By % q] line 43 - E231 missing whitespace after ',' | def edwards(P,Q): line 43 - E302 expected 2 blank lines, found 1 | def edwards(P,Q): line 48 - E226 missing whitespace around arithmetic operator | x3 = (x1*y2+x2*y1) * inv(1+d*x1*x2*y1*y2) line 49 - E226 missing whitespace around arithmetic operator | y3 = (y1*y2+x1*x2) * inv(1-d*x1*x2*y1*y2) line 50 - E231 missing whitespace after ',' | return [x3 % q,y3 % q] line 52 - E231 missing whitespace after ',' | def scalarmult(P,e): line 52 - E302 expected 2 blank lines, found 1 | def scalarmult(P,e): line 53 - E701 multiple statements on one line (colon) | if e == 0: return [0,1] line 53 - E231 missing whitespace after ',' | if e == 0: return [0,1] line 54 - E231 missing whitespace after ',' | Q = scalarmult(P,e//2) line 54 - E226 missing whitespace around arithmetic operator | Q = scalarmult(P,e//2) line 55 - E231 missing whitespace after ',' | Q = edwards(Q,Q) line 56 - E231 missing whitespace after ',' | if e & 1: Q = edwards(Q,P) line 56 - E701 multiple statements on one line (colon) | if e & 1: Q = edwards(Q,P) line 59 - E302 expected 2 blank lines, found 1 | def encodeint(y): line 61 - E226 missing whitespace around arithmetic operator | return b''.join([chr(sum([bits[i * 8 + j] << j for j in range(8)])) for i in range(b//8)]) line 63 - E302 expected 2 blank lines, found 1 | def encodepoint(P): line 67 - E226 missing whitespace around arithmetic operator | return b''.join([chr(sum([bits[i * 8 + j] << j for j in range(8)])) for i in range(b//8)]) line 69 - E302 expected 2 blank lines, found 1 | def bit(h,i): line 69 - E231 missing whitespace after ',' | def bit(h,i): line 70 - E228 missing whitespace around modulo operator | return (ord(h[i//8:i//8+1]) >> (i%8)) & 1 line 70 - E226 missing whitespace around arithmetic operator | return 
(ord(h[i//8:i//8+1]) >> (i%8)) & 1 line 72 - E302 expected 2 blank lines, found 1 | def publickey(sk): line 74 - E231 missing whitespace after ',' | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 74 - E226 missing whitespace around arithmetic operator | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 75 - E231 missing whitespace after ',' | A = scalarmult(B,a) line 78 - E302 expected 2 blank lines, found 1 | def Hint(m): line 80 - E231 missing whitespace after ',' | return sum(2**i * bit(h,i) for i in range(2*b)) line 80 - E226 missing whitespace around arithmetic operator | return sum(2**i * bit(h,i) for i in range(2*b)) line 82 - E302 expected 2 blank lines, found 1 | def signature(m,sk,pk): line 82 - E231 missing whitespace after ',' | def signature(m,sk,pk): line 84 - E231 missing whitespace after ',' | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 84 - E226 missing whitespace around arithmetic operator | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 85 - E226 missing whitespace around arithmetic operator | r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) line 85 - E231 missing whitespace after ',' | r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) line 86 - E231 missing whitespace after ',' | R = scalarmult(B,r) line 90 - E302 expected 2 blank lines, found 1 | def isoncurve(P): line 93 - E226 missing whitespace around arithmetic operator | return (-x*x + y*y - 1 - d*x*x*y*y) % q == 0 line 95 - E302 expected 2 blank lines, found 1 | def decodeint(s): line 96 - E231 missing whitespace after ',' | return sum(2**i * bit(s,i) for i in range(0,b)) line 98 - E302 expected 2 blank lines, found 1 | def decodepoint(s): line 99 - E231 missing whitespace after ',' | y = sum(2**i * bit(s,i) for i in range(0,b-1)) line 99 - E226 missing whitespace around arithmetic operator | y = sum(2**i * bit(s,i) for i in range(0,b-1)) line 101 - E231 missing whitespace after ',' | if x & 1 != 
bit(s,b-1): x = q-x line 101 - E226 missing whitespace around arithmetic operator | if x & 1 != bit(s,b-1): x = q-x line 101 - E701 multiple statements on one line (colon) | if x & 1 != bit(s,b-1): x = q-x line 102 - E231 missing whitespace after ',' | P = [x,y] line 103 - E701 multiple statements on one line (colon) | if not isoncurve(P): raise Exception("decoding point that is not on curve") line 103 - use single rather than double quotes | if not isoncurve(P): raise Exception("decoding point that is not on curve") line 106 - E231 missing whitespace after ',' | def checkvalid(s,m,pk): line 106 - E302 expected 2 blank lines, found 1 | def checkvalid(s,m,pk): line 107 - use single rather than double quotes | if len(s) != b//4: raise Exception("signature length is wrong") line 107 - E701 multiple statements on one line (colon) | if len(s) != b//4: raise Exception("signature length is wrong") line 107 - E226 missing whitespace around arithmetic operator | if len(s) != b//4: raise Exception("signature length is wrong") line 108 - use single rather than double quotes | if len(pk) != b//8: raise Exception("public-key length is wrong") line 108 - E226 missing whitespace around arithmetic operator | if len(pk) != b//8: raise Exception("public-key length is wrong") line 108 - E701 multiple statements on one line (colon) | if len(pk) != b//8: raise Exception("public-key length is wrong") line 109 - E226 missing whitespace around arithmetic operator | R = decodepoint(s[0:b//8]) line 111 - E226 missing whitespace around arithmetic operator | S = decodeint(s[b//8:b//4]) line 113 - E231 missing whitespace after ',' | if scalarmult(B,S) != edwards(R,scalarmult(A,h)): line 114 - use single rather than double quotes | raise Exception("signature does not pass verification")
* /home/atagar/Desktop/stem/test/unit/descriptor/hidden_service_v3.py line 55 - E302 expected 2 blank lines, found 1 | class TestHiddenServiceDescriptorV3(unittest.TestCase): line 259 - E226 missing whitespace around arithmetic operator | private_identity_key = Ed25519PrivateKey.from_private_bytes(b"a"*32) line 259 - use single rather than double quotes | private_identity_key = Ed25519PrivateKey.from_private_bytes(b"a"*32) line 274 - use single rather than double quotes | blind_param = bytearray.fromhex("677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3") line 292 - E261 at least two spaces before inline comment | original_found = False # Make sure we found all the intro points
* /home/atagar/Desktop/stem/test/unit/descriptor/certificate.py line 231 - E128 continuation line under-indented for visual indent | format=serialization.PublicFormat.Raw))
* /home/atagar/Desktop/stem/stem/client/datatype.py line 584 - E302 expected 2 blank lines, found 1 | class LinkByIPv4(LinkSpecifier):
* /home/atagar/Desktop/stem/stem/descriptor/ed25519_exts_ref.py line 12 - 'from .slow_ed25519 import *' used; unable to detect undefined names | from .slow_ed25519 import * line 15 - 'random' imported but unused | import random line 20 - E265 block comment should start with '# ' | #define a synonym that doesn't look like 1 line 21 - 'l' may be undefined, or defined from star imports: .slow_ed25519 | ell = l line 26 - E302 expected 2 blank lines, found 1 | def blindESK(esk, param): line 27 - 'bit' may be undefined, or defined from star imports: .slow_ed25519 | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 27 - E226 missing whitespace around arithmetic operator | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 27 - 'b' may be undefined, or defined from star imports: .slow_ed25519 | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 27 - E231 missing whitespace after ',' | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 28 - 'decodeint' may be undefined, or defined from star imports: .slow_ed25519 | s = decodeint(esk[:32]) line 32 - use single rather than double quotes | k_prime = H(b"Derive temporary signing key hash input" + k)[:32] line 32 - 'H' may be undefined, or defined from star imports: .slow_ed25519 | k_prime = H(b"Derive temporary signing key hash input" + k)[:32] line 33 - 'encodeint' may be undefined, or defined from star imports: .slow_ed25519 | return encodeint(s_prime) + k_prime line 35 - E302 expected 2 blank lines, found 1 | def blindPK(pk, param): line 36 - 'bit' may be undefined, or defined from star imports: .slow_ed25519 | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 36 - E226 missing whitespace around arithmetic operator | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 36 - 'b' may be undefined, or defined from star imports: .slow_ed25519 | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 36 
- E231 missing whitespace after ',' | mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) line 37 - 'decodepoint' may be undefined, or defined from star imports: .slow_ed25519 | P = decodepoint(pk) line 38 - 'encodepoint' may be undefined, or defined from star imports: .slow_ed25519 | return encodepoint(scalarmult(P, mult)) line 38 - 'scalarmult' may be undefined, or defined from star imports: .slow_ed25519 | return encodepoint(scalarmult(P, mult)) line 40 - E302 expected 2 blank lines, found 1 | def expandSK(sk): line 41 - 'H' may be undefined, or defined from star imports: .slow_ed25519 | h = H(sk) line 42 - E226 missing whitespace around arithmetic operator | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 42 - E231 missing whitespace after ',' | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 42 - 'b' may be undefined, or defined from star imports: .slow_ed25519 | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 42 - 'bit' may be undefined, or defined from star imports: .slow_ed25519 | a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) line 43 - E231 missing whitespace after ',' | k = b''.join([h[i:i+1] for i in range(b//8,b//4)]) line 43 - 'b' may be undefined, or defined from star imports: .slow_ed25519 | k = b''.join([h[i:i+1] for i in range(b//8,b//4)]) line 43 - E226 missing whitespace around arithmetic operator | k = b''.join([h[i:i+1] for i in range(b//8,b//4)]) line 45 - 'encodeint' may be undefined, or defined from star imports: .slow_ed25519 | return encodeint(a)+k line 45 - E226 missing whitespace around arithmetic operator | return encodeint(a)+k line 47 - E302 expected 2 blank lines, found 1 | def publickeyFromESK(h): line 48 - 'decodeint' may be undefined, or defined from star imports: .slow_ed25519 | a = decodeint(h[:32]) line 49 - E231 missing whitespace after ',' | A = scalarmult(B,a) line 49 - 'B' may be undefined, or defined from star imports: .slow_ed25519 | A = scalarmult(B,a) 
line 49 - 'scalarmult' may be undefined, or defined from star imports: .slow_ed25519 | A = scalarmult(B,a) line 50 - 'encodepoint' may be undefined, or defined from star imports: .slow_ed25519 | return encodepoint(A) line 52 - E231 missing whitespace after ',' | def signatureWithESK(m,h,pk): line 52 - E302 expected 2 blank lines, found 1 | def signatureWithESK(m,h,pk): line 53 - 'decodeint' may be undefined, or defined from star imports: .slow_ed25519 | a = decodeint(h[:32]) line 54 - E226 missing whitespace around arithmetic operator | r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) line 54 - E231 missing whitespace after ',' | r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) line 54 - 'b' may be undefined, or defined from star imports: .slow_ed25519 | r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) line 54 - 'Hint' may be undefined, or defined from star imports: .slow_ed25519 | r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) line 55 - 'scalarmult' may be undefined, or defined from star imports: .slow_ed25519 | R = scalarmult(B,r) line 55 - E231 missing whitespace after ',' | R = scalarmult(B,r) line 55 - 'B' may be undefined, or defined from star imports: .slow_ed25519 | R = scalarmult(B,r) line 56 - 'encodepoint' may be undefined, or defined from star imports: .slow_ed25519 | S = (r + Hint(encodepoint(R) + pk + m) * a) % l line 56 - 'l' may be undefined, or defined from star imports: .slow_ed25519 | S = (r + Hint(encodepoint(R) + pk + m) * a) % l line 56 - 'Hint' may be undefined, or defined from star imports: .slow_ed25519 | S = (r + Hint(encodepoint(R) + pk + m) * a) % l line 57 - 'encodeint' may be undefined, or defined from star imports: .slow_ed25519 | return encodepoint(R) + encodeint(S) line 57 - 'encodepoint' may be undefined, or defined from star imports: .slow_ed25519 | return encodepoint(R) + encodeint(S) line 59 - E302 expected 2 blank lines, found 1 | def newSK(): line 62 - E302 expected 2 blank 
lines, found 1 | def random_scalar(entropy_f): # 0..L-1 inclusive line 62 - E261 at least two spaces before inline comment | def random_scalar(entropy_f): # 0..L-1 inclusive line 64 - E226 missing whitespace around arithmetic operator | oversized = int(binascii.hexlify(entropy_f(32+32)), 16) line 69 - E305 expected 2 blank lines after class or function definition, found 1 | MSG = b"This is extremely silly. But it is also incredibly serious business!" line 69 - use single rather than double quotes | MSG = b"This is extremely silly. But it is also incredibly serious business!" line 71 - E302 expected 2 blank lines, found 1 | class SelfTest(unittest.TestCase): line 75 - 'checkvalid' may be undefined, or defined from star imports: .slow_ed25519 | checkvalid(sig, MSG, pk) line 78 - E226 missing whitespace around arithmetic operator | checkvalid(sig, MSG*2, pk) line 78 - 'checkvalid' may be undefined, or defined from star imports: .slow_ed25519 | checkvalid(sig, MSG*2, pk) line 87 - 'publickey' may be undefined, or defined from star imports: .slow_ed25519 | pk = publickey(sk) line 89 - 'signature' may be undefined, or defined from star imports: .slow_ed25519 | sig1 = signature(MSG, sk, pk) line 97 - 'publickey' may be undefined, or defined from star imports: .slow_ed25519 | pk2 = publickey(sk) line 117 - 'inv' may be undefined, or defined from star imports: .slow_ed25519 | By = 4 * inv(5) line 118 - 'xrecover' may be undefined, or defined from star imports: .slow_ed25519 | Bx = xrecover(By) line 119 - E231 missing whitespace after ',' | B = [Bx % q,By % q] line 119 - 'q' may be undefined, or defined from star imports: .slow_ed25519 | B = [Bx % q,By % q] line 122 - 'scalarmult' may be undefined, or defined from star imports: .slow_ed25519 | identity = scalarmult(B, ell) line 126 - 'decodepoint' may be undefined, or defined from star imports: .slow_ed25519 | pk = decodepoint(publickey(sk)) line 126 - 'publickey' may be undefined, or defined from star imports: .slow_ed25519 
| pk = decodepoint(publickey(sk)) line 127 - 'scalarmult' may be undefined, or defined from star imports: .slow_ed25519 | identity2 = scalarmult(pk, ell) line 132 - E231 missing whitespace after ',' | assert(identity == [0,1]) line 136 - 'scalarmult' may be undefined, or defined from star imports: .slow_ed25519 | result = scalarmult(identity, scalar) line 142 - E305 expected 2 blank lines after class or function definition, found 1 | RAND_INPUTS = [ line 163 - use single rather than double quotes | PREFIX = "ED25519_" line 165 - E302 expected 2 blank lines, found 1 | def writeArray(name, array): line 166 - use single rather than double quotes | print("static const char *{prefix}{name}[] = {{".format( line 167 - E231 missing whitespace after ',' | prefix=PREFIX,name=name)) line 173 - E231 missing whitespace after ',' | print(' "{0}"\n "{1}",'.format(h1,h2)) line 176 - use single rather than double quotes | print("};\n") line 178 - E302 expected 2 blank lines, found 1 | def comment(text, initial="/**"): line 178 - use single rather than double quotes | def comment(text, initial="/**"): line 180 - use single rather than double quotes | print(textwrap.fill(text,initial_indent=" * ",subsequent_indent=" * ")) line 180 - E231 missing whitespace after ',' | print(textwrap.fill(text,initial_indent=" * ",subsequent_indent=" * ")) line 181 - use single rather than double quotes | print(" */") line 183 - E302 expected 2 blank lines, found 1 | def makeTestVectors(): line 189 - E303 too many blank lines (2) | comment("""Secret key seeds used as inputs for the ed25519 test vectors. 
line 191 - E201 whitespace after '[' | secretKeys = [ bytes.fromhex(r) for r in RAND_INPUTS ] line 191 - E202 whitespace before ']' | secretKeys = [ bytes.fromhex(r) for r in RAND_INPUTS ] line 192 - use single rather than double quotes | writeArray("SECRET_KEYS", secretKeys) line 196 - E201 whitespace after '[' | expandedSecretKeys = [ expandSK(sk) for sk in secretKeys ] line 196 - E202 whitespace before ']' | expandedSecretKeys = [ expandSK(sk) for sk in secretKeys ] line 197 - use single rather than double quotes | writeArray("EXPANDED_SECRET_KEYS", expandedSecretKeys) line 200 - E202 whitespace before ']' | publicKeys = [ publickey(sk) for sk in secretKeys ] line 200 - 'publickey' may be undefined, or defined from star imports: .slow_ed25519 | publicKeys = [ publickey(sk) for sk in secretKeys ] line 200 - E201 whitespace after '[' | publicKeys = [ publickey(sk) for sk in secretKeys ] line 204 - E202 whitespace before ']' | blindingParams = [ binascii.a2b_hex(r) for r in BLINDING_PARAMS ] line 204 - E201 whitespace after '[' | blindingParams = [ binascii.a2b_hex(r) for r in BLINDING_PARAMS ] line 204 - E222 multiple spaces after operator | blindingParams = [ binascii.a2b_hex(r) for r in BLINDING_PARAMS ] line 205 - use single rather than double quotes | writeArray("BLINDING_PARAMS", blindingParams) line 210 - use single rather than double quotes | writeArray("BLINDED_SECRET_KEYS", line 212 - E231 missing whitespace after ',' | for sk,bp in zip(secretKeys,blindingParams))) line 217 - use single rather than double quotes | writeArray("BLINDED_PUBLIC_KEYS", line 218 - E231 missing whitespace after ',' | (blindPK(pk, bp) for pk,bp in zip(publicKeys,blindingParams))) line 222 - use single rather than double quotes | writeArray("SELF_SIGNATURES", line 223 - 'signature' may be undefined, or defined from star imports: .slow_ed25519 | (signature(pk, sk, pk) for pk,sk in zip(publicKeys,secretKeys))) line 223 - E231 missing whitespace after ',' | (signature(pk, sk, pk) for 
pk,sk in zip(publicKeys,secretKeys))) line 227 - E303 too many blank lines (3) | if __name__ == '__main__': line 229 - use single rather than double quotes | if len(sys.argv) == 1 or sys.argv[1] not in ("SelfTest", "MakeVectors"):
* /home/atagar/Desktop/stem/stem/descriptor/hidden_service.py line 38 - 'struct' imported but unused | import struct line 190 - use single rather than double quotes | raise ValueError("Either auth key or auth key cert needs to be provided") line 215 - use single rather than double quotes | ls_block = b"" line 227 - use single rather than double quotes | raise ValueError("Cannot encode: Descriptor signing key not provided") line 231 - use single rather than double quotes | body = b"" line 233 - use single rather than double quotes | body += b"introduction-point %s\n" % (self._encode_link_specifier_block()) line 237 - E128 continuation line under-indented for visual indent | format=serialization.PublicFormat.Raw) line 238 - use single rather than double quotes | body += b"onion-key ntor %s\n" % (base64.b64encode(onion_key_bytes)) line 247 - use single rather than double quotes | body += b"auth-key\n%s\n" % (auth_key_cert_b64_blob) line 252 - use single rather than double quotes | body += b"enc-key ntor %s\n" % (base64.b64encode(enc_key_bytes)) line 261 - use single rather than double quotes | body += b"enc-key-cert\n%s\n" % (enc_key_cert_b64_blob) line 270 - E302 expected 2 blank lines, found 1 | class AuthorizedClient(collections.namedtuple('AuthorizedClient', ['id', 'iv', 'cookie'])): line 466 - E303 too many blank lines (2) | introduction_points.append( line 792 - E305 expected 2 blank lines after class or function definition, found 1 | import stem.descriptor.certificate line 792 - E402 module level import not at top of file | import stem.descriptor.certificate line 793 - E402 module level import not at top of file | from stem.descriptor import hsv3_crypto line 793 - redefinition of unused 'hsv3_crypto' from line 47 | from stem.descriptor import hsv3_crypto line 794 - E402 module level import not at top of file | from cryptography.hazmat.primitives import serialization line 795 - E402 module level import not at top of file | from 
cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey line 796 - E402 module level import not at top of file | from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey line 797 - E402 module level import not at top of file | import datetime line 799 - E302 expected 2 blank lines, found 1 | def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key): line 824 - E302 expected 2 blank lines, found 1 | def _get_descriptor_revision_counter(): line 828 - use single rather than double quotes | def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b""): line 828 - E302 expected 2 blank lines, found 1 | def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b""): line 837 - E302 expected 2 blank lines, found 1 | def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey): line 846 - use single rather than double quotes | raise ValueError("Need a proper intro points set") line 848 - use single rather than double quotes | final_body = b"create2-formats 2\n" line 856 - E302 expected 2 blank lines, found 1 | def _get_fake_clients_bytes(): line 860 - use single rather than double quotes | final_bytes = b"" line 862 - E261 at least two spaces before inline comment | num_fake_clients = 16 # default for when client auth is disabled line 865 - use single rather than double quotes | client_id = base64.b64encode(os.urandom(8)).rstrip(b"=") line 866 - use single rather than double quotes | client_iv = base64.b64encode(os.urandom(16)).rstrip(b"=") line 867 - use single rather than double quotes | descriptor_cookie = base64.b64encode(os.urandom(16)).rstrip(b"=") line 869 - use single rather than double quotes | final_bytes += b"%s %s %s %s\n" % (b"auth-client", client_id, client_iv, descriptor_cookie) line 873 - E302 expected 2 blank lines, found 1 | def _get_middle_descriptor_layer_body(encrypted): line 878 - local variable 'fake_priv_key' is assigned to but never used | fake_priv_key = 
X25519PrivateKey.generate() line 888 - use single rather than double quotes | b"%s" % (fake_pub_key_bytes_b64, fake_clients, encrypted) line 890 - E302 expected 2 blank lines, found 1 | def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, line 902 - use single rather than double quotes | inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b"encrypted") line 902 - E303 too many blank lines (2) | inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b"encrypted") line 911 - E302 expected 2 blank lines, found 1 | def _get_v3_desc_signature(desc_str, signing_key): line 915 - use single rather than double quotes | desc_str = b"Tor onion service descriptor sig v3" + desc_str line 918 - use single rather than double quotes | signature = signature.rstrip(b"=") line 919 - use single rather than double quotes | return b"signature %s" % (signature) line 1008 - E303 too many blank lines (2) | # Blind the identity key to get ephemeral blinded key line 1039 - use single rather than double quotes | desc_content += b"\n" line 1047 - E303 too many blank lines (2) | @classmethod line 1082 - use single rather than double quotes | descriptor_body = raw_contents.split(b"signature")[0] # everything before the signature line 1082 - E261 at least two spaces before inline comment | descriptor_body = raw_contents.split(b"signature")[0] # everything before the signature line 1083 - use single rather than double quotes | signature_body = b"Tor onion service descriptor sig v3" + descriptor_body
* /home/atagar/Desktop/stem/stem/descriptor/certificate.py line 269 - undefined name 'key_type' | raise ValueError("Certificate is not an ed25519 cert (%d)" % key_type) line 269 - use single rather than double quotes | raise ValueError("Certificate is not an ed25519 cert (%d)" % key_type) line 346 - E302 expected 2 blank lines, found 1 | class MyED25519Certificate(object): line 440 - E128 continuation line under-indented for visual indent | format=serialization.PublicFormat.Raw)
* /home/atagar/Desktop/stem/stem/descriptor/hsv3_crypto.py line 11 - 'cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey' imported but unused | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey line 16 - E302 expected 2 blank lines, found 1 | def pubkeys_are_equal(pubkey1, pubkey2): line 27 - E305 expected 2 blank lines after class or function definition, found 1 | """ line 40 - E302 expected 2 blank lines, found 0 | class HSv3PrivateBlindedKey(object): line 60 - E302 expected 2 blank lines, found 1 | class HSv3PublicBlindedKey(object): line 71 - undefined name 'ext' | ext.slow_ed25519.checkvalid(signature, message, self.public_key) line 73 - E305 expected 2 blank lines after class or function definition, found 1 | """ line 79 - E302 expected 2 blank lines, found 0 | def get_subcredential(public_identity_key, blinded_key): line 80 - use single rather than double quotes | cred_bytes_constant = "credential".encode() line 81 - use single rather than double quotes | subcred_bytes_constant = "subcredential".encode() line 83 - use single rather than double quotes | credential = hashlib.sha3_256(b"%s%s" % (cred_bytes_constant, public_identity_key)).digest() line 84 - use single rather than double quotes | subcredential = hashlib.sha3_256(b"%s%s%s" % (subcred_bytes_constant, credential, blinded_key)).digest() line 88 - E305 expected 2 blank lines after class or function definition, found 1 | """ line 99 - use single rather than double quotes | CHECKSUM_CONSTANT = b".onion checksum" line 101 - E302 expected 2 blank lines, found 1 | def encode_onion_address(ed25519_pub_key_bytes): line 106 - W291 trailing whitespace | if not stem.prereq._is_sha3_available(): line 110 - use single rather than double quotes | checksum_body = b"%s%s%d" % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version) line 113 - use single rather than double quotes | onion_address_bytes = b"%s%s%d" % (ed25519_pub_key_bytes, checksum, version) line 114 - use 
single rather than double quotes | onion_address = base64.b32encode(onion_address_bytes) + b".onion" line 115 - use single rather than double quotes | assert(len(onion_address) == 56 + len(".onion")) line 119 - E305 expected 2 blank lines after class or function definition, found 1 | """ line 159 - E302 expected 2 blank lines, found 1 | def pack(val): line 184 - use single rather than double quotes | secret_input = b"%s%s%s" % (secret_data, subcredential, pack(revision_counter)) line 196 - E302 expected 2 blank lines, found 1 | def get_desc_encryption_mac(key, salt, ciphertext): line 201 - E302 expected 2 blank lines, found 1 | def _encrypt_descriptor_layer(plaintext, revision_counter, line 227 - use single rather than double quotes | string_constant = b"hsdir-encrypted-data" line 233 - E231 missing whitespace after ',' | def ceildiv(a,b): line 239 - E302 expected 2 blank lines, found 1 | def _get_padding_needed(plaintext_len): line 248 - E226 missing whitespace around arithmetic operator | final_size = ceildiv(plaintext_len, PAD_MULTIPLE_BYTES)*PAD_MULTIPLE_BYTES line 251 - E302 expected 2 blank lines, found 1 | def encrypt_outter_layer(plaintext, revision_counter, blinded_key_bytes, subcredential): line 256 - use single rather than double quotes | string_constant = b"hsdir-superencrypted-data" line 260 - E226 missing whitespace around arithmetic operator | padded_plaintext = plaintext + b'\x00'*padding_bytes_needed --- stem/client/datatype.py | 1 + stem/descriptor/certificate.py | 6 +- stem/descriptor/ed25519_exts_ref.py | 326 +++++++++++++++--------------- stem/descriptor/hidden_service.py | 170 ++++++++-------- stem/descriptor/hsv3_crypto.py | 260 ++++++++++++------------ stem/descriptor/slow_ed25519.py | 140 ++++++++----- test/unit/descriptor/certificate.py | 13 +- test/unit/descriptor/hidden_service_v3.py | 16 +- 8 files changed, 490 insertions(+), 442 deletions(-)
diff --git a/stem/client/datatype.py b/stem/client/datatype.py index b1f65955..76957819 100644 --- a/stem/client/datatype.py +++ b/stem/client/datatype.py @@ -581,6 +581,7 @@ class LinkSpecifier(object): cell += self.value return bytes(cell)
+ class LinkByIPv4(LinkSpecifier): """ TLS connection to an IPv4 address. diff --git a/stem/descriptor/certificate.py b/stem/descriptor/certificate.py index b83d58a5..4ae999ed 100644 --- a/stem/descriptor/certificate.py +++ b/stem/descriptor/certificate.py @@ -266,7 +266,7 @@ class Ed25519CertificateV1(Ed25519Certificate):
# Make sure it's an ed25519 cert if (self.key_type != 1): - raise ValueError("Certificate is not an ed25519 cert (%d)" % key_type) + raise ValueError('Certificate is not an ed25519 cert (%d)' % self.key_type)
ed_key = Ed25519PublicKey.from_public_bytes(self.key) return ed_key @@ -343,6 +343,7 @@ class Ed25519CertificateV1(Ed25519Certificate): except InvalidSignature: raise ValueError('Descriptor Ed25519 certificate signature invalid (Signature was forged or corrupt)')
+ class MyED25519Certificate(object): """ This class represents an ed25519 certificate and it's made for encoding it into a string. @@ -436,8 +437,7 @@ class MyED25519Certificate(object): obj += Size.CHAR.pack(self.cert_key_type)
# Encode CERTIFIED_KEY - certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) + certified_pub_key_bytes = self.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) assert(len(certified_pub_key_bytes) == 32) obj += certified_pub_key_bytes
diff --git a/stem/descriptor/ed25519_exts_ref.py b/stem/descriptor/ed25519_exts_ref.py index e966e4f7..23701315 100644 --- a/stem/descriptor/ed25519_exts_ref.py +++ b/stem/descriptor/ed25519_exts_ref.py @@ -8,137 +8,142 @@ Includes self-tester and test vector generator. """
-from . import slow_ed25519 -from .slow_ed25519 import * +from stem.descriptor import slow_ed25519
import os -import random import unittest import binascii import textwrap
-#define a synonym that doesn't look like 1 -ell = l +# define a synonym that doesn't look like 1 +ell = slow_ed25519.l
# This replaces expmod above and makes it go a lot faster. slow_ed25519.expmod = pow
+ def blindESK(esk, param): - mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) - s = decodeint(esk[:32]) - s_prime = (s * mult) % ell - k = esk[32:] - assert(len(k) == 32) - k_prime = H(b"Derive temporary signing key hash input" + k)[:32] - return encodeint(s_prime) + k_prime + mult = 2 ** (slow_ed25519.b - 2) + sum(2 ** i * slow_ed25519.bit(param, i) for i in range(3, slow_ed25519.b - 2)) + s = slow_ed25519.decodeint(esk[:32]) + s_prime = (s * mult) % ell + k = esk[32:] + assert(len(k) == 32) + k_prime = slow_ed25519.H(b'Derive temporary signing key hash input' + k)[:32] + return slow_ed25519.encodeint(s_prime) + k_prime +
def blindPK(pk, param): - mult = 2**(b-2) + sum(2**i * bit(param,i) for i in range(3,b-2)) - P = decodepoint(pk) - return encodepoint(scalarmult(P, mult)) + mult = 2 ** (slow_ed25519.b - 2) + sum(2 ** i * slow_ed25519.bit(param, i) for i in range(3, slow_ed25519.b - 2)) + P = slow_ed25519.decodepoint(pk) + return slow_ed25519.encodepoint(slow_ed25519.scalarmult(P, mult)) +
def expandSK(sk): - h = H(sk) - a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) - k = b''.join([h[i:i+1] for i in range(b//8,b//4)]) - assert len(k) == 32 - return encodeint(a)+k + h = slow_ed25519.H(sk) + a = 2 ** (slow_ed25519.b - 2) + sum(2 ** i * slow_ed25519.bit(h, i) for i in range(3, slow_ed25519.b - 2)) + k = b''.join([h[i:i + 1] for i in range(slow_ed25519.b // 8, slow_ed25519.b // 4)]) + assert len(k) == 32 + return slow_ed25519.encodeint(a) + k +
def publickeyFromESK(h): - a = decodeint(h[:32]) - A = scalarmult(B,a) - return encodepoint(A) + a = slow_ed25519.decodeint(h[:32]) + A = slow_ed25519.scalarmult(slow_ed25519.B, a) + return slow_ed25519.encodepoint(A) + + +def signatureWithESK(m, h, pk): + a = slow_ed25519.decodeint(h[:32]) + r = slow_ed25519.Hint(b''.join([h[i:i + 1] for i in range(slow_ed25519.b // 8, slow_ed25519.b // 4)]) + m) + R = slow_ed25519.scalarmult(slow_ed25519.B, r) + S = (r + slow_ed25519.Hint(slow_ed25519.encodepoint(R) + pk + m) * a) % slow_ed25519.l + + return slow_ed25519.encodepoint(R) + slow_ed25519.encodeint(S)
-def signatureWithESK(m,h,pk): - a = decodeint(h[:32]) - r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) - R = scalarmult(B,r) - S = (r + Hint(encodepoint(R) + pk + m) * a) % l - return encodepoint(R) + encodeint(S)
def newSK(): - return os.urandom(32) + return os.urandom(32) + + +def random_scalar(entropy_f): + # 0..L-1 inclusive + # reduce the bias to a safe level by generating 256 extra bits + + oversized = int(binascii.hexlify(entropy_f(32 + 32)), 16) + return oversized % ell
-def random_scalar(entropy_f): # 0..L-1 inclusive - # reduce the bias to a safe level by generating 256 extra bits - oversized = int(binascii.hexlify(entropy_f(32+32)), 16) - return oversized % ell
# ------------------------------------------------------------
-MSG = b"This is extremely silly. But it is also incredibly serious business!" +MSG = b'This is extremely silly. But it is also incredibly serious business!' +
class SelfTest(unittest.TestCase): + def _testSignatures(self, esk, pk): + sig = signatureWithESK(MSG, esk, pk) + slow_ed25519.checkvalid(sig, MSG, pk) + bad = False + + try: + slow_ed25519.checkvalid(sig, MSG * 2, pk) + bad = True + except Exception: + pass + + self.assertFalse(bad) + + def testExpand(self): + sk = newSK() + pk = slow_ed25519.publickey(sk) + esk = expandSK(sk) + sig1 = slow_ed25519.signature(MSG, sk, pk) + sig2 = signatureWithESK(MSG, esk, pk) + self.assertEqual(sig1, sig2) + + def testSignatures(self): + sk = newSK() + esk = expandSK(sk) + pk = publickeyFromESK(esk) + pk2 = slow_ed25519.publickey(sk) + self.assertEqual(pk, pk2) + + self._testSignatures(esk, pk) + + def testBlinding(self): + sk = newSK() + esk = expandSK(sk) + pk = publickeyFromESK(esk) + param = os.urandom(32) + besk = blindESK(esk, param) + bpk = blindPK(pk, param) + bpk2 = publickeyFromESK(besk) + self.assertEqual(bpk, bpk2) + + self._testSignatures(besk, bpk) + + def testIdentity(self): + # Get identity E by doing: E = l*B, where l is the group order + identity = slow_ed25519.scalarmult(slow_ed25519.B, ell) + + # Get identity E by doing: E = l*A, where A is a random point + sk = newSK() + pk = slow_ed25519.decodepoint(slow_ed25519.publickey(sk)) + identity2 = slow_ed25519.scalarmult(pk, ell) + + # Check that identities match + assert(identity == identity2) + # Check that identity is the point (0, 1) + assert(identity == [0, 1]) + + # Check identity element: a*E = E, where a is a random scalar + scalar = random_scalar(os.urandom) + result = slow_ed25519.scalarmult(identity, scalar) + assert(result == identity == identity2)
- def _testSignatures(self, esk, pk): - sig = signatureWithESK(MSG, esk, pk) - checkvalid(sig, MSG, pk) - bad = False - try: - checkvalid(sig, MSG*2, pk) - bad = True - except Exception: - pass - - self.assertFalse(bad) - - def testExpand(self): - sk = newSK() - pk = publickey(sk) - esk = expandSK(sk) - sig1 = signature(MSG, sk, pk) - sig2 = signatureWithESK(MSG, esk, pk) - self.assertEqual(sig1, sig2) - - def testSignatures(self): - sk = newSK() - esk = expandSK(sk) - pk = publickeyFromESK(esk) - pk2 = publickey(sk) - self.assertEqual(pk, pk2) - - self._testSignatures(esk, pk) - - def testBlinding(self): - sk = newSK() - esk = expandSK(sk) - pk = publickeyFromESK(esk) - param = os.urandom(32) - besk = blindESK(esk, param) - bpk = blindPK(pk, param) - bpk2 = publickeyFromESK(besk) - self.assertEqual(bpk, bpk2) - - self._testSignatures(besk, bpk) - - def testIdentity(self): - # Base point: - # B is the unique point (x, 4/5) \in E for which x is positive - By = 4 * inv(5) - Bx = xrecover(By) - B = [Bx % q,By % q] - - # Get identity E by doing: E = l*B, where l is the group order - identity = scalarmult(B, ell) - - # Get identity E by doing: E = l*A, where A is a random point - sk = newSK() - pk = decodepoint(publickey(sk)) - identity2 = scalarmult(pk, ell) - - # Check that identities match - assert(identity == identity2) - # Check that identity is the point (0,1) - assert(identity == [0,1]) - - # Check identity element: a*E = E, where a is a random scalar - scalar = random_scalar(os.urandom) - result = scalarmult(identity, scalar) - assert(result == identity == identity2)
# ------------------------------------------------------------
-# From pprint.pprint([ binascii.b2a_hex(os.urandom(32)) for _ in xrange(8) ]) +# From pprint.pprint([binascii.b2a_hex(os.urandom(32)) for _ in xrange(8)]) RAND_INPUTS = [ '26c76712d89d906e6672dafa614c42e5cb1caac8c6568e4d2493087db51f0d36', 'fba7a5366b5cb98c2667a18783f5cf8f4f8d1a2ce939ad22a6e685edde85128d', @@ -149,7 +154,7 @@ RAND_INPUTS = [ '4377c40431c30883c5fbd9bc92ae48d1ed8a47b81d13806beac5351739b5533d', 'c6bbcce615839756aed2cc78b1de13884dd3618f48367a17597a16c1cd7a290b']
-# From pprint.pprint([ binascii.b2a_hex(os.urandom(32)) for _ in xrange(8) ]) +# From pprint.pprint([binascii.b2a_hex(os.urandom(32)) for _ in xrange(8)]) BLINDING_PARAMS = [ '54a513898b471d1d448a2f3c55c1de2c0ef718c447b04497eeb999ed32027823', '831e9b5325b5d31b7ae6197e9c7a7baf2ec361e08248bce055908971047a2347', @@ -160,76 +165,79 @@ BLINDING_PARAMS = [ '97f45142597c473a4b0e9a12d64561133ad9e1155fe5a9807fe6af8a93557818', '3f44f6a5a92cde816635dfc12ade70539871078d2ff097278be2a555c9859cd0']
-PREFIX = "ED25519_" +PREFIX = 'ED25519_' +
def writeArray(name, array): - print("static const char *{prefix}{name}[] = {{".format( - prefix=PREFIX,name=name)) - for a in array: - h = a.hex() - if len(h) > 70: - h1 = h[:70] - h2 = h[70:] - print(' "{0}"\n "{1}",'.format(h1,h2)) - else: - print(' "{0}",'.format(h)) - print("};\n") - -def comment(text, initial="/**"): - print(initial) - print(textwrap.fill(text,initial_indent=" * ",subsequent_indent=" * ")) - print(" */") + print('static const char *{prefix}{name}[] = {{'.format(prefix = PREFIX, name = name)) + + for a in array: + h = a.hex() + + if len(h) > 70: + h1 = h[:70] + h2 = h[70:] + print(' "{0}"\n "{1}",'.format(h1, h2)) + else: + print(' "{0}",'.format(h)) + + print('};\n') + + +def comment(text, initial='/**'): + print(initial) + print(textwrap.fill(text, initial_indent = ' * ', subsequent_indent = ' * ')) + print(' */') +
def makeTestVectors(): - comment("""Test vectors for our ed25519 implementation and related - functions. These were automatically generated by the - ed25519_exts_ref.py script.""", initial="/*") + comment("""Test vectors for our ed25519 implementation and related + functions. These were automatically generated by the + ed25519_exts_ref.py script.""", initial = '/*')
+ comment("""Secret key seeds used as inputs for the ed25519 test vectors. + Randomly generated. """) + secretKeys = [bytes.fromhex(r) for r in RAND_INPUTS] + writeArray('SECRET_KEYS', secretKeys)
- comment("""Secret key seeds used as inputs for the ed25519 test vectors. - Randomly generated. """) - secretKeys = [ bytes.fromhex(r) for r in RAND_INPUTS ] - writeArray("SECRET_KEYS", secretKeys) + comment("""Secret ed25519 keys after expansion from seeds. This is how Tor + represents them internally.""") + expandedSecretKeys = [expandSK(sk) for sk in secretKeys] + writeArray('EXPANDED_SECRET_KEYS', expandedSecretKeys)
- comment("""Secret ed25519 keys after expansion from seeds. This is how Tor - represents them internally.""") - expandedSecretKeys = [ expandSK(sk) for sk in secretKeys ] - writeArray("EXPANDED_SECRET_KEYS", expandedSecretKeys) + comment('Public keys derived from the above secret keys') + publicKeys = [slow_ed25519.publickey(sk) for sk in secretKeys] + writeArray('PUBLIC_KEYS', publicKeys)
- comment("""Public keys derived from the above secret keys""") - publicKeys = [ publickey(sk) for sk in secretKeys ] - writeArray("PUBLIC_KEYS", publicKeys) + comment('Parameters used for key blinding tests. Randomly generated.') + blindingParams = [binascii.a2b_hex(r) for r in BLINDING_PARAMS] + writeArray('BLINDING_PARAMS', blindingParams)
- comment("""Parameters used for key blinding tests. Randomly generated.""") - blindingParams = [ binascii.a2b_hex(r) for r in BLINDING_PARAMS ] - writeArray("BLINDING_PARAMS", blindingParams) + comment("""Blinded secret keys for testing key blinding. The nth blinded + key corresponds to the nth secret key blidned with the nth + blinding parameter.""")
- comment("""Blinded secret keys for testing key blinding. The nth blinded - key corresponds to the nth secret key blidned with the nth - blinding parameter.""") - writeArray("BLINDED_SECRET_KEYS", - (blindESK(expandSK(sk), bp) - for sk,bp in zip(secretKeys,blindingParams))) + writeArray('BLINDED_SECRET_KEYS', (blindESK(expandSK(sk), bp) for sk, bp in zip(secretKeys, blindingParams)))
- comment("""Blinded public keys for testing key blinding. The nth blinded - key corresponds to the nth public key blidned with the nth - blinding parameter.""") - writeArray("BLINDED_PUBLIC_KEYS", - (blindPK(pk, bp) for pk,bp in zip(publicKeys,blindingParams))) + comment("""Blinded public keys for testing key blinding. The nth blinded + key corresponds to the nth public key blidned with the nth + blinding parameter.""")
- comment("""Signatures of the public keys, made with their corresponding - secret keys.""") - writeArray("SELF_SIGNATURES", - (signature(pk, sk, pk) for pk,sk in zip(publicKeys,secretKeys))) + writeArray('BLINDED_PUBLIC_KEYS', (blindPK(pk, bp) for pk, bp in zip(publicKeys, blindingParams)))
+ comment("""Signatures of the public keys, made with their corresponding + secret keys.""") + writeArray('SELF_SIGNATURES', (slow_ed25519.signature(pk, sk, pk) for pk, sk in zip(publicKeys, secretKeys)))
if __name__ == '__main__': - import sys - if len(sys.argv) == 1 or sys.argv[1] not in ("SelfTest", "MakeVectors"): - print ("You should specify one of 'SelfTest' or 'MakeVectors'") - sys.exit(1) - if sys.argv[1] == 'SelfTest': - unittest.main() - else: - makeTestVectors() + import sys + + if len(sys.argv) == 1 or sys.argv[1] not in ('SelfTest', 'MakeVectors'): + print ("You should specify one of 'SelfTest' or 'MakeVectors'") + sys.exit(1) + + if sys.argv[1] == 'SelfTest': + unittest.main() + else: + makeTestVectors() diff --git a/stem/descriptor/hidden_service.py b/stem/descriptor/hidden_service.py index b96c8a87..b7bf9a08 100644 --- a/stem/descriptor/hidden_service.py +++ b/stem/descriptor/hidden_service.py @@ -32,22 +32,27 @@ These are only available through the Controller's import base64 import binascii import collections +import datetime import hashlib import io import os -import struct import time
import stem.client.datatype +import stem.descriptor.certificate import stem.prereq import stem.util.connection import stem.util.str_tools import stem.util.tor_tools
from stem.descriptor import hsv3_crypto - from stem.descriptor.certificate import Ed25519Certificate, CertType
+from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey + + from stem.descriptor import ( PGP_BLOCK_END, Descriptor, @@ -169,8 +174,7 @@ class IntroductionPointV3(object):
:var Ed25519Certificate descriptor_signing_key: hsv3 descriptor signing key (needed to encode the intro point) """ - def __init__(self, link_specifiers, onion_key, enc_key, - auth_key=None, auth_key_cert=None, legacy_key=None, enc_key_cert=None, legacy_key_cert=None): + def __init__(self, link_specifiers, onion_key, enc_key, auth_key=None, auth_key_cert=None, legacy_key=None, enc_key_cert=None, legacy_key_cert=None): """ Initialize this intro point.
@@ -184,10 +188,10 @@ class IntroductionPointV3(object): """
# if not link_specifiers or not onion_key or not enc_key: - # raise ValueError("Introduction point missing essential keys") + # raise ValueError('Introduction point missing essential keys')
if not auth_key and not auth_key_cert: - raise ValueError("Either auth key or auth key cert needs to be provided") + raise ValueError('Either auth key or auth key cert needs to be provided')
# If we have an auth key cert but not an auth key, extract the key if auth_key_cert and not auth_key: @@ -212,8 +216,10 @@ class IntroductionPointV3(object): LSLEN (Link specifier length) [1 byte] LSPEC (Link specifier) [LSLEN bytes] """ - ls_block = b"" + + ls_block = b'' ls_block += chr(len(self.link_specifiers)) + for ls in self.link_specifiers: ls_block += ls.pack()
@@ -223,42 +229,33 @@ class IntroductionPointV3(object): """ Encode this introduction point into bytes """ + if not descriptor_signing_privkey: - raise ValueError("Cannot encode: Descriptor signing key not provided") + raise ValueError('Cannot encode: Descriptor signing key not provided')
cert_expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
- body = b"" + body = b''
- body += b"introduction-point %s\n" % (self._encode_link_specifier_block()) + body += b'introduction-point %s\n' % (self._encode_link_specifier_block())
# Onion key - onion_key_bytes = self.onion_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) - body += b"onion-key ntor %s\n" % (base64.b64encode(onion_key_bytes)) + onion_key_bytes = self.onion_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + body += b'onion-key ntor %s\n' % (base64.b64encode(onion_key_bytes))
# Build auth key certificate - auth_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type=CertType.HS_V3_INTRO_AUTH, - expiration_date=cert_expiration_date, - cert_key_type=1, certified_pub_key=self.auth_key, - signing_priv_key=descriptor_signing_privkey, - include_signing_key=True) + auth_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_INTRO_AUTH, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True) auth_key_cert_b64_blob = auth_key_cert.encode_for_descriptor() - body += b"auth-key\n%s\n" % (auth_key_cert_b64_blob) + body += b'auth-key\n%s\n' % (auth_key_cert_b64_blob)
# Build enc key line - enc_key_bytes = self.enc_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) - body += b"enc-key ntor %s\n" % (base64.b64encode(enc_key_bytes)) + enc_key_bytes = self.enc_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + body += b'enc-key ntor %s\n' % (base64.b64encode(enc_key_bytes))
# Build enc key cert (this does not actually need to certify anything because of #29583) - enc_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type=CertType.HS_V3_INTRO_ENC, - expiration_date=cert_expiration_date, - cert_key_type=1, certified_pub_key=self.auth_key, - signing_priv_key=descriptor_signing_privkey, - include_signing_key=True) + enc_key_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_INTRO_ENC, expiration_date = cert_expiration_date, cert_key_type = 1, certified_pub_key = self.auth_key, signing_priv_key = descriptor_signing_privkey, include_signing_key = True) enc_key_cert_b64_blob = enc_key_cert.encode_for_descriptor() - body += b"enc-key-cert\n%s\n" % (enc_key_cert_b64_blob) + body += b'enc-key-cert\n%s\n' % (enc_key_cert_b64_blob)
# We are called to encode legacy key, but we don't know how # TODO do legacy keys! @@ -267,6 +264,7 @@ class IntroductionPointV3(object):
return body
+ class AuthorizedClient(collections.namedtuple('AuthorizedClient', ['id', 'iv', 'cookie'])): """ Client authorized to use a v3 hidden service. @@ -459,19 +457,16 @@ def _parse_v3_introduction_points(descriptor, entries):
legacy_key = entry['legacy-key'][0][2] if 'legacy-key' in entry else None legacy_key_cert = entry['legacy-key-cert'][0][2] if 'legacy-key-cert' in entry else None -# if legacy_key_cert: -# legacy_key_cert = Ed25519Certificate.parse(legacy_key_cert) -
introduction_points.append( IntroductionPointV3( - link_specifiers=link_specifiers, - onion_key=onion_key, - auth_key_cert=auth_key_cert, - enc_key=enc_key, - enc_key_cert=enc_key_cert, - legacy_key=legacy_key, - legacy_key_cert=legacy_key_cert, + link_specifiers = link_specifiers, + onion_key = onion_key, + auth_key_cert = auth_key_cert, + enc_key = enc_key, + enc_key_cert = enc_key_cert, + legacy_key = legacy_key, + legacy_key_cert = legacy_key_cert, ) )
@@ -789,12 +784,6 @@ class HiddenServiceDescriptorV2(BaseHiddenServiceDescriptor):
return introduction_points
-import stem.descriptor.certificate -from stem.descriptor import hsv3_crypto -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey -from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey -import datetime
def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key): """ @@ -804,6 +793,7 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key
'blinded_priv_key' key that signs the certificate """ + # 54 hours expiration date like tor does expiration_date = datetime.datetime.utcnow() + datetime.timedelta(hours=54)
@@ -819,21 +809,26 @@ def _get_descriptor_signing_cert(descriptor_signing_public_key, blinded_priv_key cert_base64 = stem.util.str_tools._to_unicode(base64.b64encode(signing_cert_bytes)) cert_blob = '\n'.join(stem.util.str_tools._split_by_length(cert_base64, 64))
- return '\n-----BEGIN %s-----\n%s\n-----END %s-----' % ("ED25519 CERT", cert_blob, "ED25519 CERT") + return '\n-----BEGIN %s-----\n%s\n-----END %s-----' % ('ED25519 CERT', cert_blob, 'ED25519 CERT') +
def _get_descriptor_revision_counter(): # TODO replace with OPE scheme return int(time.time())
-def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b""): + +def b64_and_wrap_desc_layer(layer_bytes, prefix_bytes=b''): """ Encode the descriptor layer in 'layer_bytes' to base64, and then wrap it up so that it can be included in the descriptor. """ + layer_b64 = base64.b64encode(layer_bytes) layer_blob = b'\n'.join(stem.util.str_tools._split_by_length(layer_b64, 64)) + return b'%s\n-----BEGIN MESSAGE-----\n%s\n-----END MESSAGE-----' % (prefix_bytes, layer_blob)
+ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey): """ Get the inner descriptor layer into bytes. @@ -842,10 +837,11 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey): this layer, and we also need the descriptor signing key to encode the intro points. """ + if (not intro_points or len(intro_points) == 0): - raise ValueError("Need a proper intro points set") + raise ValueError('Need a proper intro points set')
- final_body = b"create2-formats 2\n" + final_body = b'create2-formats 2\n'
# Now encode all the intro points for intro_point in intro_points: @@ -853,70 +849,68 @@ def _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey):
return final_body
+ def _get_fake_clients_bytes(): """ Generate fake client authorization data for the middle layer """ - final_bytes = b""
- num_fake_clients = 16 # default for when client auth is disabled + final_bytes = b'' + num_fake_clients = 16 # default for when client auth is disabled
for _ in range(num_fake_clients): - client_id = base64.b64encode(os.urandom(8)).rstrip(b"=") - client_iv = base64.b64encode(os.urandom(16)).rstrip(b"=") - descriptor_cookie = base64.b64encode(os.urandom(16)).rstrip(b"=") + client_id = base64.b64encode(os.urandom(8)).rstrip(b'=') + client_iv = base64.b64encode(os.urandom(16)).rstrip(b'=') + descriptor_cookie = base64.b64encode(os.urandom(16)).rstrip(b'=')
- final_bytes += b"%s %s %s %s\n" % (b"auth-client", client_id, client_iv, descriptor_cookie) + final_bytes += b'%s %s %s %s\n' % (b'auth-client', client_id, client_iv, descriptor_cookie)
return final_bytes
+ def _get_middle_descriptor_layer_body(encrypted): """ Get the middle descriptor layer as bytes (It's just fake client auth data since client auth is disabled) """ - fake_priv_key = X25519PrivateKey.generate() + fake_pub_key = X25519PrivateKey.generate().public_key() - fake_pub_key_bytes = fake_pub_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) + fake_pub_key_bytes = fake_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) fake_pub_key_bytes_b64 = base64.b64encode(fake_pub_key_bytes) fake_clients = _get_fake_clients_bytes()
- return b"desc-auth-type x25519\n" \ - b"desc-auth-ephemeral-key %s\n" \ - b"%s" \ - b"%s" % (fake_pub_key_bytes_b64, fake_clients, encrypted) + return b'desc-auth-type x25519\n' \ + b'desc-auth-ephemeral-key %s\n' \ + b'%s' \ + b'%s' % (fake_pub_key_bytes_b64, fake_clients, encrypted)
-def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, - revision_counter, blinded_key_bytes, subcredential): + +def _get_superencrypted_blob(intro_points, descriptor_signing_privkey, revision_counter, blinded_key_bytes, subcredential): """ Get the superencrypted blob (which also includes the encrypted blob) that should be attached to the descriptor """ - inner_descriptor_layer = _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey)
- inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, - revision_counter, blinded_key_bytes, subcredential) - - - inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b"encrypted") + inner_descriptor_layer = _get_inner_descriptor_layer_body(intro_points, descriptor_signing_privkey) + inner_ciphertext = hsv3_crypto.encrypt_inner_layer(inner_descriptor_layer, revision_counter, blinded_key_bytes, subcredential) + inner_ciphertext_b64 = b64_and_wrap_desc_layer(inner_ciphertext, b'encrypted')
middle_descriptor_layer = _get_middle_descriptor_layer_body(inner_ciphertext_b64) - - outter_ciphertext = hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, - revision_counter, blinded_key_bytes, subcredential) + outter_ciphertext = hsv3_crypto.encrypt_outter_layer(middle_descriptor_layer, revision_counter, blinded_key_bytes, subcredential)
return b64_and_wrap_desc_layer(outter_ciphertext)
+ def _get_v3_desc_signature(desc_str, signing_key): """ Compute the descriptor signature and return it as bytes """ - desc_str = b"Tor onion service descriptor sig v3" + desc_str + + desc_str = b'Tor onion service descriptor sig v3' + desc_str
signature = base64.b64encode(signing_key.sign(desc_str)) - signature = signature.rstrip(b"=") - return b"signature %s" % (signature) + signature = signature.rstrip(b'=') + return b'signature %s' % (signature)
class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): @@ -957,9 +951,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): }
@classmethod - def content(cls, attr = None, exclude = (), sign = False, - ed25519_private_identity_key = None, intro_points = None, - blinding_param = None): + def content(cls, attr = None, exclude = (), sign = False, ed25519_private_identity_key = None, intro_points = None, blinding_param = None): """ 'ed25519_private_identity_key' is the Ed25519PrivateKey of the onion service
@@ -968,6 +960,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): 'blinding_param' is a 32 byte blinding factor that should be used to derive the blinded key from the identity key """ + if sign: raise NotImplementedError('Signing of %s not implemented' % cls.__name__)
@@ -990,6 +983,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): # return it to the caller, otherwise the caller will have no way to decode # the descriptor without knowing the private key or the onion address, so # for now we consider it a mandatory argument. + if not ed25519_private_identity_key: raise ValueError('Need to provide a private ed25519 identity key to create a descriptor')
@@ -1001,16 +995,12 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor):
# Get the identity public key public_identity_key = ed25519_private_identity_key.public_key() - public_identity_key_bytes = public_identity_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) - + public_identity_key_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
# Blind the identity key to get ephemeral blinded key - blinded_privkey = hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, - blinding_param=blinding_param) + blinded_privkey = hsv3_crypto.HSv3PrivateBlindedKey(ed25519_private_identity_key, blinding_param = blinding_param) blinded_pubkey = blinded_privkey.public_key() - blinded_pubkey_bytes = blinded_pubkey.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) + blinded_pubkey_bytes = blinded_pubkey.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
# Generate descriptor signing key descriptor_signing_private_key = Ed25519PrivateKey.generate() @@ -1024,8 +1014,7 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): # this descriptor object so that we don't have to carry them around # functions and instead we could use e.g. self.descriptor_signing_public_key # But because this is a @classmethod this is not possible :/ - superencrypted_blob = _get_superencrypted_blob(intro_points, descriptor_signing_private_key, - revision_counter_int, blinded_pubkey_bytes, subcredential) + superencrypted_blob = _get_superencrypted_blob(intro_points, descriptor_signing_private_key, revision_counter_int, blinded_pubkey_bytes, subcredential)
desc_content = _descriptor_content(attr, exclude, ( ('hs-descriptor', '3'), @@ -1036,14 +1025,13 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): ), ())
# Add a final newline before the signature block - desc_content += b"\n" + desc_content += b'\n'
# Compute the signature and append it to the descriptor signature = _get_v3_desc_signature(desc_content, descriptor_signing_private_key) final_desc = desc_content + signature return final_desc
- @classmethod def create(cls, attr = None, exclude = (), validate = True, sign = False): # Create a string-representation of the descriptor and then parse it @@ -1079,8 +1067,8 @@ class HiddenServiceDescriptorV3(BaseHiddenServiceDescriptor): # Verify the signature! # First compute the body that was signed descriptor_signing_key = self.signing_cert.certified_ed25519_key() - descriptor_body = raw_contents.split(b"signature")[0] # everything before the signature - signature_body = b"Tor onion service descriptor sig v3" + descriptor_body + descriptor_body = raw_contents.split(b'signature')[0] # everything before the signature + signature_body = b'Tor onion service descriptor sig v3' + descriptor_body
# Decode base64 signature missing_padding = len(self.signature) % 4 diff --git a/stem/descriptor/hsv3_crypto.py b/stem/descriptor/hsv3_crypto.py index dd64a95f..71f76e9c 100644 --- a/stem/descriptor/hsv3_crypto.py +++ b/stem/descriptor/hsv3_crypto.py @@ -3,26 +3,27 @@ import hashlib import struct import os
+import stem.descriptor.slow_ed25519 import stem.prereq
from stem.descriptor import ed25519_exts_ref from stem.descriptor import slow_ed25519
-from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization
+ def pubkeys_are_equal(pubkey1, pubkey2): - """ - Compare the raw bytes of the two pubkeys and return True if they are the same - """ - pubkey1_bytes = pubkey1.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) - pubkey2_bytes = pubkey2.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) + """ + Compare the raw bytes of the two pubkeys and return True if they are the same + """ + + pubkey1_bytes = pubkey1.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + pubkey2_bytes = pubkey2.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw) + + return pubkey1_bytes == pubkey2_bytes
- return pubkey1_bytes == pubkey2_bytes
""" HSv3 Key blinding @@ -37,38 +38,41 @@ certificate module. - HSv3PrivateBlindedKey: represents the private part of a blinded ed25519 key of an onion service and should expose a public_key() method and a sign() method. """ + + class HSv3PrivateBlindedKey(object): - def __init__(self, hazmat_private_key, blinding_param): - secret_seed = hazmat_private_key.private_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PrivateFormat.Raw, - encryption_algorithm=serialization.NoEncryption()) - assert(len(secret_seed) == 32) + def __init__(self, hazmat_private_key, blinding_param): + secret_seed = hazmat_private_key.private_bytes(encoding = serialization.Encoding.Raw, format = serialization.PrivateFormat.Raw, encryption_algorithm = serialization.NoEncryption()) + assert(len(secret_seed) == 32)
- expanded_identity_priv_key = ed25519_exts_ref.expandSK(secret_seed) - identity_public_key = slow_ed25519.publickey(secret_seed) + expanded_identity_priv_key = ed25519_exts_ref.expandSK(secret_seed) + identity_public_key = slow_ed25519.publickey(secret_seed)
- self.blinded_secret_key = ed25519_exts_ref.blindESK(expanded_identity_priv_key, blinding_param) - blinded_public_key = ed25519_exts_ref.blindPK(identity_public_key, blinding_param) - self.blinded_public_key = HSv3PublicBlindedKey(blinded_public_key) + self.blinded_secret_key = ed25519_exts_ref.blindESK(expanded_identity_priv_key, blinding_param) + blinded_public_key = ed25519_exts_ref.blindPK(identity_public_key, blinding_param) + self.blinded_public_key = HSv3PublicBlindedKey(blinded_public_key)
- def public_key(self): - return self.blinded_public_key + def public_key(self): + return self.blinded_public_key + + def sign(self, msg): + return ed25519_exts_ref.signatureWithESK(msg, self.blinded_secret_key, self.blinded_public_key.public_bytes())
- def sign(self, msg): - return ed25519_exts_ref.signatureWithESK(msg, self.blinded_secret_key, self.blinded_public_key.public_bytes())
class HSv3PublicBlindedKey(object): - def __init__(self, public_key): - self.public_key = public_key + def __init__(self, public_key): + self.public_key = public_key + + def public_bytes(self, encoding=None, format=None): + return self.public_key
- def public_bytes(self, encoding=None, format=None): - return self.public_key + def verify(self, signature, message): + """ + raises exception if sig not valid + """ + + stem.descriptor.slow_ed25519.checkvalid(signature, message, self.public_key)
- def verify(self, signature, message): - """ - raises exception if sig not valid - """ - ext.slow_ed25519.checkvalid(signature, message, self.public_key)
""" subcredential @@ -76,14 +80,17 @@ subcredential subcredential = H("subcredential" | credential | blinded-public-ke credential = H("credential" | public-identity-key) """ + + def get_subcredential(public_identity_key, blinded_key): - cred_bytes_constant = "credential".encode() - subcred_bytes_constant = "subcredential".encode() + cred_bytes_constant = 'credential'.encode() + subcred_bytes_constant = 'subcredential'.encode()
- credential = hashlib.sha3_256(b"%s%s" % (cred_bytes_constant, public_identity_key)).digest() - subcredential = hashlib.sha3_256(b"%s%s%s" % (subcred_bytes_constant, credential, blinded_key)).digest() + credential = hashlib.sha3_256(b'%s%s' % (cred_bytes_constant, public_identity_key)).digest() + subcredential = hashlib.sha3_256(b'%s%s%s' % (subcred_bytes_constant, credential, blinded_key)).digest() + + return subcredential
- return subcredential
""" Onion address @@ -96,25 +103,28 @@ Onion address - ".onion checksum" is a constant string - CHECKSUM is truncated to two bytes before inserting it in onion_address """ -CHECKSUM_CONSTANT = b".onion checksum" + +CHECKSUM_CONSTANT = b'.onion checksum' +
def encode_onion_address(ed25519_pub_key_bytes): - """ - Given the public key, return the onion address - """ + """ + Given the public key, return the onion address + """ + + if not stem.prereq._is_sha3_available(): + raise ImportError('Encoding onion addresses requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)')
- if not stem.prereq._is_sha3_available(): - raise ImportError('Encoding onion addresses requires python 3.6+ or the pysha3 module (https://pypi.org/project/pysha3/)') + version = 3 + checksum_body = b'%s%s%d' % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version) + checksum = hashlib.sha3_256(checksum_body).digest()[:2]
- version = 3 - checksum_body = b"%s%s%d" % (CHECKSUM_CONSTANT, ed25519_pub_key_bytes, version) - checksum = hashlib.sha3_256(checksum_body).digest()[:2] + onion_address_bytes = b'%s%s%d' % (ed25519_pub_key_bytes, checksum, version) + onion_address = base64.b32encode(onion_address_bytes) + b'.onion' + assert(len(onion_address) == 56 + len('.onion'))
- onion_address_bytes = b"%s%s%d" % (ed25519_pub_key_bytes, checksum, version) - onion_address = base64.b32encode(onion_address_bytes) + b".onion" - assert(len(onion_address) == 56 + len(".onion")) + return onion_address.lower()
- return onion_address.lower()
""" Basic descriptor logic: @@ -151,113 +161,115 @@ S_IV_LEN = 16 MAC_KEY_LEN = 32
""" - Descriptor encryption - """
+ def pack(val): - return struct.pack('>Q', val) + return struct.pack('>Q', val)
-def get_desc_keys(secret_data, string_constant, - subcredential, revision_counter, salt): - """ - secret_input = SECRET_DATA | subcredential | INT_8(revision_counter) +def get_desc_keys(secret_data, string_constant, subcredential, revision_counter, salt): + """ + secret_input = SECRET_DATA | subcredential | INT_8(revision_counter)
- keys = KDF(secret_input | salt | STRING_CONSTANT, S_KEY_LEN + S_IV_LEN + MAC_KEY_LEN) + keys = KDF(secret_input | salt | STRING_CONSTANT, S_KEY_LEN + S_IV_LEN + MAC_KEY_LEN)
- SECRET_KEY = first S_KEY_LEN bytes of keys - SECRET_IV = next S_IV_LEN bytes of keys - MAC_KEY = last MAC_KEY_LEN bytes of keys + SECRET_KEY = first S_KEY_LEN bytes of keys + SECRET_IV = next S_IV_LEN bytes of keys + MAC_KEY = last MAC_KEY_LEN bytes of keys
- where + where
- 2.5.1.1. First layer encryption logic - SECRET_DATA = blinded-public-key - STRING_CONSTANT = "hsdir-superencrypted-data" + 2.5.1.1. First layer encryption logic + SECRET_DATA = blinded-public-key + STRING_CONSTANT = "hsdir-superencrypted-data"
- 2.5.2.1. Second layer encryption keys - SECRET_DATA = blinded-public-key | descriptor_cookie - STRING_CONSTANT = "hsdir-encrypted-data" - """ - secret_input = b"%s%s%s" % (secret_data, subcredential, pack(revision_counter)) + 2.5.2.1. Second layer encryption keys + SECRET_DATA = blinded-public-key | descriptor_cookie + STRING_CONSTANT = "hsdir-encrypted-data" + """
- kdf = hashlib.shake_256(secret_input + salt + string_constant) + secret_input = b'%s%s%s' % (secret_data, subcredential, pack(revision_counter))
- keys = kdf.digest(S_KEY_LEN + S_IV_LEN + MAC_LEN) + kdf = hashlib.shake_256(secret_input + salt + string_constant)
- secret_key = keys[:S_KEY_LEN] - secret_iv = keys[S_KEY_LEN:S_KEY_LEN + S_IV_LEN] - mac_key = keys[S_KEY_LEN + S_IV_LEN:] + keys = kdf.digest(S_KEY_LEN + S_IV_LEN + MAC_LEN) + + secret_key = keys[:S_KEY_LEN] + secret_iv = keys[S_KEY_LEN:S_KEY_LEN + S_IV_LEN] + mac_key = keys[S_KEY_LEN + S_IV_LEN:] + + return secret_key, secret_iv, mac_key
- return secret_key, secret_iv, mac_key
def get_desc_encryption_mac(key, salt, ciphertext): + mac = hashlib.sha3_256(pack(len(key)) + key + pack(len(salt)) + salt + ciphertext).digest() + return mac
- mac = hashlib.sha3_256(pack(len(key)) + key + pack(len(salt)) + salt + ciphertext).digest() - return mac
-def _encrypt_descriptor_layer(plaintext, revision_counter, - subcredential, - secret_data, string_constant): - """ - Encrypt descriptor layer at 'plaintext' - """ - salt = os.urandom(16) +def _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, secret_data, string_constant): + """ + Encrypt descriptor layer at 'plaintext' + """ + + salt = os.urandom(16)
- secret_key, secret_iv, mac_key = get_desc_keys(secret_data, string_constant, - subcredential, revision_counter, salt) + secret_key, secret_iv, mac_key = get_desc_keys(secret_data, string_constant, subcredential, revision_counter, salt)
- # Now time to encrypt descriptor - cipher = Cipher(algorithms.AES(secret_key), modes.CTR(secret_iv), default_backend()) - encryptor = cipher.encryptor() - ciphertext = encryptor.update(plaintext) + encryptor.finalize() + # Now time to encrypt descriptor + cipher = Cipher(algorithms.AES(secret_key), modes.CTR(secret_iv), default_backend()) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(plaintext) + encryptor.finalize()
- mac = get_desc_encryption_mac(mac_key, salt, ciphertext) + mac = get_desc_encryption_mac(mac_key, salt, ciphertext)
- return salt + ciphertext + mac + return salt + ciphertext + mac
def encrypt_inner_layer(plaintext, revision_counter, blinded_key_bytes, subcredential): - """ - Encrypt the inner layer of the descriptor - """ - secret_data = blinded_key_bytes - string_constant = b"hsdir-encrypted-data" + """ + Encrypt the inner layer of the descriptor + """
- return _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, - secret_data, string_constant) + secret_data = blinded_key_bytes + string_constant = b'hsdir-encrypted-data'
+ return _encrypt_descriptor_layer(plaintext, revision_counter, subcredential, secret_data, string_constant) + + +def ceildiv(a, b): + """ + Like // division but return the ceiling instead of the floor + """ + + return -(-a // b)
-def ceildiv(a,b): - """ - Like // division but return the ceiling instead of the floor - """ - return -(-a // b)
def _get_padding_needed(plaintext_len): - """ - Get descriptor padding needed for this descriptor layer. - From the spec: - Before encryption the plaintext is padded with NUL bytes to the nearest - multiple of 10k bytes. - """ - PAD_MULTIPLE_BYTES = 10000 + """ + Get descriptor padding needed for this descriptor layer. + From the spec: + Before encryption the plaintext is padded with NUL bytes to the nearest + multiple of 10k bytes. + """ + + PAD_MULTIPLE_BYTES = 10000 + + final_size = ceildiv(plaintext_len, PAD_MULTIPLE_BYTES) * PAD_MULTIPLE_BYTES + return final_size - plaintext_len
- final_size = ceildiv(plaintext_len, PAD_MULTIPLE_BYTES)*PAD_MULTIPLE_BYTES - return final_size - plaintext_len
def encrypt_outter_layer(plaintext, revision_counter, blinded_key_bytes, subcredential): - """ - Encrypt the outer layer of the descriptor - """ - secret_data = blinded_key_bytes - string_constant = b"hsdir-superencrypted-data" + """ + Encrypt the outer layer of the descriptor + """ + + secret_data = blinded_key_bytes + string_constant = b'hsdir-superencrypted-data'
- # In the outter layer we first need to pad the plaintext - padding_bytes_needed = _get_padding_needed(len(plaintext)) - padded_plaintext = plaintext + b'\x00'*padding_bytes_needed + # In the outter layer we first need to pad the plaintext + padding_bytes_needed = _get_padding_needed(len(plaintext)) + padded_plaintext = plaintext + b'\x00' * padding_bytes_needed
- return _encrypt_descriptor_layer(padded_plaintext, revision_counter, subcredential, - secret_data, string_constant) + return _encrypt_descriptor_layer(padded_plaintext, revision_counter, subcredential, secret_data, string_constant) diff --git a/stem/descriptor/slow_ed25519.py b/stem/descriptor/slow_ed25519.py index a9bbd7c2..ffca5b02 100644 --- a/stem/descriptor/slow_ed25519.py +++ b/stem/descriptor/slow_ed25519.py @@ -11,104 +11,150 @@ import hashlib
b = 256 -q = 2**255 - 19 -l = 2**252 + 27742317777372353535851937790883648493 +q = 2 ** 255 - 19 +l = 2 ** 252 + 27742317777372353535851937790883648493 +
def H(m): return hashlib.sha512(m).digest()
-def expmod(b,e,m): - if e == 0: return 1 - t = expmod(b,e//2,m)**2 % m - if e & 1: t = (t*b) % m + +def expmod(b, e, m): + if e == 0: + return 1 + + t = expmod(b, e // 2, m) ** 2 % m + + if e & 1: + t = (t * b) % m + return t
+ def inv(x): - return expmod(x,q-2,q) + return expmod(x, q - 2, q) +
d = -121665 * inv(121666) -I = expmod(2,(q-1)//4,q) +I = expmod(2, (q - 1) // 4, q) +
def xrecover(y): - xx = (y*y-1) * inv(d*y*y+1) - x = expmod(xx,(q+3)//8,q) - if (x*x - xx) % q != 0: x = (x*I) % q - if x % 2 != 0: x = q-x + xx = (y * y - 1) * inv(d * y * y + 1) + x = expmod(xx, (q + 3) // 8, q) + + if (x * x - xx) % q != 0: + x = (x * I) % q + + if x % 2 != 0: + x = q - x + return x
+ By = 4 * inv(5) Bx = xrecover(By) -B = [Bx % q,By % q] +B = [Bx % q, By % q] +
-def edwards(P,Q): +def edwards(P, Q): x1 = P[0] y1 = P[1] x2 = Q[0] y2 = Q[1] - x3 = (x1*y2+x2*y1) * inv(1+d*x1*x2*y1*y2) - y3 = (y1*y2+x1*x2) * inv(1-d*x1*x2*y1*y2) - return [x3 % q,y3 % q] - -def scalarmult(P,e): - if e == 0: return [0,1] - Q = scalarmult(P,e//2) - Q = edwards(Q,Q) - if e & 1: Q = edwards(Q,P) + x3 = (x1 * y2 + x2 * y1) * inv(1 + d * x1 * x2 * y1 * y2) + y3 = (y1 * y2 + x1 * x2) * inv(1 - d * x1 * x2 * y1 * y2) + return [x3 % q, y3 % q] + + +def scalarmult(P, e): + if e == 0: + return [0, 1] + + Q = scalarmult(P, e // 2) + Q = edwards(Q, Q) + + if e & 1: + Q = edwards(Q, P) + return Q
+ def encodeint(y): bits = [(y >> i) & 1 for i in range(b)] - return b''.join([chr(sum([bits[i * 8 + j] << j for j in range(8)])) for i in range(b//8)]) + return b''.join([chr(sum([bits[i * 8 + j] << j for j in range(8)])) for i in range(b // 8)]) +
def encodepoint(P): x = P[0] y = P[1] bits = [(y >> i) & 1 for i in range(b - 1)] + [x & 1] - return b''.join([chr(sum([bits[i * 8 + j] << j for j in range(8)])) for i in range(b//8)])
-def bit(h,i): - return (ord(h[i//8:i//8+1]) >> (i%8)) & 1 + return b''.join([chr(sum([bits[i * 8 + j] << j for j in range(8)])) for i in range(b // 8)]) + + +def bit(h, i): + return (ord(h[i // 8:i // 8 + 1]) >> (i % 8)) & 1 +
def publickey(sk): h = H(sk) - a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) - A = scalarmult(B,a) + a = 2 ** (b - 2) + sum(2 ** i * bit(h, i) for i in range(3, b - 2)) + A = scalarmult(B, a) + return encodepoint(A)
+ def Hint(m): h = H(m) - return sum(2**i * bit(h,i) for i in range(2*b)) + return sum(2 ** i * bit(h, i) for i in range(2 * b)) +
-def signature(m,sk,pk): +def signature(m, sk, pk): h = H(sk) - a = 2**(b-2) + sum(2**i * bit(h,i) for i in range(3,b-2)) - r = Hint(b''.join([h[i:i+1] for i in range(b//8,b//4)]) + m) - R = scalarmult(B,r) + a = 2 ** (b - 2) + sum(2 ** i * bit(h, i) for i in range(3, b - 2)) + r = Hint(b''.join([h[i:i + 1] for i in range(b // 8, b // 4)]) + m) + R = scalarmult(B, r) S = (r + Hint(encodepoint(R) + pk + m) * a) % l return encodepoint(R) + encodeint(S)
+ def isoncurve(P): x = P[0] y = P[1] - return (-x*x + y*y - 1 - d*x*x*y*y) % q == 0 + return (-x * x + y * y - 1 - d * x * x * y * y) % q == 0 +
def decodeint(s): - return sum(2**i * bit(s,i) for i in range(0,b)) + return sum(2 ** i * bit(s, i) for i in range(0, b)) +
def decodepoint(s): - y = sum(2**i * bit(s,i) for i in range(0,b-1)) + y = sum(2 ** i * bit(s, i) for i in range(0, b - 1)) x = xrecover(y) - if x & 1 != bit(s,b-1): x = q-x - P = [x,y] - if not isoncurve(P): raise Exception("decoding point that is not on curve") + + if x & 1 != bit(s, b - 1): + x = q - x + + P = [x, y] + + if not isoncurve(P): + raise Exception('decoding point that is not on curve') + return P
-def checkvalid(s,m,pk): - if len(s) != b//4: raise Exception("signature length is wrong") - if len(pk) != b//8: raise Exception("public-key length is wrong") - R = decodepoint(s[0:b//8]) + +def checkvalid(s, m, pk): + if len(s) != b // 4: + raise Exception('signature length is wrong') + + if len(pk) != b // 8: + raise Exception('public-key length is wrong') + + R = decodepoint(s[0:b // 8]) A = decodepoint(pk) - S = decodeint(s[b//8:b//4]) + S = decodeint(s[b // 8:b // 4]) h = Hint(encodepoint(R) + pk + m) - if scalarmult(B,S) != edwards(R,scalarmult(A,h)): - raise Exception("signature does not pass verification") + + if scalarmult(B, S) != edwards(R, scalarmult(A, h)): + raise Exception('signature does not pass verification') diff --git a/test/unit/descriptor/certificate.py b/test/unit/descriptor/certificate.py index 386fe1c1..d86c169a 100644 --- a/test/unit/descriptor/certificate.py +++ b/test/unit/descriptor/certificate.py @@ -207,12 +207,7 @@ class TestEd25519Certificate(unittest.TestCase):
expiration_date = datetime.datetime(2037, 8, 28, 17, 0)
- my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type=CertType.HS_V3_DESC_SIGNING, - expiration_date=expiration_date, - cert_key_type=1, - certified_pub_key=certified_pub_key, - signing_priv_key=signing_priv_key, - include_signing_key=True) + my_ed_cert = stem.descriptor.certificate.MyED25519Certificate(cert_type = CertType.HS_V3_DESC_SIGNING, expiration_date = expiration_date, cert_key_type = 1, certified_pub_key = certified_pub_key, signing_priv_key = signing_priv_key, include_signing_key = True)
ed_cert_bytes = my_ed_cert.encode() self.assertTrue(my_ed_cert) @@ -225,7 +220,5 @@ class TestEd25519Certificate(unittest.TestCase): self.assertEqual(ed_cert_parsed.type, my_ed_cert.cert_type) self.assertEqual(ed_cert_parsed.expiration, my_ed_cert.expiration_date) self.assertEqual(ed_cert_parsed.key_type, my_ed_cert.cert_key_type) - self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw)) - self.assertEqual(ed_cert_parsed.signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw)) + self.assertEqual(ed_cert_parsed.key, my_ed_cert.certified_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)) + self.assertEqual(ed_cert_parsed.signing_key(), my_ed_cert.signing_pub_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)) diff --git a/test/unit/descriptor/hidden_service_v3.py b/test/unit/descriptor/hidden_service_v3.py index ba236ce8..b3a26931 100644 --- a/test/unit/descriptor/hidden_service_v3.py +++ b/test/unit/descriptor/hidden_service_v3.py @@ -52,6 +52,7 @@ with open(get_resource('hidden_service_v3_outer_layer')) as outer_layer_file: with open(get_resource('hidden_service_v3_inner_layer')) as inner_layer_file: INNER_LAYER_STR = inner_layer_file.read()
+ class TestHiddenServiceDescriptorV3(unittest.TestCase): def test_real_descriptor(self): """ @@ -256,10 +257,9 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): return
# Build the service - private_identity_key = Ed25519PrivateKey.from_private_bytes(b"a"*32) + private_identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32) public_identity_key = private_identity_key.public_key() - pubkey_bytes = public_identity_key.public_bytes(encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw) + pubkey_bytes = public_identity_key.public_bytes(encoding = serialization.Encoding.Raw, format = serialization.PublicFormat.Raw)
onion_address = hsv3_crypto.encode_onion_address(pubkey_bytes).decode()
@@ -271,12 +271,10 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase):
# TODO: replace with bytes.fromhex() when we drop python 2.x support
- blind_param = bytearray.fromhex("677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3") + blind_param = bytearray.fromhex('677776AE42464CAAB0DF0BF1E68A5FB651A390A6A8243CF4B60EE73A6AC2E4E3')
# Build the descriptor - desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key=private_identity_key, - intro_points=intro_points, - blinding_param=blind_param) + desc_string = HiddenServiceDescriptorV3.content(ed25519_private_identity_key = private_identity_key, intro_points = intro_points, blinding_param = blind_param) desc_string = desc_string.decode()
# Parse the descriptor @@ -288,11 +286,13 @@ class TestHiddenServiceDescriptorV3(unittest.TestCase): # Match introduction points of the parsed descriptor and the generated # descriptor and do some sanity checks between them to make sure that # parsing was done right! + for desc_intro in inner_layer.introduction_points: - original_found = False # Make sure we found all the intro points + original_found = False # Make sure we found all the intro points
for original_intro in intro_points: # Match intro points + if hsv3_crypto.pubkeys_are_equal(desc_intro.auth_key, original_intro.auth_key): original_found = True self.assertTrue(hsv3_crypto.pubkeys_are_equal(desc_intro.enc_key, original_intro.enc_key))