[tor-commits] [stem/master] Helper function for resolution of inputs to fingerprints

atagar at torproject.org atagar at torproject.org
Tue May 6 01:21:13 UTC 2014


commit fb6962ed28463a56d100c9cac7cc7517879dec99
Author: Damian Johnson <atagar at torproject.org>
Date:   Sun May 4 19:14:35 2014 -0700

    Helper function for resolution of inputs to fingerprints
    
    Split the input resolution portion of do_info() into a _get_fingerprint()
    helper, which is much better for both readability and testability.
---
 stem/interpretor/commands.py      |  199 ++++++++++++++++++++-----------------
 stem/interpretor/help.py          |    2 +-
 test/unit/interpretor/commands.py |   64 +++++++++++-
 3 files changed, 170 insertions(+), 95 deletions(-)

diff --git a/stem/interpretor/commands.py b/stem/interpretor/commands.py
index 2a9d402..faf9509 100644
--- a/stem/interpretor/commands.py
+++ b/stem/interpretor/commands.py
@@ -13,6 +13,70 @@ from stem.interpretor import STANDARD_OUTPUT, BOLD_OUTPUT, ERROR_OUTPUT, msg
 from stem.util.term import format
 
 
+def _get_fingerprint(arg, controller):
+  """
+  Resolves user input into a relay fingerprint. This accepts...
+
+    * Fingerprints
+    * Nicknames
+    * IPv4 addresses, either with or without an ORPort
+    * Empty input, which is resolved to ourselves if we're a relay
+
+  :param str arg: input to be resolved to a relay fingerprint
+  :param stem.control.Controller controller: tor control connection
+
+  :returns: **str** for the relay fingerprint
+
+  :raises: **ValueError** if we're unable to resolve the input to a relay
+  """
+
+  if not arg:
+    try:
+      return controller.get_info('fingerprint')
+    except:
+      raise ValueError("We aren't a relay, no information to provide")
+  elif stem.util.tor_tools.is_valid_fingerprint(arg):
+    return arg
+  elif stem.util.tor_tools.is_valid_nickname(arg):
+    try:
+      return controller.get_network_status(arg).fingerprint
+    except:
+      raise ValueError("Unable to find a relay with the nickname of '%s'" % arg)
+  elif ':' in arg or stem.util.connection.is_valid_ipv4_address(arg):
+    if ':' in arg:
+      address, port = arg.split(':', 1)
+
+      if not stem.util.connection.is_valid_ipv4_address(address):
+        raise ValueError("'%s' isn't a valid IPv4 address" % address)
+      elif port and not stem.util.connection.is_valid_port(port):
+        raise ValueError("'%s' isn't a valid port" % port)
+
+      port = int(port)
+    else:
+      address, port = arg, None
+
+    matches = {}
+
+    for desc in controller.get_network_statuses():
+      if desc.address == address:
+        if not port or desc.or_port == port:
+          matches[desc.or_port] = desc.fingerprint
+
+    if len(matches) == 0:
+      raise ValueError('No relays found at %s' % arg)
+    elif len(matches) == 1:
+      return matches.values()[0]
+    else:
+      response = "There's multiple relays at %s, include a port to specify which.\n\n" % arg
+
+      for i, or_port in enumerate(matches):
+        response += "  %i. %s:%s, fingerprint: %s\n" % (i + 1, address, or_port, matches[or_port])
+
+      raise ValueError(response)
+  else:
+    raise ValueError("'%s' isn't a fingerprint, nickname, or IP address" % arg)
+
+
 class ControlInterpretor(object):
   """
   Handles issuing requests and providing nicely formed responses, with support
@@ -60,117 +124,66 @@ class ControlInterpretor(object):
     pretty fashion.
     """
 
-    output, fingerprint = '', None
+    try:
+      fingerprint = _get_fingerprint(arg, self._controller)
+    except ValueError as exc:
+      return format(str(exc), *ERROR_OUTPUT)
 
-    # determines the fingerprint, leaving it unset and adding an error message
-    # if unsuccessful
+    micro_desc = self._controller.get_microdescriptor(fingerprint, None)
+    server_desc = self._controller.get_server_descriptor(fingerprint, None)
+    ns_desc = self._controller.get_network_status(fingerprint, None)
 
-    if not arg:
-      # uses our fingerprint if we're a relay, otherwise gives an error
+    # We'll mostly rely on the router status entry. Either the server
+    # descriptor or microdescriptor will be missing, so we'll treat them as
+    # being optional.
 
-      fingerprint = self._controller.get_info('fingerprint', None)
+    if not ns_desc:
+      return format("Unable to find consensus information for %s" % fingerprint, *ERROR_OUTPUT)
 
