commit ebec5fbd4a8080b1003ae87681cd5b804f5a81d7 Author: Damian Johnson atagar@torproject.org Date: Fri Jul 13 09:54:42 2012 -0700
Utilities for IPv4 and IPv6 addresses
Adding the utilities that I need for the ExitPolicy to handle and translate addresses and their masks. Pity that these aren't provided by the python builtins. The IPy package seems to do it but this isn't worth adding a new dependency. --- run_tests.py | 4 +- stem/util/connection.py | 152 +++++++++++++++++++++++++++++++++++++++++- test/unit/util/connection.py | 77 +++++++++++++++++++++ 3 files changed, 230 insertions(+), 3 deletions(-)
diff --git a/run_tests.py b/run_tests.py index 96622e8..23f993e 100755 --- a/run_tests.py +++ b/run_tests.py @@ -32,8 +32,8 @@ import test.unit.util.enum import test.unit.util.proc import test.unit.util.system import test.unit.util.tor_tools -import test.unit.version import test.unit.exit_policy +import test.unit.version import test.integ.connection.authentication import test.integ.connection.connect import test.integ.control.base_controller @@ -109,6 +109,7 @@ UNIT_TESTS = ( test.unit.descriptor.reader.TestDescriptorReader, test.unit.descriptor.server_descriptor.TestServerDescriptor, test.unit.descriptor.extrainfo_descriptor.TestExtraInfoDescriptor, + test.unit.exit_policy.TestExitPolicy, test.unit.version.TestVersion, test.unit.response.control_message.TestControlMessage, test.unit.response.control_line.TestControlLine, @@ -118,7 +119,6 @@ UNIT_TESTS = ( test.unit.response.protocolinfo.TestProtocolInfoResponse, test.unit.response.authchallenge.TestAuthChallengeResponse, test.unit.connection.authentication.TestAuthenticate, - test.unit.exit_policy.TestExitPolicy )
INTEG_TESTS = ( diff --git a/stem/util/connection.py b/stem/util/connection.py index 13f2d47..a70389e 100644 --- a/stem/util/connection.py +++ b/stem/util/connection.py @@ -3,6 +3,18 @@ Connection and networking based utility functions. This will likely be expanded later to have all of `arm's functions https://gitweb.torproject.org/arm.git/blob/HEAD:/src/util/connections.py`_, but for now just moving the parts we need. + +:: + + is_valid_ip_address - checks if a string is a valid IPv4 address + is_valid_ipv6_address - checks if a string is a valid IPv6 address + is_valid_port - checks if something is a valid representation for a port + expand_ipv6_address - provides an IPv6 address with its collapsed portions expanded + get_mask - provides the mask representation for a given number of bits + get_masked_bits - provides the number of bits represented by a mask + get_mask_ipv6 - provides the IPv6 mask representation for a given number of bits + get_binary - provides the binary representation for an integer with padding + get_address_binary - provides the binary representation for an address """
import os @@ -65,7 +77,7 @@ def is_valid_port(entry, allow_zero = False): """ Checks if a string or int is a valid port number.
- :param list, str, int entry: string, integer or list to be checked + :param list,str,int entry: string, integer or list to be checked :param bool allow_zero: accept port number of zero (reserved by definition)
:returns: True if input is an integer and within the valid port range, False otherwise @@ -88,6 +100,144 @@ def is_valid_port(entry, allow_zero = False):
return entry > 0 and entry < 65536
+def expand_ipv6_address(address): + """ + Expands abbreviated IPv6 addresses to their full colon separated hex format. + For instance... + + :: + + >>> expand_ipv6_address("2001:db8::ff00:42:8329") + "2001:0db8:0000:0000:0000:ff00:0042:8329" + + >>> expand_ipv6_address("::") + "0000:0000:0000:0000:0000:0000:0000:0000" + + :param str address: IPv6 address to be expanded + + :raises: ValueError if the address can't be expanded due to being malformed + """ + + if not is_valid_ipv6_address(address): + raise ValueError("'%s' isn't a valid IPv6 address" % address) + + # expands collapsed groupings, there can only be a single '::' in a valid + # address + if "::" in address: + missing_groups = 7 - address.count(":") + address = address.replace("::", "::" + ":" * missing_groups) + + # inserts missing zeros + for i in xrange(8): + start = i * 5 + end = address.index(":", start) if i != 7 else len(address) + missing_zeros = 4 - (end - start) + + if missing_zeros > 0: + address = address[:start] + "0" * missing_zeros + address[start:] + + return address + +def get_mask(bits): + """ + Provides the IPv4 mask for a given number of bits, in the dotted-quad format. + + :param int bits: number of bits to be converted + + :returns: str with the subnet mask representation for this many bits + + :raises: ValueError if given a number of bits outside the range of 0-32 + """ + + if bits > 32 or bits < 0: + raise ValueError("A mask can only be 0-32 bits, got %i" % bits) + + # get the binary representation of the mask + mask_bin = get_binary(2 ** bits - 1, 32)[::-1] + + # breaks it into eight character groupings + octets = [mask_bin[8 * i : 8 * (i + 1)] for i in xrange(4)] + + # converts each octet into its integer value + return ".".join([str(int(octet, 2)) for octet in octets]) + +def get_masked_bits(mask): + """ + Provides the number of bits that an IPv4 subnet mask represents. Note that + not all masks can be represented by a bit count. 
+ + :param str mask: mask to be converted + + :returns: int with the number of bits represented by the mask + + :raises: ValueError if the mask is invalid or can't be converted + """ + + if not is_valid_ip_address(mask): + raise ValueError("'%s' is an invalid subnet mask" % mask) + + # converts octets to binary representation + mask_bin = get_address_binary(mask) + mask_match = re.match("^(1*)(0*)$", mask_bin) + + if mask_match: + return 32 - len(mask_match.groups()[1]) + else: + raise ValueError("Unable to convert mask to a bit count: %s" % mask) + +def get_mask_ipv6(bits): + """ + Provides the IPv6 mask for a given number of bits, in the hex colon-delimited + format. + + :param int bits: number of bits to be converted + + :returns: str with the subnet mask representation for this many bits + + :raises: ValueError if given a number of bits outside the range of 0-128 + """ + + if bits > 128 or bits < 0: + raise ValueError("A mask can only be 0-128 bits, got %i" % bits) + + # get the binary representation of the mask + mask_bin = get_binary(2 ** bits - 1, 128)[::-1] + + # breaks it into sixteen character groupings + groupings = [mask_bin[16 * i : 16 * (i + 1)] for i in xrange(8)] + + # converts each group into its hex value + return ":".join(["%04x" % int(group, 2) for group in groupings]).upper() + +def get_binary(value, bits): + """ + Provides the given value as a binary string, padded with zeros to the given + number of bits. + + :param int value: value to be converted + :param int bits: number of bits to pad to + """ + + # http://www.daniweb.com/code/snippet216539.html + return "".join([str((value >> y) & 1) for y in range(bits - 1, -1, -1)]) + +def get_address_binary(address): + """ + Provides the binary value for an IPv4 or IPv6 address. 
+ + :returns: str with the binary representation of this address + + :raises: ValueError if address is neither an IPv4 nor IPv6 address + """ + + if is_valid_ip_address(address): + return "".join([get_binary(int(octet), 8) for octet in address.split(".")]) + elif is_valid_ipv6_address(address): + address = expand_ipv6_address(address) + return "".join([get_binary(int(grouping, 16), 16) for grouping in address.split(":")]) + else: + raise ValueError("'%s' is neither an IPv4 or IPv6 address" % address) + def hmac_sha256(key, msg): """ Generates a sha256 digest using the given key and message. diff --git a/test/unit/util/connection.py b/test/unit/util/connection.py index b4c7223..4df0298 100644 --- a/test/unit/util/connection.py +++ b/test/unit/util/connection.py @@ -76,4 +76,81 @@ class TestConnection(unittest.TestCase):
self.assertTrue(stem.util.connection.is_valid_port(0, allow_zero = True)) self.assertTrue(stem.util.connection.is_valid_port("0", allow_zero = True)) + + def test_expand_ipv6_address(self): + """ + Checks the expand_ipv6_address function. + """ + + test_values = { + "2001:db8::ff00:42:8329": "2001:0db8:0000:0000:0000:ff00:0042:8329", + "::": "0000:0000:0000:0000:0000:0000:0000:0000", + "::1": "0000:0000:0000:0000:0000:0000:0000:0001", + "1::1": "0001:0000:0000:0000:0000:0000:0000:0001", + } + + for test_arg, expected in test_values.items(): + self.assertEquals(expected, stem.util.connection.expand_ipv6_address(test_arg)) + + self.assertRaises(ValueError, stem.util.connection.expand_ipv6_address, "127.0.0.1") + + def test_get_mask(self): + """ + Checks the get_mask function. + """ + + self.assertEquals("255.255.255.255", stem.util.connection.get_mask(32)) + self.assertEquals("255.255.255.248", stem.util.connection.get_mask(29)) + self.assertEquals("255.255.254.0", stem.util.connection.get_mask(23)) + self.assertEquals("0.0.0.0", stem.util.connection.get_mask(0)) + + self.assertRaises(ValueError, stem.util.connection.get_mask, -1) + self.assertRaises(ValueError, stem.util.connection.get_mask, 33) + + def test_get_masked_bits(self): + """ + Checks the get_masked_bits function. + """ + + self.assertEquals(32, stem.util.connection.get_masked_bits("255.255.255.255")) + self.assertEquals(29, stem.util.connection.get_masked_bits("255.255.255.248")) + self.assertEquals(23, stem.util.connection.get_masked_bits("255.255.254.0")) + self.assertEquals(0, stem.util.connection.get_masked_bits("0.0.0.0")) + + self.assertRaises(ValueError, stem.util.connection.get_masked_bits, "blarg") + self.assertRaises(ValueError, stem.util.connection.get_masked_bits, "255.255.0.255") + + def test_get_mask_ipv6(self): + """ + Checks the get_mask_ipv6 function. 
+ """ + + self.assertEquals("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", stem.util.connection.get_mask_ipv6(128)) + self.assertEquals("FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFE:0000", stem.util.connection.get_mask_ipv6(111)) + self.assertEquals("0000:0000:0000:0000:0000:0000:0000:0000", stem.util.connection.get_mask_ipv6(0)) + + self.assertRaises(ValueError, stem.util.connection.get_mask_ipv6, -1) + self.assertRaises(ValueError, stem.util.connection.get_mask, 129) + + def test_get_address_binary(self): + """ + Checks the get_address_binary function. + """ + + test_values = { + "0.0.0.0": "00000000000000000000000000000000", + "1.2.3.4": "00000001000000100000001100000100", + "127.0.0.1": "01111111000000000000000000000001", + "255.255.255.255": "11111111111111111111111111111111", + "::": "0" * 128, + "::1": ("0" * 127) + "1", + "1::1": "0000000000000001" + ("0" * 111) + "1", + "2001:db8::ff00:42:8329": "00100000000000010000110110111000000000000000000000000000000000000000000000000000111111110000000000000000010000101000001100101001", + } + + for test_arg, expected in test_values.items(): + self.assertEquals(expected, stem.util.connection.get_address_binary(test_arg)) + + self.assertRaises(ValueError, stem.util.connection.get_address_binary, "") + self.assertRaises(ValueError, stem.util.connection.get_address_binary, "blarg")
tor-commits@lists.torproject.org