[tor-commits] [stem/master] Rewriting authentication integ tests

atagar at torproject.org atagar at torproject.org
Sun Jan 29 08:54:42 UTC 2012


commit a73be509009942b4b2711b49a3b30e3c3f6e609b
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Jan 28 17:06:36 2012 -0800

    Rewriting authentication integ tests
    
    Excessive testing helper functions are bad. They hurt test readability and
    makes the code a pita to trace through. This cleans up the authentication
    integration tests to collapse the helper functions into just the few that make
    life better. Much more maintainable now. \o/
---
 stem/socket.py                          |    6 +
 test/integ/connection/authentication.py |  276 ++++++++++++-------------------
 2 files changed, 116 insertions(+), 166 deletions(-)

diff --git a/stem/socket.py b/stem/socket.py
index f3aeaf8..889713d 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -217,6 +217,12 @@ class ControlSocket:
     self._send_cond.release()
     self._recv_cond.release()
   
+  def __enter__(self):
+    return self
+  
+  def __exit__(self, type, value, traceback):
+    self.close()
+  
   def _make_socket(self):
     """
     Constructs and connects new socket. This is implemented by subclasses.
diff --git a/test/integ/connection/authentication.py b/test/integ/connection/authentication.py
index bc94ead..cc36c66 100644
--- a/test/integ/connection/authentication.py
+++ b/test/integ/connection/authentication.py
@@ -5,7 +5,6 @@ stem.connection.authenticate* functions.
 
 import os
 import unittest
-import functools
 
 import test.runner
 import stem.connection
@@ -23,6 +22,60 @@ MULTIPLE_AUTH_FAIL = "Authentication failed: Password did not match HashedContro
 INCORRECT_COOKIE_FAIL = "Authentication failed: Authentication cookie did not match expected value."
 INCORRECT_PASSWORD_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration"
 
+def _can_authenticate(auth_type):
+  """
+  Checks if a given authentication method can authenticate to our control
+  socket.
+  
+  Arguments:
+    auth_type (stem.connection.AuthMethod) - authentication method to check
+  
+  Returns:
+    bool that's True if we should be able to authenticate and False otherwise
+  """
+  
+  tor_options = test.runner.get_runner().get_options()
+  password_auth = test.runner.Torrc.PASSWORD in tor_options
+  cookie_auth = test.runner.Torrc.COOKIE in tor_options
+  
+  if not password_auth and not cookie_auth: return True # open socket
+  elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth
+  elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth
+  else: return False
+
+def _get_auth_failure_message(auth_type):
+  """
+  Provides the message that tor will respond with if our current method of
+  authentication fails. Note that this test will need to be updated if tor
+  changes its rejection reponse.
+  
+  Arguments:
+    auth_type (stem.connection.AuthMethod) - authentication method to check
+  
+  Returns:
+    string with the rejection message that tor would provide
+  """
+  
+  tor_options = test.runner.get_runner().get_options()
+  password_auth = test.runner.Torrc.PASSWORD in tor_options
+  cookie_auth = test.runner.Torrc.COOKIE in tor_options
+  
+  if cookie_auth and password_auth:
+    return MULTIPLE_AUTH_FAIL
+  elif cookie_auth:
+    if auth_type == stem.connection.AuthMethod.COOKIE:
+      return INCORRECT_COOKIE_FAIL
+    else:
+      return COOKIE_AUTH_FAIL
+  elif password_auth:
+    if auth_type == stem.connection.AuthMethod.PASSWORD:
+      return INCORRECT_PASSWORD_FAIL
+    else:
+      return PASSWORD_AUTH_FAIL
+  else:
+    # shouldn't happen, if so then the test has a bug
+    raise ValueError("No methods of authentication. If this is an open socket then auth shoulnd't fail.")
+
 class TestAuthenticate(unittest.TestCase):
   def setUp(self):
     # none of these tests apply if there's no control connection
@@ -34,10 +87,9 @@ class TestAuthenticate(unittest.TestCase):
     Tests that the authenticate function can authenticate to our socket.
     """
     