-      if not fingerprint:
-        output += format("We aren't a relay, no information to provide", *ERROR_OUTPUT)
-    elif stem.util.tor_tools.is_valid_fingerprint(arg):
-      fingerprint = arg
-    elif stem.util.tor_tools.is_valid_nickname(arg):
-      desc = self._controller.get_network_status(arg, None)
+    locale = self._controller.get_info('ip-to-country/%s' % ns_desc.address, None)
+    locale_label = ' (%s)' % locale if locale else ''
 
-      if desc:
-        fingerprint = desc.fingerprint
-      else:
-        return format("Unable to find a relay with the nickname of '%s'" % arg, *ERROR_OUTPUT)
-    elif ':' in arg or stem.util.connection.is_valid_ipv4_address(arg):
-      # we got an address, so looking up the fingerprint
-
-      if ':' in arg:
-        address, port = arg.split(':', 1)
-
-        if not stem.util.connection.is_valid_ipv4_address(address):
-          return format("'%s' isn't a valid IPv4 address" % address, *ERROR_OUTPUT)
-        elif port and not stem.util.connection.is_valid_port(port):
-          return format("'%s' isn't a valid port" % port, *ERROR_OUTPUT)
-
-        port = int(port)
-      else:
-        address, port = arg, None
-
-      matches = {}
-
-      for desc in self._controller.get_network_statuses():
-        if desc.address == address:
-          if not port or desc.or_port == port:
-            matches[desc.or_port] = desc.fingerprint
-
-      if len(matches) == 0:
-        output += format('No relays found at %s' % arg, *ERROR_OUTPUT)
-      elif len(matches) == 1:
-        fingerprint = matches.values()[0]
-      else:
-        output += format("There's multiple relays at %s, include a port to specify which.\n\n" % arg, *ERROR_OUTPUT)
-
-        for i, or_port in enumerate(matches):
-          output += format("  %i. %s:%s, fingerprint: %s\n" % (i + 1, address, or_port, matches[or_port]), *ERROR_OUTPUT)
+    if server_desc:
+      exit_policy_label = server_desc.exit_policy.summary()
+    elif micro_desc:
+      exit_policy_label = micro_desc.exit_policy.summary()
     else:
-      return format("'%s' isn't a fingerprint, nickname, or IP address" % arg, *ERROR_OUTPUT)
-
-    if fingerprint:
-      micro_desc = self._controller.get_microdescriptor(fingerprint, None)
-      server_desc = self._controller.get_server_descriptor(fingerprint, None)
-      ns_desc = self._controller.get_network_status(fingerprint, None)
-
-      # We'll mostly rely on the router status entry. Either the server
-      # descriptor or microdescriptor will be missing, so we'll treat them as
-      # being optional.
-
-      if not ns_desc:
-        return format("Unable to find consensus information for %s" % fingerprint, *ERROR_OUTPUT)
-
-      locale = self._controller.get_info('ip-to-country/%s' % ns_desc.address, None)
-      locale_label = ' (%s)' % locale if locale else ''
-
-      if server_desc:
-        exit_policy_label = server_desc.exit_policy.summary()
-      elif micro_desc:
-        exit_policy_label = micro_desc.exit_policy.summary()
-      else:
-        exit_policy_label = 'Unknown'
+      exit_policy_label = 'Unknown'
 
-      output += '%s (%s)\n' % (ns_desc.nickname, fingerprint)
+    output = '%s (%s)\n' % (ns_desc.nickname, fingerprint)
 
-      output += format('address: ', *BOLD_OUTPUT)
-      output += '%s:%s%s\n' % (ns_desc.address, ns_desc.or_port, locale_label)
+    output += format('address: ', *BOLD_OUTPUT)
+    output += '%s:%s%s\n' % (ns_desc.address, ns_desc.or_port, locale_label)
 
-      output += format('published: ', *BOLD_OUTPUT)
-      output += ns_desc.published.strftime('%H:%M:%S %d/%m/%Y') + '\n'
+    output += format('published: ', *BOLD_OUTPUT)
+    output += ns_desc.published.strftime('%H:%M:%S %d/%m/%Y') + '\n'
 
-      if server_desc:
-        output += format('os: ', *BOLD_OUTPUT)
-        output += server_desc.platform.decode('utf-8', 'replace') + '\n'
+    if server_desc:
+      output += format('os: ', *BOLD_OUTPUT)
+      output += server_desc.platform.decode('utf-8', 'replace') + '\n'
 
-        output += format('version: ', *BOLD_OUTPUT)
-        output += str(server_desc.tor_version) + '\n'
+      output += format('version: ', *BOLD_OUTPUT)
+      output += str(server_desc.tor_version) + '\n'
 
-      output += format('flags: ', *BOLD_OUTPUT)
-      output += ', '.join(ns_desc.flags) + '\n'
+    output += format('flags: ', *BOLD_OUTPUT)
+    output += ', '.join(ns_desc.flags) + '\n'
 
