commit 0c069727eb5337cc7c3f422387e3f4d0ad5caa9c Author: Damian Johnson atagar@torproject.org Date: Sat Jul 14 18:32:50 2012 -0700
ExitPolicyRule class
Adding a class that conforms with the dir-spec's exitpattern entity. Plus lots 'o unit tests that told me over and over and over again that I was being stupid. Hopefully this version is at least kinda sorta close to being right... --- run_tests.py | 6 +- stem/exit_policy.py | 363 +++++++++++++++++++++++++++++++++---- stem/util/connection.py | 17 ++- test/unit/exit_policy.py | 110 ----------- test/unit/exit_policy/__init__.py | 6 + test/unit/exit_policy/policy.py | 107 +++++++++++ test/unit/exit_policy/rule.py | 327 +++++++++++++++++++++++++++++++++ 7 files changed, 780 insertions(+), 156 deletions(-)
diff --git a/run_tests.py b/run_tests.py index 23f993e..3b36046 100755 --- a/run_tests.py +++ b/run_tests.py @@ -32,7 +32,8 @@ import test.unit.util.enum import test.unit.util.proc import test.unit.util.system import test.unit.util.tor_tools -import test.unit.exit_policy +import test.unit.exit_policy.policy +import test.unit.exit_policy.rule import test.unit.version import test.integ.connection.authentication import test.integ.connection.connect @@ -109,7 +110,8 @@ UNIT_TESTS = ( test.unit.descriptor.reader.TestDescriptorReader, test.unit.descriptor.server_descriptor.TestServerDescriptor, test.unit.descriptor.extrainfo_descriptor.TestExtraInfoDescriptor, - test.unit.exit_policy.TestExitPolicy, + test.unit.exit_policy.rule.TestExitPolicyRule, + test.unit.exit_policy.policy.TestExitPolicy, test.unit.version.TestVersion, test.unit.response.control_message.TestControlMessage, test.unit.response.control_line.TestControlLine, diff --git a/stem/exit_policy.py b/stem/exit_policy.py index 33f5cc1..7040563 100644 --- a/stem/exit_policy.py +++ b/stem/exit_policy.py @@ -1,61 +1,342 @@ """ -Tor Exit Policy information and requirements for its features. These can be -easily parsed and compared, for instance... +Representation of tor exit policies. These can be easily used to check if +exiting to a destination is permissable or not. For instance...
->>> exit_policies = stem.exit_policy.ExitPolicy() ->>> exit_policies.add("accept *:80") ->>> exit_policies.add("accept *:443") ->>> exit_policies.add("reject *:*") ->>> print exit_policies -accept *:80 , accept *:443, reject *:* ->>> print exit_policies.get_summary() -accept 80, 443 ->>> exit_policies.check("www.google.com", 80) -True +::
->>> microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443") ->>> print microdesc_exit_policy -accept 80,443 ->>> microdesc_exit_policy.check("www.google.com", 80) -True ->>> microdesc_exit_policy.check(80) -True + >>> exit_policies = stem.exit_policy.ExitPolicy() + >>> exit_policies.add("accept *:80") + >>> exit_policies.add("accept *:443") + >>> exit_policies.add("reject *:*") + >>> print exit_policies + accept *:80 , accept *:443, reject *:* + >>> print exit_policies.get_summary() + accept 80, 443 + >>> exit_policies.check("www.google.com", 80) + True + + >>> microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443") + >>> print microdesc_exit_policy + accept 80,443 + >>> microdesc_exit_policy.check("www.google.com", 80) + True + >>> microdesc_exit_policy.check(80) + True + +:: + + ExitPolicyRule - Single rule of an exit policy + |- is_address_wildcard - checks if we'll accept any address for our type + |- is_port_wildcard - checks if we'll accept any port + |- is_match - checks if we match a given destination + +- __str__ - string representation for this rule
-ExitPolicyLine - Single rule from the exit policy - |- __str__ - string representation - +- check - check if exiting to this ip is allowed + ExitPolicyLine - Single rule from the exit policy + |- __str__ - string representation + +- check - check if exiting to this ip is allowed
-ExitPolicy - List of ExitPolicyLine objects - |- __str__ - string representation - |- __iter__ - ExitPolicyLine entries for the exit policy - |- check - check if exiting to this ip is allowed - |- add - add new rule to the exit policy - |- get_summary - provides a summary description of the policy chain - +- is_exiting_allowed - check if exit node + ExitPolicy - List of ExitPolicyLine objects + |- __str__ - string representation + |- __iter__ - ExitPolicyLine entries for the exit policy + |- check - check if exiting to this ip is allowed + |- add - add new rule to the exit policy + |- get_summary - provides a summary description of the policy chain + +- is_exiting_allowed - check if exit node
-MicrodescriptorExitPolicy - Microdescriptor exit policy - |- check - check if exiting to this port is allowed - |- ports - returns a list of ports - |- is_accept - check if it's a list of accepted/rejected ports - +- __str__ - return the summary + MicrodescriptorExitPolicy - Microdescriptor exit policy + |- check - check if exiting to this port is allowed + |- ports - returns a list of ports + |- is_accept - check if it's a list of accepted/rejected ports + +- __str__ - return the summary """
import stem.util.connection +import stem.util.enum
+AddressType = stem.util.enum.Enum(("WILDCARD", "Wildcard"), ("IPv4", "IPv4"), ("IPv6", "IPv6"))
# ip address ranges substituted by the 'private' keyword -PRIVATE_IP_RANGES = ("0.0.0.0/8", "169.254.0.0/16", "127.0.0.0/8", "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12") +PRIVATE_IP_RANGES = ("0.0.0.0/8", "169.254.0.0/16", "127.0.0.0/8", + "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12") + +class ExitPolicyRule: + """ + Single rule from the user's exit policy. These rules are chained together to + form complete policies that describe where a relay will and will not allow + traffic to exit. + + The format of these rules are formally described in the dir-spec as an + "exitpattern". Note that while these are similar to tor's man page entry for + ExitPolicies, it's not the exact same. An exitpattern is better defined and + scricter in what it'll accept. For instance, ports are not optional and it + does not contain the 'private' alias. + + :var str rule: rule that we were created from + :var bool is_accept: indicates if exiting is allowed or disallowed + + :var AddressType address_type: type of address that we have + :var str address: address that this rule is for + :var str mask: subnet mask for the address (ex. "255.255.255.0") + :var int masked_bits: number of bits the subnet mask represents, None if the mask can't have a bit representation + + :var int min_port: lower end of the port range that we include (inclusive) + :var int max_port: upper end of the port range that we include (inclusive) + + :param str rule: exit policy rule to be parsed + + :raises: ValueError if input isn't a valid tor exit policy rule + """ + + # TODO: Exitpatterns are used everywhere except the torrc. This is fine for + # now, but we should add a subclass to handle those slight differences later + # if we want to provide the ability to parse torrcs. + + def __init__(self, rule): + self.rule = rule + + # policy ::= "accept" exitpattern | "reject" exitpattern + # exitpattern ::= addrspec ":" portspec + + if rule.startswith("accept"): + self.is_accept = True + elif rule.startswith("reject"): + self.is_accept = False + else: + raise ValueError("An exit policy must start with either 'accept' or 'reject': %s" % rule) + + exitpattern = rule[6:] + + if not exitpattern.startswith(" ") or (len(exitpattern) - 1 != len(exitpattern.lstrip())) : + raise ValueError("An exit policy should have a space separating its accept/reject from the exit pattern: %s" % rule) + + exitpattern = exitpattern[1:] + + if not ":" in exitpattern: + raise ValueError("An exitpattern must be of the form 'addrspec:portspec': %s" % rule) + + addrspec, portspec = exitpattern.rsplit(":", 1) + self._addr_bin = self._mask_bin = None + + # Parses the addrspec... + # addrspec ::= "*" | ip4spec | ip6spec + + if "/" in addrspec: + self.address, addr_extra = addrspec.split("/", 1) + else: + self.address, addr_extra = addrspec, None + + if addrspec == "*": + self.address_type = AddressType.WILDCARD + self.address = self.mask = self.masked_bits = None + elif stem.util.connection.is_valid_ip_address(self.address): + # ipv4spec ::= ip4 | ip4 "/" num_ip4_bits | ip4 "/" ip4mask + # ip4 ::= an IPv4 address in dotted-quad format + # ip4mask ::= an IPv4 mask in dotted-quad format + # num_ip4_bits ::= an integer between 0 and 32 + + self.address_type = AddressType.IPv4 + + if addr_extra is None: + self.mask = stem.util.connection.FULL_IPv4_MASK + self.masked_bits = 32 + elif stem.util.connection.is_valid_ip_address(addr_extra): + # provided with an ip4mask + self.mask = addr_extra + + try: + self.masked_bits = stem.util.connection.get_masked_bits(addr_extra) + except ValueError: + # mask can't be represented as a number of bits (ex. "255.255.0.255") + self.masked_bits = None + elif addr_extra.isdigit(): + # provided with a num_ip4_bits + self.mask = stem.util.connection.get_mask(int(addr_extra)) + self.masked_bits = int(addr_extra) + else: + raise ValueError("The '%s' isn't a mask nor number of bits: %s" % (addr_extra, rule)) + elif self.address.startswith("[") and self.address.endswith("]") and \ + stem.util.connection.is_valid_ipv6_address(self.address[1:-1]): + # ip6spec ::= ip6 | ip6 "/" num_ip6_bits + # ip6 ::= an IPv6 address, surrounded by square brackets. + # num_ip6_bits ::= an integer between 0 and 128 + + self.address = stem.util.connection.expand_ipv6_address(self.address[1:-1].upper()) + self.address_type = AddressType.IPv6 + + if addr_extra is None: + self.mask = stem.util.connection.FULL_IPv6_MASK + self.masked_bits = 128 + elif addr_extra.isdigit(): + # provided with a num_ip6_bits + self.mask = stem.util.connection.get_mask_ipv6(int(addr_extra)) + self.masked_bits = int(addr_extra) + else: + raise ValueError("The '%s' isn't a number of bits: %s" % (addr_extra, rule)) + else: + raise ValueError("Address isn't a wildcard, IPv4, or IPv6 address: %s" % rule) + + # Parses the portspec... + # portspec ::= "*" | port | port "-" port + # port ::= an integer between 1 and 65535, inclusive. + # + # Due to a tor bug the spec says that we should accept port of zero, but + # connections to port zero are never permitted. + + if portspec == "*": + self.min_port, self.max_port = 1, 65535 + elif portspec.isdigit(): + # provided with a single port + if stem.util.connection.is_valid_port(portspec, allow_zero = True): + self.min_port = self.max_port = int(portspec) + else: + raise ValueError("'%s' isn't within a valid port range: %s" % (portspec, rule)) + elif "-" in portspec: + # provided with a port range + port_comp = portspec.split("-", 1) + + if stem.util.connection.is_valid_port(port_comp, allow_zero = True): + self.min_port = int(port_comp[0]) + self.max_port = int(port_comp[1]) + + if self.min_port > self.max_port: + raise ValueError("Port range has a lower bound that's greater than its upper bound: %s" % rule) + else: + raise ValueError("Malformed port range: %s" % rule) + else: + raise ValueError("Port value isn't a wildcard, integer, or range: %s" % rule) + + # Pre-calculating the integer representation of our mask and masked + # address. These are used by our is_match() method to compare ourselves to + # other addresses. + + if self.address_type == AddressType.WILDCARD: + # is_match() will short circuit so these are unused + self._mask_bin = self._addr_bin = None + else: + self._mask_bin = int(stem.util.connection.get_address_binary(self.mask), 2) + self._addr_bin = int(stem.util.connection.get_address_binary(self.address), 2) & self._mask_bin + + self._str_representation = None + + def is_address_wildcard(self): + """ + True if we'll match against any address for our type, False otherwise. + + :returns: bool for if our address matching is a wildcard + """ + + return self.address_type == AddressType.WILDCARD or self.masked_bits == 0 + + def is_port_wildcard(self): + """ + True if we'll match against any port, False otherwise. + + :returns: bool for if our port matching is a wildcard + """ + + return self.min_port in (0, 1) and self.max_port == 65535 + + def is_match(self, address = None, port = None): + """ + True if we match against the given destination, False otherwise. If the + address or port is omitted then that'll only match against a wildcard. + + :param str address: IPv4 or IPv6 address (with or without brackets) + :param int port: port number + + :returns: bool indicating if we match against this destination + + :raises: ValueError if provided with a malformed address or port + """ + + # validate our input and check if the argumement doens't match our address type + if address != None: + if stem.util.connection.is_valid_ip_address(address): + if self.address_type == AddressType.IPv6: return False + elif stem.util.connection.is_valid_ipv6_address(address, allow_brackets = True): + if self.address_type == AddressType.IPv4: return False + + address = address.lstrip("[").rstrip("]") + else: + raise ValueError("'%s' isn't a valid ipv4 or ipv6 address" % address) + + if port != None and not stem.util.connection.is_valid_port(port): + raise ValueError("'%s' isn't a valid port" % port) + + if address is None: + if self.address_type != AddressType.WILDCARD: + return False + elif not self.is_address_wildcard(): + # Already got the integer representation of our mask and our address + # with the mask applied. Just need to check if this address with the + # mask applied matches. + + comparison_addr_bin = int(stem.util.connection.get_address_binary(address), 2) + comparison_addr_bin &= self._mask_bin + if self._addr_bin != comparison_addr_bin: return False + + if not self.is_port_wildcard(): + if port is None: + return False + elif port < self.min_port or port > self.max_port: + return False + + return True + + def __str__(self): + """ + Provides the string representation of our policy. This does not + necessarily match the rule that we were constructed from (due to things + like IPv6 address collapsing or the multiple representations that our mask + can have). However, it is a valid that would be accepted by our constructor + to re-create this rule. + """ + + if self._str_representation is None: + label = "accept " if self.is_accept else "reject " + + if self.address_type == AddressType.WILDCARD: + label += "*:" + else: + if self.address_type == AddressType.IPv4: + label += self.address + else: + label += "[%s]" % self.address + + # Including our mask label as follows... + # - exclde our mask if it doesn't do anything + # - use our masked bit count if we can + # - use the mask itself otherwise + + if self.mask in (stem.util.connection.FULL_IPv4_MASK, stem.util.connection.FULL_IPv6_MASK): + label += ":" + elif not self.masked_bits is None: + label += "/%i:" % self.masked_bits + else: + label += "/%s:" % self.mask + + if self.is_port_wildcard(): + label += "*" + elif self.min_port == self.max_port: + label += str(self.min_port) + else: + label += "%i-%i" % (self.min_port, self.max_port) + + self._str_representation = label + + return self._str_representation
class ExitPolicyLine: """ Single rule from the user's exit policy. These are chained together to form complete policies. """ - + def __init__(self, rule_entry): """ Exit Policy line constructor. """ + # sanitize the input a bit, cleaning up tabs and stripping quotes rule_entry = rule_entry.replace("\t", " ").replace(""", "")
@@ -83,8 +364,10 @@ class ExitPolicyLine:
# constructs the binary address just in case of comparison with a mask if self.ip_address != "*": - if not (stem.util.connection.is_valid_ip_address(self.ip_address) and stem.util.connection.is_valid_ipv6_address(self.ip_address)): - raise ExitPolicyError + if not stem.util.connection.is_valid_ip_address(self.ip_address) and \ + not stem.util.connection.is_valid_ipv6_address(self.ip_address): + raise ExitPolicyError() + self.ip_address_bin = "" for octet in self.ip_address.split("."): # Converts the int to a binary string, padded with zeros. Source: @@ -92,7 +375,7 @@ class ExitPolicyLine: self.ip_address_bin += "".join([str((int(octet) >> y) & 1) for y in range(7, -1, -1)]) else: self.ip_address_bin = "0" * 32 - + # sets the port component self.min_port, self.max_port = 0, 0 self.is_port_wildcard = entry_port == "*" @@ -109,7 +392,7 @@ class ExitPolicyLine: raise ExitPolicyError self.min_port = int(entry_port) self.max_port = int(entry_port) - + def __str__(self): # This provides the actual policy rather than the entry used to construct # it so the 'private' keyword is expanded. diff --git a/stem/util/connection.py b/stem/util/connection.py index a70389e..e1e8890 100644 --- a/stem/util/connection.py +++ b/stem/util/connection.py @@ -7,7 +7,7 @@ but for now just moving the parts we need. ::
is_valid_ip_address - checks if a string is a valid IPv4 address - is_valid_ip_ipv6_address - checks if a string is a valid IPv6 address + is_valid_ipv6_address - checks if a string is a valid IPv6 address is_valid_port - checks if something is a valid representation for a port expand_ipv6_address - provides an IPv6 address with its collapsed portions expanded get_mask - provides the mask representation for a given number of bits @@ -24,6 +24,9 @@ import hashlib
CRYPTOVARIABLE_EQUALITY_COMPARISON_NONCE = os.urandom(32)
+FULL_IPv4_MASK = "255.255.255.255" +FULL_IPv6_MASK = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF" + def is_valid_ip_address(address): """ Checks if a string is a valid IPv4 address. @@ -45,15 +48,20 @@ def is_valid_ip_address(address):
return True
-def is_valid_ipv6_address(address): +def is_valid_ipv6_address(address, allow_brackets = False): """ Checks if a string is a valid IPv6 address.
:param str address: string to be checked + :param bool allow_brackets: ignore brackets which form '[address]'
:returns: True if input is a valid IPv6 address, False otherwise """
+ if allow_brackets: + if address.startswith("[") and address.endswith("]"): + address = address[1:-1] + # addresses are made up of eight colon separated groups of four hex digits # with leading zeros being optional # https://en.wikipedia.org/wiki/IPv6#Address_format @@ -85,9 +93,10 @@ def is_valid_port(entry, allow_zero = False):
if isinstance(entry, list): for port in entry: - if not is_valid_port(port): + if not is_valid_port(port, allow_zero): return False - + + return True elif isinstance(entry, str): if not entry.isdigit(): return False diff --git a/test/unit/exit_policy.py b/test/unit/exit_policy.py deleted file mode 100644 index b12cd9a..0000000 --- a/test/unit/exit_policy.py +++ /dev/null @@ -1,110 +0,0 @@ -""" -Unit tests for the stem.exit_policy.ExitPolicy parsing and class. -""" - -import unittest -import stem.exit_policy -import stem.util.system - -import test.mocking as mocking - -class TestExitPolicy(unittest.TestCase): - def tearDown(self): - pass - - def test_parsing(self): - """ - Tests parsing by the ExitPolicy class constructor. - """ - - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies.add("accept *:80") - exit_policies.add("accept *:443") - exit_policies.add("reject *:*") - self.assertEqual(str(exit_policies), "accept *:80, accept *:443, reject *:*") - - exit_policies = stem.exit_policy.ExitPolicy() - - # check ip address - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 256.255.255.255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -10.255.255.255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.-10.255.255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.-10.255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255.255.-10:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept a.b.c.d:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -:80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept :80") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept ...:80") - - # check input string - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "foo 255.255.255.255:80") - - # check ports - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0001") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:-1") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1-1") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:a") - self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:70000") - - def test_check(self): - """ - Tests if exiting to this ip is allowed. - """ - - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies.add("accept *:80") - exit_policies.add("accept *:443") - exit_policies.add("reject *:*") - - self.assertTrue(exit_policies.check("www.google.com", 80)) - self.assertTrue(exit_policies.check("www.atagar.com", 443)) - - self.assertFalse(exit_policies.check("www.atagar.com", 22)) - self.assertFalse(exit_policies.check("www.atagar.com", 8118)) - - def test_is_exiting_allowed(self): - """ - Tests if this is an exit node - """ - - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies.add("accept *:80") - exit_policies.add("accept *:443") - exit_policies.add("reject *:*") - - self.assertTrue(exit_policies.is_exiting_allowed()) - - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies = stem.exit_policy.ExitPolicy() - exit_policies.add("reject *:*") - - self.assertFalse(exit_policies.is_exiting_allowed()) - - def test_microdesc_exit_parsing(self): - microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443") - - self.assertEqual(str(microdesc_exit_policy),"accept 80,443") - - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,-443") - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,+443") - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,66666") - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "reject 80,foo") - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80,foo") - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "foo") - self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80-foo") - - def test_micodesc_exit_check(self): - microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443") - - self.assertTrue(microdesc_exit_policy.check(80)) - self.assertTrue(microdesc_exit_policy.check("www.atagar.com", 443)) - - self.assertFalse(microdesc_exit_policy.check(22)) - self.assertFalse(microdesc_exit_policy.check("www.atagar.com", 8118)) diff --git a/test/unit/exit_policy/__init__.py b/test/unit/exit_policy/__init__.py new file mode 100644 index 0000000..99a4651 --- /dev/null +++ b/test/unit/exit_policy/__init__.py @@ -0,0 +1,6 @@ +""" +Unit tests for stem.exit_policy.py contents. +""" + +__all__ = ["policy", "rule"] + diff --git a/test/unit/exit_policy/policy.py b/test/unit/exit_policy/policy.py new file mode 100644 index 0000000..10088e2 --- /dev/null +++ b/test/unit/exit_policy/policy.py @@ -0,0 +1,107 @@ +""" +Unit tests for the stem.exit_policy.ExitPolicy parsing and class. +""" + +import unittest +import stem.exit_policy +import stem.util.system + +import test.mocking as mocking + +class TestExitPolicy(unittest.TestCase): + def test_parsing(self): + """ + Tests parsing by the ExitPolicy class constructor. + """ + + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies.add("accept *:80") + exit_policies.add("accept *:443") + exit_policies.add("reject *:*") + self.assertEqual(str(exit_policies), "accept *:80, accept *:443, reject *:*") + + exit_policies = stem.exit_policy.ExitPolicy() + + # check ip address + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 256.255.255.255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -10.255.255.255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.-10.255.255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.-10.255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255.255.-10:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept a.b.c.d:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -:80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept :80") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept ...:80") + + # check input string + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "foo 255.255.255.255:80") + + # check ports + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0001") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:-1") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1-1") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:a") + self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:70000") + + def test_check(self): + """ + Tests if exiting to this ip is allowed. + """ + + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies.add("accept *:80") + exit_policies.add("accept *:443") + exit_policies.add("reject *:*") + + self.assertTrue(exit_policies.check("www.google.com", 80)) + self.assertTrue(exit_policies.check("www.atagar.com", 443)) + + self.assertFalse(exit_policies.check("www.atagar.com", 22)) + self.assertFalse(exit_policies.check("www.atagar.com", 8118)) + + def test_is_exiting_allowed(self): + """ + Tests if this is an exit node + """ + + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies.add("accept *:80") + exit_policies.add("accept *:443") + exit_policies.add("reject *:*") + + self.assertTrue(exit_policies.is_exiting_allowed()) + + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies = stem.exit_policy.ExitPolicy() + exit_policies.add("reject *:*") + + self.assertFalse(exit_policies.is_exiting_allowed()) + + def test_microdesc_exit_parsing(self): + microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443") + + self.assertEqual(str(microdesc_exit_policy),"accept 80,443") + + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,-443") + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,+443") + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,66666") + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "reject 80,foo") + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80,foo") + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "foo") + self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80-foo") + + def test_micodesc_exit_check(self): + microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443") + + self.assertTrue(microdesc_exit_policy.check(80)) + self.assertTrue(microdesc_exit_policy.check("www.atagar.com", 443)) + + self.assertFalse(microdesc_exit_policy.check(22)) + self.assertFalse(microdesc_exit_policy.check("www.atagar.com", 8118)) diff --git a/test/unit/exit_policy/rule.py b/test/unit/exit_policy/rule.py new file mode 100644 index 0000000..13a414d --- /dev/null +++ b/test/unit/exit_policy/rule.py @@ -0,0 +1,327 @@ +""" +Unit tests for the stem.exit_policy.ExitPolicyRule class. +""" + +import unittest + +from stem.exit_policy import AddressType, ExitPolicyRule + +class TestExitPolicyRule(unittest.TestCase): + def test_accept_or_reject(self): + self.assertTrue(ExitPolicyRule("accept *:*").is_accept) + self.assertFalse(ExitPolicyRule("reject *:*").is_accept) + + invalid_inputs = ( + "accept", + "reject", + "accept *:*", + "accept\t*:*", + "accept\n*:*", + "acceptt *:*", + "rejectt *:*", + "blarg *:*", + " *:*", + "*:*", + "", + ) + + for rule_arg in invalid_inputs: + self.assertRaises(ValueError, ExitPolicyRule, rule_arg) + + def test_str_unchanged(self): + # provides a series of test inputs where the str() representation should + # match the input rule + + test_inputs = ( + "accept *:*", + "reject *:*", + "accept *:80", + "accept *:80-443", + "accept 127.0.0.1:80", + "accept 87.0.0.1/24:80", + "accept 156.5.38.3/255.255.0.255:80", + "accept [FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]:80", + "accept [FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]/32:80", + ) + + for rule_arg in test_inputs: + rule = ExitPolicyRule(rule_arg) + self.assertEquals(rule_arg, rule.rule) + self.assertEquals(rule_arg, str(rule)) + + def test_str_changed(self): + # some instances where our rule is valid but won't match our str() representation + test_inputs = { + "accept 10.0.0.1/32:80": "accept 10.0.0.1:80", + "accept 192.168.0.1/255.255.255.0:80": "accept 192.168.0.1/24:80", + "accept [::]/32:*": "accept [0000:0000:0000:0000:0000:0000:0000:0000]/32:*", + "accept [::]/128:*": "accept [0000:0000:0000:0000:0000:0000:0000:0000]:*", + } + + for rule_arg, expected_str in test_inputs.items(): + rule = ExitPolicyRule(rule_arg) + self.assertEquals(rule_arg, rule.rule) + self.assertEquals(expected_str, str(rule)) + + def test_valid_wildcard(self): + test_inputs = { + "reject *:*": (True, True), + "reject *:80": (True, False), + "accept 192.168.0.1:*": (False, True), + "accept 192.168.0.1:80": (False, False), + + "reject 127.0.0.1/0:*": (True, True), + "reject 127.0.0.1/16:*": (False, True), + "reject 127.0.0.1/32:*": (False, True), + "reject [0000:0000:0000:0000:0000:0000:0000:0000]/0:80": (True, False), + "reject [0000:0000:0000:0000:0000:0000:0000:0000]/64:80": (False, False), + "reject [0000:0000:0000:0000:0000:0000:0000:0000]/128:80": (False, False), + + "accept 192.168.0.1:0-65535": (False, True), + "accept 192.168.0.1:1-65535": (False, True), + "accept 192.168.0.1:2-65535": (False, False), + "accept 192.168.0.1:1-65534": (False, False), + } + + for rule_arg, attr in test_inputs.items(): + is_address_wildcard, is_port_wildcard = attr + + rule = ExitPolicyRule(rule_arg) + self.assertEquals(is_address_wildcard, rule.is_address_wildcard()) + self.assertEquals(is_port_wildcard, rule.is_port_wildcard()) + + def test_invalid_wildcard(self): + test_inputs = ( + "reject */16:*", + "reject 127.0.0.1/*:*", + "reject *:0-*", + "reject *:*-15", + ) + + for rule_arg in test_inputs: + self.assertRaises(ValueError, ExitPolicyRule, rule_arg) + + def test_wildcard_attributes(self): + rule = ExitPolicyRule("reject *:*") + self.assertEquals(AddressType.WILDCARD, rule.address_type) + self.assertEquals(None, rule.address) + self.assertEquals(None, rule.mask) + self.assertEquals(None, rule.masked_bits) + self.assertEquals(1, rule.min_port) + self.assertEquals(65535, rule.max_port) + + def test_valid_ipv4_addresses(self): + test_inputs = { + "0.0.0.0": ("0.0.0.0", "255.255.255.255", 32), + "127.0.0.1/32": ("127.0.0.1", "255.255.255.255", 32), + "192.168.0.50/24": ("192.168.0.50", "255.255.255.0", 24), + "255.255.255.255/0": ("255.255.255.255", "0.0.0.0", 0), + } + + for rule_addr, attr in test_inputs.items(): + address, mask, masked_bits = attr + + rule = ExitPolicyRule("accept %s:*" % rule_addr) + self.assertEquals(AddressType.IPv4, rule.address_type) + self.assertEquals(address, rule.address) + self.assertEquals(mask, rule.mask) + self.assertEquals(masked_bits, rule.masked_bits) + + def test_invalid_ipv4_addresses(self): + test_inputs = { + "256.0.0.0", + "-1.0.0.0", + "0.0.0", + "0.0.0.", + "0.0.0.a", + "127.0.0.1/-1", + "127.0.0.1/33", + } + + for rule_addr in test_inputs: + self.assertRaises(ValueError, ExitPolicyRule, "accept %s:*" % rule_addr) + + def test_valid_ipv6_addresses(self): + test_inputs = { + "[fe80:0000:0000:0000:0202:b3ff:fe1e:8329]": + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", + "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 128), + "[FE80::0202:b3ff:fe1e:8329]": + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", + "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 128), + "[0000:0000:0000:0000:0000:0000:0000:0000]/0": + ("0000:0000:0000:0000:0000:0000:0000:0000", + "0000:0000:0000:0000:0000:0000:0000:0000", 0), + "[::]": + ("0000:0000:0000:0000:0000:0000:0000:0000", + "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 128), + } + + for rule_addr, attr in test_inputs.items(): + address, mask, masked_bits = attr + + rule = ExitPolicyRule("accept %s:*" % rule_addr) + self.assertEquals(AddressType.IPv6, rule.address_type) + self.assertEquals(address, rule.address) + self.assertEquals(mask, rule.mask) + self.assertEquals(masked_bits, rule.masked_bits) + + def test_invalid_ipv6_addresses(self): + test_inputs = ( + "fe80::0202:b3ff:fe1e:8329", + "[fe80::0202:b3ff:fe1e:8329", + "fe80::0202:b3ff:fe1e:8329]", + "[fe80::0202:b3ff:fe1e:832g]", + "[fe80:::b3ff:fe1e:8329]", + "[fe80::b3ff::fe1e:8329]", + "[fe80::0202:b3ff:fe1e:8329]/-1", + "[fe80::0202:b3ff:fe1e:8329]/129", + ) + + for rule_addr in test_inputs: + self.assertRaises(ValueError, ExitPolicyRule, "accept %s:*" % rule_addr) + + def test_valid_ports(self): + test_inputs = { + "0": (0, 0), + "1": (1, 1), + "80": (80, 80), + "80-443": (80, 443), + } + + for rule_port, attr in test_inputs.items(): + min_port, max_port = attr + + rule = ExitPolicyRule("accept 127.0.0.1:%s" % rule_port) + self.assertEquals(min_port, rule.min_port) + self.assertEquals(max_port, rule.max_port) + + def test_invalid_ports(self): + test_inputs = ( + "65536", + "a", + "5-3", + "5-", + "-3", + ) + + for rule_port in test_inputs: + self.assertRaises(ValueError, ExitPolicyRule, "accept 127.0.0.1:%s" % rule_port) + + def test_is_match_wildcard(self): + test_inputs = { + "reject *:*": { + ("192.168.0.1", 80): True, + ("0.0.0.0", 80): True, + ("255.255.255.255", 80): True, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): True, + ("[FE80:0000:0000:0000:0202:B3FF:FE1E:8329]", 80): True, + ("192.168.0.1", None): True, + (None, 80): True, + (None, None): True, + }, + "reject 255.255.255.255/0:*": { + ("192.168.0.1", 80): True, + ("0.0.0.0", 80): True, + ("255.255.255.255", 80): True, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): False, + ("[FE80:0000:0000:0000:0202:B3FF:FE1E:8329]", 80): False, + ("192.168.0.1", None): True, + (None, 80): False, + (None, None): False, + }, + } + + for rule_arg, matches in test_inputs.items(): + rule = ExitPolicyRule(rule_arg) + + for match_args, expected_result in matches.items(): + self.assertEquals(expected_result, rule.is_match(*match_args)) + + # port zero is special in that exit policies can include it, but it's not + # something that we can match against + + rule = ExitPolicyRule("reject *:*") + self.assertRaises(ValueError, rule.is_match, "127.0.0.1", 0) + + def test_is_match_ipv4(self): + test_inputs = { + "reject 192.168.0.50:*": { + ("192.168.0.50", 80): True, + ("192.168.0.51", 80): False, + ("192.168.0.49", 80): False, + (None, 80): False, + ("192.168.0.50", None): True, + }, + "reject 0.0.0.0/24:*": { + ("0.0.0.0", 80): True, + ("0.0.0.1", 80): True, + ("0.0.0.255", 80): True, + ("0.0.1.0", 80): False, + ("0.1.0.0", 80): False, + ("1.0.0.0", 80): False, + (None, 80): False, + ("0.0.0.0", None): True, + }, + } + + for rule_arg, matches in test_inputs.items(): + rule = ExitPolicyRule(rule_arg) + + for match_args, expected_result in matches.items(): + self.assertEquals(expected_result, rule.is_match(*match_args)) + + def test_is_match_ipv6(self): + test_inputs = { + "reject [FE80:0000:0000:0000:0202:B3FF:FE1E:8329]:*": { + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): True, + ("fe80:0000:0000:0000:0202:b3ff:fe1e:8329", 80): True, + ("[FE80:0000:0000:0000:0202:B3FF:FE1E:8329]", 80): True, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8330", 80): False, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8328", 80): False, + (None, 80): False, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", None): True, + }, + "reject [FE80:0000:0000:0000:0202:B3FF:FE1E:8329]/112:*": { + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): True, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:0000", 80): True, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:FFFF", 80): True, + ("FE80:0000:0000:0000:0202:B3FF:FE1F:8329", 80): False, + ("FE81:0000:0000:0000:0202:B3FF:FE1E:8329", 80): False, + (None, 80): False, + ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", None): True, + }, + } + + for rule_arg, matches in test_inputs.items(): + rule = ExitPolicyRule(rule_arg) + + for match_args, expected_result in matches.items(): + self.assertEquals(expected_result, rule.is_match(*match_args)) + + def test_is_match_port(self): + test_inputs = { + "reject *:80": { + ("192.168.0.50", 80): True, + ("192.168.0.50", 81): False, + ("192.168.0.50", 79): False, + (None, 80): True, + ("192.168.0.50", None): False, + }, + "reject *:80-85": { + ("192.168.0.50", 79): False, + ("192.168.0.50", 80): True, + ("192.168.0.50", 83): True, + ("192.168.0.50", 85): True, + ("192.168.0.50", 86): False, + (None, 83): True, + ("192.168.0.50", None): False, + }, + } + + for rule_arg, matches in test_inputs.items(): + rule = ExitPolicyRule(rule_arg) + + for match_args, expected_result in matches.items(): + self.assertEquals(expected_result, rule.is_match(*match_args)) +