commit fb6962ed28463a56d100c9cac7cc7517879dec99
Author: Damian Johnson <atagar@torproject.org>
Date:   Sun May 4 19:14:35 2014 -0700
Helper function for resolution of inputs to fingerprints
    Splitting a big chunk of do_info() into a helper. Much better for both
    readability and testability.
---
 stem/interpretor/commands.py      | 199 ++++++++++++++++++++-----------------
 stem/interpretor/help.py          |   2 +-
 test/unit/interpretor/commands.py |  64 +++++++++++-
 3 files changed, 170 insertions(+), 95 deletions(-)
diff --git a/stem/interpretor/commands.py b/stem/interpretor/commands.py index 2a9d402..faf9509 100644 --- a/stem/interpretor/commands.py +++ b/stem/interpretor/commands.py @@ -13,6 +13,70 @@ from stem.interpretor import STANDARD_OUTPUT, BOLD_OUTPUT, ERROR_OUTPUT, msg from stem.util.term import format
+def _get_fingerprint(arg, controller): + """ + Resolves user input into a relay fingerprint. This accepts... + + * Fingerprints + * Nicknames + * IPv4 addresses, either with or without an ORPort + * Empty input, which is resolved to ourselves if we're a relay + + :param str arg: input to be resolved to a relay fingerprint + :param stem.control.Controller controller: tor control connection + + :returns: **str** for the relay fingerprint + + :raises: **ValueError** if we're unable to resolve the input to a relay + """ + + if not arg: + try: + return controller.get_info('fingerprint') + except: + raise ValueError("We aren't a relay, no information to provide") + elif stem.util.tor_tools.is_valid_fingerprint(arg): + return arg + elif stem.util.tor_tools.is_valid_nickname(arg): + try: + return controller.get_network_status(arg).fingerprint + except: + raise ValueError("Unable to find a relay with the nickname of '%s'" % arg) + elif ':' in arg or stem.util.connection.is_valid_ipv4_address(arg): + if ':' in arg: + address, port = arg.split(':', 1) + + if not stem.util.connection.is_valid_ipv4_address(address): + raise ValueError("'%s' isn't a valid IPv4 address" % address) + elif port and not stem.util.connection.is_valid_port(port): + raise ValueError("'%s' isn't a valid port" % port) + + port = int(port) + else: + address, port = arg, None + + matches = {} + + for desc in controller.get_network_statuses(): + if desc.address == address: + if not port or desc.or_port == port: + matches[desc.or_port] = desc.fingerprint + + if len(matches) == 0: + raise ValueError('No relays found at %s' % arg) + elif len(matches) == 1: + return matches.values()[0] + else: + response = "There's multiple relays at %s, include a port to specify which.\n\n" % arg + + for i, or_port in enumerate(matches): + response += " %i. 
%s:%s, fingerprint: %s\n" % (i + 1, address, or_port, matches[or_port]) + + raise ValueError(response) + else: + raise ValueError("'%s' isn't a fingerprint, nickname, or IP address" % arg) + + class ControlInterpretor(object): """ Handles issuing requests and providing nicely formed responses, with support @@ -60,117 +124,66 @@ class ControlInterpretor(object): pretty fashion. """
- output, fingerprint = '', None + try: + fingerprint = _get_fingerprint(arg, self._controller) + except ValueError as exc: + return format(str(exc), *ERROR_OUTPUT)
- # determines the fingerprint, leaving it unset and adding an error message - # if unsuccessful + micro_desc = self._controller.get_microdescriptor(fingerprint, None) + server_desc = self._controller.get_server_descriptor(fingerprint, None) + ns_desc = self._controller.get_network_status(fingerprint, None)
- if not arg: - # uses our fingerprint if we're a relay, otherwise gives an error + # We'll mostly rely on the router status entry. Either the server + # descriptor or microdescriptor will be missing, so we'll treat them as + # being optional.
- fingerprint = self._controller.get_info('fingerprint', None) + if not ns_desc: + return format("Unable to find consensus information for %s" % fingerprint, *ERROR_OUTPUT)
- if not fingerprint: - output += format("We aren't a relay, no information to provide", *ERROR_OUTPUT) - elif stem.util.tor_tools.is_valid_fingerprint(arg): - fingerprint = arg - elif stem.util.tor_tools.is_valid_nickname(arg): - desc = self._controller.get_network_status(arg, None) + locale = self._controller.get_info('ip-to-country/%s' % ns_desc.address, None) + locale_label = ' (%s)' % locale if locale else ''
- if desc: - fingerprint = desc.fingerprint - else: - return format("Unable to find a relay with the nickname of '%s'" % arg, *ERROR_OUTPUT) - elif ':' in arg or stem.util.connection.is_valid_ipv4_address(arg): - # we got an address, so looking up the fingerprint - - if ':' in arg: - address, port = arg.split(':', 1) - - if not stem.util.connection.is_valid_ipv4_address(address): - return format("'%s' isn't a valid IPv4 address" % address, *ERROR_OUTPUT) - elif port and not stem.util.connection.is_valid_port(port): - return format("'%s' isn't a valid port" % port, *ERROR_OUTPUT) - - port = int(port) - else: - address, port = arg, None - - matches = {} - - for desc in self._controller.get_network_statuses(): - if desc.address == address: - if not port or desc.or_port == port: - matches[desc.or_port] = desc.fingerprint - - if len(matches) == 0: - output += format('No relays found at %s' % arg, *ERROR_OUTPUT) - elif len(matches) == 1: - fingerprint = matches.values()[0] - else: - output += format("There's multiple relays at %s, include a port to specify which.\n\n" % arg, *ERROR_OUTPUT) - - for i, or_port in enumerate(matches): - output += format(" %i. %s:%s, fingerprint: %s\n" % (i + 1, address, or_port, matches[or_port]), *ERROR_OUTPUT) + if server_desc: + exit_policy_label = server_desc.exit_policy.summary() + elif micro_desc: + exit_policy_label = micro_desc.exit_policy.summary() else: - return format("'%s' isn't a fingerprint, nickname, or IP address" % arg, *ERROR_OUTPUT) - - if fingerprint: - micro_desc = self._controller.get_microdescriptor(fingerprint, None) - server_desc = self._controller.get_server_descriptor(fingerprint, None) - ns_desc = self._controller.get_network_status(fingerprint, None) - - # We'll mostly rely on the router status entry. Either the server - # descriptor or microdescriptor will be missing, so we'll treat them as - # being optional. 
- - if not ns_desc: - return format("Unable to find consensus information for %s" % fingerprint, *ERROR_OUTPUT) - - locale = self._controller.get_info('ip-to-country/%s' % ns_desc.address, None) - locale_label = ' (%s)' % locale if locale else '' - - if server_desc: - exit_policy_label = server_desc.exit_policy.summary() - elif micro_desc: - exit_policy_label = micro_desc.exit_policy.summary() - else: - exit_policy_label = 'Unknown' + exit_policy_label = 'Unknown'
- output += '%s (%s)\n' % (ns_desc.nickname, fingerprint) + output = '%s (%s)\n' % (ns_desc.nickname, fingerprint)
- output += format('address: ', *BOLD_OUTPUT) - output += '%s:%s%s\n' % (ns_desc.address, ns_desc.or_port, locale_label) + output += format('address: ', *BOLD_OUTPUT) + output += '%s:%s%s\n' % (ns_desc.address, ns_desc.or_port, locale_label)
- output += format('published: ', *BOLD_OUTPUT) - output += ns_desc.published.strftime('%H:%M:%S %d/%m/%Y') + '\n' + output += format('published: ', *BOLD_OUTPUT) + output += ns_desc.published.strftime('%H:%M:%S %d/%m/%Y') + '\n'
- if server_desc: - output += format('os: ', *BOLD_OUTPUT) - output += server_desc.platform.decode('utf-8', 'replace') + '\n' + if server_desc: + output += format('os: ', *BOLD_OUTPUT) + output += server_desc.platform.decode('utf-8', 'replace') + '\n'
- output += format('version: ', *BOLD_OUTPUT) - output += str(server_desc.tor_version) + '\n' + output += format('version: ', *BOLD_OUTPUT) + output += str(server_desc.tor_version) + '\n'
- output += format('flags: ', *BOLD_OUTPUT) - output += ', '.join(ns_desc.flags) + '\n' + output += format('flags: ', *BOLD_OUTPUT) + output += ', '.join(ns_desc.flags) + '\n'
- output += format('exit policy: ', *BOLD_OUTPUT) - output += exit_policy_label + '\n' + output += format('exit policy: ', *BOLD_OUTPUT) + output += exit_policy_label + '\n'
- if server_desc: - contact = server_desc.contact + if server_desc: + contact = server_desc.contact
- # clears up some highly common obscuring + # clears up some highly common obscuring
- for alias in (' at ', ' AT '): - contact = contact.replace(alias, '@') + for alias in (' at ', ' AT '): + contact = contact.replace(alias, '@')
- for alias in (' dot ', ' DOT '): - contact = contact.replace(alias, '.') + for alias in (' dot ', ' DOT '): + contact = contact.replace(alias, '.')
- output += format('contact: ', *BOLD_OUTPUT) - output += contact + '\n' + output += format('contact: ', *BOLD_OUTPUT) + output += contact + '\n'
return output.strip()
diff --git a/stem/interpretor/help.py b/stem/interpretor/help.py index 20439d6..039bbef 100644 --- a/stem/interpretor/help.py +++ b/stem/interpretor/help.py @@ -23,7 +23,7 @@ def response(controller, arg): """ Provides our /help response.
- :param stem.Controller controller: tor control connection + :param stem.control.Controller controller: tor control connection :param str arg: controller or interpretor command to provide help output for
:returns: **str** with our help response diff --git a/test/unit/interpretor/commands.py b/test/unit/interpretor/commands.py index db0a53b..1517e3a 100644 --- a/test/unit/interpretor/commands.py +++ b/test/unit/interpretor/commands.py @@ -1,20 +1,82 @@ import unittest
+import stem import stem.response
-from stem.interpretor.commands import ControlInterpretor +from stem.interpretor.commands import ControlInterpretor, _get_fingerprint
from test import mocking from test.unit.interpretor import CONTROLLER
+try: + # added in python 3.3 + from unittest.mock import Mock +except ImportError: + from mock import Mock + EXPECTED_EVENTS_RESPONSE = """\ \x1b[34mBW 15 25\x1b[0m \x1b[34mBW 758 570\x1b[0m \x1b[34mDEBUG connection_edge_process_relay_cell(): Got an extended cell! Yay.\x1b[0m """
+FINGERPRINT = '9695DFC35FFEB861329B9F1AB04C46397020CE31' +
class TestInterpretorCommands(unittest.TestCase): + def test_get_fingerprint_for_ourselves(self): + controller = Mock() + + controller.get_info.side_effect = lambda arg: { + 'fingerprint': FINGERPRINT, + }[arg] + + self.assertEqual(FINGERPRINT, _get_fingerprint('', controller)) + + controller.get_info.side_effect = stem.ControllerError + self.assertRaises(ValueError, _get_fingerprint, '', controller) + + def test_get_fingerprint_for_fingerprint(self): + self.assertEqual(FINGERPRINT, _get_fingerprint(FINGERPRINT, Mock())) + + def test_get_fingerprint_for_nickname(self): + controller, descriptor = Mock(), Mock() + descriptor.fingerprint = FINGERPRINT + + controller.get_network_status.side_effect = lambda arg: { + 'moria1': descriptor, + }[arg] + + self.assertEqual(FINGERPRINT, _get_fingerprint('moria1', controller)) + + controller.get_network_status.side_effect = stem.ControllerError + self.assertRaises(ValueError, _get_fingerprint, 'moria1', controller) + + def test_get_fingerprint_for_address(self): + controller = Mock() + + self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:-1', controller) + self.assertRaises(ValueError, _get_fingerprint, '127.0.0.901:80', controller) + + descriptor = Mock() + descriptor.address = '127.0.0.1' + descriptor.or_port = 80 + descriptor.fingerprint = FINGERPRINT + + controller.get_network_statuses.return_value = [descriptor] + + self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1', controller)) + self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1:80', controller)) + self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:81', controller) + self.assertRaises(ValueError, _get_fingerprint, '127.0.0.2', controller) + + def test_get_fingerprint_for_unrecognized_inputs(self): + self.assertRaises(ValueError, _get_fingerprint, 'blarg!', Mock()) + + def test_quit(self): + interpretor = ControlInterpretor(CONTROLLER) + self.assertRaises(stem.SocketClosed, interpretor.run_command, '/quit') + def test_help(self): 
interpretor = ControlInterpretor(CONTROLLER)
tor-commits@lists.torproject.org