-    control_socket = test.runner.get_runner().get_tor_socket(False)
-    stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
-    test.runner.exercise_socket(self, control_socket)
-    control_socket.close()
+    with test.runner.get_runner().get_tor_socket(False) as control_socket:
+      stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
+      test.runner.exercise_socket(self, control_socket)
   
   def test_authenticate_general_example(self):
     """
@@ -87,34 +139,25 @@ class TestAuthenticate(unittest.TestCase):
     is_password_only = test.runner.Torrc.PASSWORD in tor_options and not test.runner.Torrc.COOKIE in tor_options
     
     # tests without a password
-    control_socket = runner.get_tor_socket(False)
-    auth_function = functools.partial(stem.connection.authenticate, control_socket)
-    
-    if is_password_only:
-      self.assertRaises(stem.connection.MissingPassword, auth_function)
-    else:
-      auth_function()
-      test.runner.exercise_socket(self, control_socket)
-    
-    control_socket.close()
+    with runner.get_tor_socket(False) as control_socket:
+      if is_password_only:
+        self.assertRaises(stem.connection.MissingPassword, stem.connection.authenticate, control_socket)
+      else:
+        stem.connection.authenticate(control_socket)
+        test.runner.exercise_socket(self, control_socket)
     
     # tests with the incorrect password
-    control_socket = runner.get_tor_socket(False)
-    auth_function = functools.partial(stem.connection.authenticate, control_socket, "blarg")
-    
-    if is_password_only:
-      self.assertRaises(stem.connection.IncorrectPassword, auth_function)
-    else:
-      auth_function()
-      test.runner.exercise_socket(self, control_socket)
-    
-    control_socket.close()
+    with runner.get_tor_socket(False) as control_socket:
+      if is_password_only:
+        self.assertRaises(stem.connection.IncorrectPassword, stem.connection.authenticate, control_socket, "blarg")
+      else:
+        stem.connection.authenticate(control_socket, "blarg")
+        test.runner.exercise_socket(self, control_socket)
     
     # tests with the right password
-    control_socket = runner.get_tor_socket(False)
-    stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
-    test.runner.exercise_socket(self, control_socket)
-    control_socket.close()
+    with runner.get_tor_socket(False) as control_socket:
+      stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD)
+      test.runner.exercise_socket(self, control_socket)
   
   def test_authenticate_none(self):
     """
@@ -122,11 +165,11 @@ class TestAuthenticate(unittest.TestCase):
     """
     
     auth_type = stem.connection.AuthMethod.NONE
-    if self._can_authenticate(auth_type):
+    
+    if _can_authenticate(auth_type):
       self._check_auth(auth_type)
     else:
       self.assertRaises(stem.connection.OpenAuthRejected, self._check_auth, auth_type)
-      self._assert_auth_rejected_msg(auth_type)
   
   def test_authenticate_password(self):
     """
@@ -136,26 +179,24 @@ class TestAuthenticate(unittest.TestCase):
     auth_type = stem.connection.AuthMethod.PASSWORD
     auth_value = test.runner.CONTROL_PASSWORD
     
-    if self._can_authenticate(auth_type):
+    if _can_authenticate(auth_type):
       self._check_auth(auth_type, auth_value)
     else:
       self.assertRaises(stem.connection.PasswordAuthRejected, self._check_auth, auth_type, auth_value)
-      self._assert_auth_rejected_msg(auth_type, auth_value)
     
     # Check with an empty, invalid, and quoted password. These should work if
     # we have no authentication, and fail otherwise.
     
     for auth_value in ("", "blarg", "this has a \" in it"):
-      if self._can_authenticate(stem.connection.AuthMethod.NONE):
+      if _can_authenticate(stem.connection.AuthMethod.NONE):
         self._check_auth(auth_type, auth_value)
       else:
-        if self._can_authenticate(stem.connection.AuthMethod.PASSWORD):
+        if _can_authenticate(stem.connection.AuthMethod.PASSWORD):
           exc_type = stem.connection.IncorrectPassword
         else:
           exc_type = stem.connection.PasswordAuthRejected
         
         self.assertRaises(exc_type, self._check_auth, auth_type, auth_value)