-      output += format('exit policy: ', *BOLD_OUTPUT)
-      output += exit_policy_label + '\n'
+    output += format('exit policy: ', *BOLD_OUTPUT)
+    output += exit_policy_label + '\n'
 
-      if server_desc:
-        contact = server_desc.contact
+    if server_desc:
+      contact = server_desc.contact
 
-        # clears up some highly common obscuring
+      # clears up some highly common obscuring
 
-        for alias in (' at ', ' AT '):
-          contact = contact.replace(alias, '@')
+      for alias in (' at ', ' AT '):
+        contact = contact.replace(alias, '@')
 
-        for alias in (' dot ', ' DOT '):
-          contact = contact.replace(alias, '.')
+      for alias in (' dot ', ' DOT '):
+        contact = contact.replace(alias, '.')
 
-        output += format('contact: ', *BOLD_OUTPUT)
-        output += contact + '\n'
+      output += format('contact: ', *BOLD_OUTPUT)
+      output += contact + '\n'
 
     return output.strip()
 
diff --git a/stem/interpretor/help.py b/stem/interpretor/help.py
index 20439d6..039bbef 100644
--- a/stem/interpretor/help.py
+++ b/stem/interpretor/help.py
@@ -23,7 +23,7 @@ def response(controller, arg):
   """
   Provides our /help response.
 
-  :param stem.Controller controller: tor control connection
+  :param stem.control.Controller controller: tor control connection
   :param str arg: controller or interpretor command to provide help output for
 
   :returns: **str** with our help response
diff --git a/test/unit/interpretor/commands.py b/test/unit/interpretor/commands.py
index db0a53b..1517e3a 100644
--- a/test/unit/interpretor/commands.py
+++ b/test/unit/interpretor/commands.py
@@ -1,20 +1,82 @@
 import unittest
 
+import stem
 import stem.response
 
-from stem.interpretor.commands import ControlInterpretor
+from stem.interpretor.commands import ControlInterpretor, _get_fingerprint
 
 from test import mocking
 from test.unit.interpretor import CONTROLLER
 
+try:
+  # added in python 3.3
+  from unittest.mock import Mock
+except ImportError:
+  from mock import Mock
+
 EXPECTED_EVENTS_RESPONSE = """\
 \x1b[34mBW 15 25\x1b[0m
 \x1b[34mBW 758 570\x1b[0m
 \x1b[34mDEBUG connection_edge_process_relay_cell(): Got an extended cell! Yay.\x1b[0m
 """
 
+FINGERPRINT = '9695DFC35FFEB861329B9F1AB04C46397020CE31'
+
 
 class TestInterpretorCommands(unittest.TestCase):
+  def test_get_fingerprint_for_ourselves(self):
+    controller = Mock()
+
+    controller.get_info.side_effect = lambda arg: {
+      'fingerprint': FINGERPRINT,
+    }[arg]
+
+    self.assertEqual(FINGERPRINT, _get_fingerprint('', controller))
+
+    controller.get_info.side_effect = stem.ControllerError
+    self.assertRaises(ValueError, _get_fingerprint, '', controller)
+
+  def test_get_fingerprint_for_fingerprint(self):
+    self.assertEqual(FINGERPRINT, _get_fingerprint(FINGERPRINT, Mock()))
+
+  def test_get_fingerprint_for_nickname(self):
+    controller, descriptor = Mock(), Mock()
+    descriptor.fingerprint = FINGERPRINT
+
+    controller.get_network_status.side_effect = lambda arg: {
+      'moria1': descriptor,
+    }[arg]
+
+    self.assertEqual(FINGERPRINT, _get_fingerprint('moria1', controller))
+
+    controller.get_network_status.side_effect = stem.ControllerError
+    self.assertRaises(ValueError, _get_fingerprint, 'moria1', controller)
+
+  def test_get_fingerprint_for_address(self):
+    controller = Mock()
+
+    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:-1', controller)
+    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.901:80', controller)
+
+    descriptor = Mock()
+    descriptor.address = '127.0.0.1'
+    descriptor.or_port = 80
+    descriptor.fingerprint = FINGERPRINT
+
+    controller.get_network_statuses.return_value = [descriptor]
+
+    self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1', controller))
+    self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1:80', controller))
+    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:81', controller)
+    self.assertRaises(ValueError, _get_fingerprint, '127.0.0.2', controller)
+
+  def test_get_fingerprint_for_unrecognized_inputs(self):
+    self.assertRaises(ValueError, _get_fingerprint, 'blarg!', Mock())
+
+  def test_quit(self):
+    interpretor = ControlInterpretor(CONTROLLER)
+    self.assertRaises(stem.SocketClosed, interpretor.run_command, '/quit')
+
   def test_help(self):
     interpretor = ControlInterpretor(CONTROLLER)
 





More information about the tor-commits mailing list