-        self._assert_auth_rejected_msg(auth_type, auth_value)
   
   def test_authenticate_cookie(self):
     """
@@ -173,11 +214,10 @@ class TestAuthenticate(unittest.TestCase):
       # missing file.
       
       self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value)
-    elif self._can_authenticate(auth_type):
+    elif _can_authenticate(auth_type):
       self._check_auth(auth_type, auth_value)
     else:
       self.assertRaises(stem.connection.CookieAuthRejected, self._check_auth, auth_type, auth_value)
-      self._assert_auth_rejected_msg(auth_type, auth_value)
   
   def test_authenticate_cookie_invalid(self):
     """
@@ -193,17 +233,16 @@ class TestAuthenticate(unittest.TestCase):
     fake_cookie.write("0" * 32)
     fake_cookie.close()
     
-    if self._can_authenticate(stem.connection.AuthMethod.NONE):
+    if _can_authenticate(stem.connection.AuthMethod.NONE):
       # authentication will work anyway
       self._check_auth(auth_type, auth_value)
     else:
-      if self._can_authenticate(auth_type):
+      if _can_authenticate(auth_type):
         exc_type = stem.connection.IncorrectCookieValue
       else:
         exc_type = stem.connection.CookieAuthRejected
       
       self.assertRaises(exc_type, self._check_auth, auth_type, auth_value)
-      self._assert_auth_rejected_msg(auth_type, auth_value)
     
     os.remove(auth_value)
   
@@ -215,7 +254,7 @@ class TestAuthenticate(unittest.TestCase):
     
     auth_type = stem.connection.AuthMethod.COOKIE
     auth_value = "/if/this/exists/then/they're/asking/for/a/failure"
-    self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value)
+    self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value, False)
   
   def test_authenticate_cookie_wrong_size(self):
     """
@@ -231,139 +270,44 @@ class TestAuthenticate(unittest.TestCase):
       # Weird coincidence? Fail so we can pick another file to check against.
       self.fail("Our torrc is 32 bytes, preventing the test_authenticate_cookie_wrong_size test from running.")
     else:
-      self.assertRaises(stem.connection.IncorrectCookieSize, self._check_auth, auth_type, auth_value)
-  
-  def _get_socket_auth(self):
-    """
-    Provides the types of authentication that our current test socket accepts.
-    
-    Returns:
-      bool tuple of the form (password_auth, cookie_auth)
-    """
-    
-    tor_options = test.runner.get_runner().get_options()
-    password_auth = test.runner.Torrc.PASSWORD in tor_options
-    cookie_auth = test.runner.Torrc.COOKIE in tor_options
-    
-    return password_auth, cookie_auth
-  
-  def _can_authenticate(self, auth_type):
-    """
-    Checks if the given authentication type should be able to authenticate to
-    our current socket.
-    
-    Arguments:
-      auth_type (stem.connection.AuthMethod) - authentication method to check
-    
-    Returns:
-      bool that's True if we should be able to authenticate and False otherwise
-    """
-    
-    password_auth, cookie_auth = self._get_socket_auth()
-    
-    # If the control socket is open then all authentication methods will be
-    # accepted. Otherwise check if our auth type matches what the socket
-    # accepts.
-    
-    if not password_auth and not cookie_auth: return True
-    elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth
-    elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth
-    else: return False
-  
-  def _get_auth_function(self, control_socket, auth_type, *auth_args):
-    """
-    Constructs a functor that performs the given authentication without
-    additional arguments.
-    
-    Arguments:
-      control_socket (stem.socket.ControlSocket) - socket for the function to
-          authenticate to
-      auth_type (stem.connection.AuthMethod) - method by which we should
-          authentiate to the control socket
-      auth_args (str) - arguments to be passed to the authentication function
-    """
-    
-    if auth_type == stem.connection.AuthMethod.NONE:
-      auth_function = stem.connection.authenticate_none
-    elif auth_type == stem.connection.AuthMethod.PASSWORD:
-      auth_function = stem.connection.authenticate_password
-    elif auth_type == stem.connection.AuthMethod.COOKIE:
-      auth_function = stem.connection.authenticate_cookie
-    else:
-      raise ValueError("unexpected auth type: %s" % auth_type)
-    
-    if auth_args:
-      return functools.partial(auth_function, control_socket, *auth_args)
-    else:
-      return functools.partial(auth_function, control_socket)
-  
-  def _assert_auth_rejected_msg(self, auth_type, *auth_args):
-    """
-    This asserts that authentication will fail with the rejection message given
-    by tor. Note that this test will need to be updated if tor changes its
-    rejection reponse.
-    
-    Arguments:
-      auth_type (stem.connection.AuthMethod) - method by which we should
-          authentiate to the control socket
-      auth_args (str) - arguments to be passed to the authentication function
-    """
-    
-    control_socket = test.runner.get_runner().get_tor_socket(False)
-    auth_function = self._get_auth_function(control_socket, auth_type, *auth_args)
-    password_auth, cookie_auth = self._get_socket_auth()
-    
-    if cookie_auth and password_auth:
-      failure_msg = MULTIPLE_AUTH_FAIL
-    elif cookie_auth:
-      if auth_type == stem.connection.AuthMethod.COOKIE:
-        failure_msg = INCORRECT_COOKIE_FAIL
-      else:
-        failure_msg = COOKIE_AUTH_FAIL
-    elif password_auth:
-      if auth_type == stem.connection.AuthMethod.PASSWORD:
-        failure_msg = INCORRECT_PASSWORD_FAIL
-      else:
-        failure_msg = PASSWORD_AUTH_FAIL
-    else:
-      # shouldn't happen, if so then the test has a bug
-      raise ValueError("No methods of authentication. If this is an open socket then auth shoulnd't fail.")
-    
-    try:
-      auth_function()
-      control_socket.close()
-      self.fail()
-    except stem.connection.AuthenticationFailure, exc:
-      self.assertTrue(control_socket.is_alive())
-      self.assertEqual(failure_msg, str(exc))
-      control_socket.close()
+      self.assertRaises(stem.connection.IncorrectCookieSize, self._check_auth, auth_type, auth_value, False)
   
-  def _check_auth(self, auth_type, *auth_args):
+  def _check_auth(self, auth_type, auth_arg = None, check_message = True):
     """
-    Attempts to use the given authentication function against our connection.
-    If this works then checks that we can use the connection. If not then this
-    raises the exception.
+    Attempts to use the given type of authentication against tor's control
+    socket. If it succeeds then we check that the socket can then be used. If
+    not then we check that this gives a message that we'd expect then raises
+    the exception.
     
     Arguments:
       auth_type (stem.connection.AuthMethod) - method by which we should
           authentiate to the control socket
-      auth_args (str) - arguments to be passed to the authentication function
+      auth_arg (str) - argument to be passed to the authentication function
+      check_message (bool) - checks that failure messages are what we'd expect
     
     Raises:
       stem.connection.AuthenticationFailure if the authentication fails
     """
     
-    control_socket = test.runner.get_runner().get_tor_socket(False)
-    auth_function = self._get_auth_function(control_socket, auth_type, *auth_args)
-    
-    # run the authentication, re-raising if there's a problem
-    try:
-      auth_function()
-    except stem.connection.AuthenticationFailure, exc:
-      self.assertTrue(control_socket.is_alive())
-      control_socket.close()
-      raise exc
-    
-    test.runner.exercise_socket(self, control_socket)
-    control_socket.close()
+    with test.runner.get_runner().get_tor_socket(False) as control_socket:
+      # run the authentication, re-raising if there's a problem
+      try:
+        if auth_type == stem.connection.AuthMethod.NONE:
+          stem.connection.authenticate_none(control_socket)
+        elif auth_type == stem.connection.AuthMethod.PASSWORD:
+          stem.connection.authenticate_password(control_socket, auth_arg)
+        elif auth_type == stem.connection.AuthMethod.COOKIE:
+          stem.connection.authenticate_cookie(control_socket, auth_arg)
+        
+        test.runner.exercise_socket(self, control_socket)
+      except stem.connection.AuthenticationFailure, exc:
+        # authentication functions should re-attach on failure
+        self.assertTrue(control_socket.is_alive())
+        
+        # check that we got the failure message that we'd expect
+        if check_message:
+          failure_msg = _get_auth_failure_message(auth_type)
+          self.assertEqual(failure_msg, str(exc))
+        
+        raise exc
 





More information about the tor-commits mailing list