commit a73be509009942b4b2711b49a3b30e3c3f6e609b Author: Damian Johnson atagar@torproject.org Date: Sat Jan 28 17:06:36 2012 -0800
Rewriting authentication integ tests
Excessive testing helper functions are bad. They hurt test readability and makes the code a pita to trace through. This cleans up the authentication integration tests to collapse the helper functions into just the few that make life better. Much more maintainable now. \o/ --- stem/socket.py | 6 + test/integ/connection/authentication.py | 276 ++++++++++++------------------- 2 files changed, 116 insertions(+), 166 deletions(-)
diff --git a/stem/socket.py b/stem/socket.py index f3aeaf8..889713d 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -217,6 +217,12 @@ class ControlSocket: self._send_cond.release() self._recv_cond.release()
+ def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + def _make_socket(self): """ Constructs and connects new socket. This is implemented by subclasses. diff --git a/test/integ/connection/authentication.py b/test/integ/connection/authentication.py index bc94ead..cc36c66 100644 --- a/test/integ/connection/authentication.py +++ b/test/integ/connection/authentication.py @@ -5,7 +5,6 @@ stem.connection.authenticate* functions.
import os import unittest -import functools
import test.runner import stem.connection @@ -23,6 +22,60 @@ MULTIPLE_AUTH_FAIL = "Authentication failed: Password did not match HashedContro INCORRECT_COOKIE_FAIL = "Authentication failed: Authentication cookie did not match expected value." INCORRECT_PASSWORD_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration"
+def _can_authenticate(auth_type): + """ + Checks if a given authentication method can authenticate to our control + socket. + + Arguments: + auth_type (stem.connection.AuthMethod) - authentication method to check + + Returns: + bool that's True if we should be able to authenticate and False otherwise + """ + + tor_options = test.runner.get_runner().get_options() + password_auth = test.runner.Torrc.PASSWORD in tor_options + cookie_auth = test.runner.Torrc.COOKIE in tor_options + + if not password_auth and not cookie_auth: return True # open socket + elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth + elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth + else: return False + +def _get_auth_failure_message(auth_type): + """ + Provides the message that tor will respond with if our current method of + authentication fails. Note that this test will need to be updated if tor + changes its rejection reponse. + + Arguments: + auth_type (stem.connection.AuthMethod) - authentication method to check + + Returns: + string with the rejection message that tor would provide + """ + + tor_options = test.runner.get_runner().get_options() + password_auth = test.runner.Torrc.PASSWORD in tor_options + cookie_auth = test.runner.Torrc.COOKIE in tor_options + + if cookie_auth and password_auth: + return MULTIPLE_AUTH_FAIL + elif cookie_auth: + if auth_type == stem.connection.AuthMethod.COOKIE: + return INCORRECT_COOKIE_FAIL + else: + return COOKIE_AUTH_FAIL + elif password_auth: + if auth_type == stem.connection.AuthMethod.PASSWORD: + return INCORRECT_PASSWORD_FAIL + else: + return PASSWORD_AUTH_FAIL + else: + # shouldn't happen, if so then the test has a bug + raise ValueError("No methods of authentication. If this is an open socket then auth shoulnd't fail.") + class TestAuthenticate(unittest.TestCase): def setUp(self): # none of these tests apply if there's no control connection @@ -34,10 +87,9 @@ class TestAuthenticate(unittest.TestCase): Tests that the authenticate function can authenticate to our socket. """
- control_socket = test.runner.get_runner().get_tor_socket(False) - stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD) - test.runner.exercise_socket(self, control_socket) - control_socket.close() + with test.runner.get_runner().get_tor_socket(False) as control_socket: + stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD) + test.runner.exercise_socket(self, control_socket)
def test_authenticate_general_example(self): """ @@ -87,34 +139,25 @@ class TestAuthenticate(unittest.TestCase): is_password_only = test.runner.Torrc.PASSWORD in tor_options and not test.runner.Torrc.COOKIE in tor_options
# tests without a password - control_socket = runner.get_tor_socket(False) - auth_function = functools.partial(stem.connection.authenticate, control_socket) - - if is_password_only: - self.assertRaises(stem.connection.MissingPassword, auth_function) - else: - auth_function() - test.runner.exercise_socket(self, control_socket) - - control_socket.close() + with runner.get_tor_socket(False) as control_socket: + if is_password_only: + self.assertRaises(stem.connection.MissingPassword, stem.connection.authenticate, control_socket) + else: + stem.connection.authenticate(control_socket) + test.runner.exercise_socket(self, control_socket)
# tests with the incorrect password - control_socket = runner.get_tor_socket(False) - auth_function = functools.partial(stem.connection.authenticate, control_socket, "blarg") - - if is_password_only: - self.assertRaises(stem.connection.IncorrectPassword, auth_function) - else: - auth_function() - test.runner.exercise_socket(self, control_socket) - - control_socket.close() + with runner.get_tor_socket(False) as control_socket: + if is_password_only: + self.assertRaises(stem.connection.IncorrectPassword, stem.connection.authenticate, control_socket, "blarg") + else: + stem.connection.authenticate(control_socket, "blarg") + test.runner.exercise_socket(self, control_socket)
# tests with the right password - control_socket = runner.get_tor_socket(False) - stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD) - test.runner.exercise_socket(self, control_socket) - control_socket.close() + with runner.get_tor_socket(False) as control_socket: + stem.connection.authenticate(control_socket, test.runner.CONTROL_PASSWORD) + test.runner.exercise_socket(self, control_socket)
def test_authenticate_none(self): """ @@ -122,11 +165,11 @@ class TestAuthenticate(unittest.TestCase): """
auth_type = stem.connection.AuthMethod.NONE - if self._can_authenticate(auth_type): + + if _can_authenticate(auth_type): self._check_auth(auth_type) else: self.assertRaises(stem.connection.OpenAuthRejected, self._check_auth, auth_type) - self._assert_auth_rejected_msg(auth_type)
def test_authenticate_password(self): """ @@ -136,26 +179,24 @@ class TestAuthenticate(unittest.TestCase): auth_type = stem.connection.AuthMethod.PASSWORD auth_value = test.runner.CONTROL_PASSWORD
- if self._can_authenticate(auth_type): + if _can_authenticate(auth_type): self._check_auth(auth_type, auth_value) else: self.assertRaises(stem.connection.PasswordAuthRejected, self._check_auth, auth_type, auth_value) - self._assert_auth_rejected_msg(auth_type, auth_value)
# Check with an empty, invalid, and quoted password. These should work if # we have no authentication, and fail otherwise.
for auth_value in ("", "blarg", "this has a " in it"): - if self._can_authenticate(stem.connection.AuthMethod.NONE): + if _can_authenticate(stem.connection.AuthMethod.NONE): self._check_auth(auth_type, auth_value) else: - if self._can_authenticate(stem.connection.AuthMethod.PASSWORD): + if _can_authenticate(stem.connection.AuthMethod.PASSWORD): exc_type = stem.connection.IncorrectPassword else: exc_type = stem.connection.PasswordAuthRejected
self.assertRaises(exc_type, self._check_auth, auth_type, auth_value) - self._assert_auth_rejected_msg(auth_type, auth_value)
def test_authenticate_cookie(self): """ @@ -173,11 +214,10 @@ class TestAuthenticate(unittest.TestCase): # missing file.
self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value) - elif self._can_authenticate(auth_type): + elif _can_authenticate(auth_type): self._check_auth(auth_type, auth_value) else: self.assertRaises(stem.connection.CookieAuthRejected, self._check_auth, auth_type, auth_value) - self._assert_auth_rejected_msg(auth_type, auth_value)
def test_authenticate_cookie_invalid(self): """ @@ -193,17 +233,16 @@ class TestAuthenticate(unittest.TestCase): fake_cookie.write("0" * 32) fake_cookie.close()
- if self._can_authenticate(stem.connection.AuthMethod.NONE): + if _can_authenticate(stem.connection.AuthMethod.NONE): # authentication will work anyway self._check_auth(auth_type, auth_value) else: - if self._can_authenticate(auth_type): + if _can_authenticate(auth_type): exc_type = stem.connection.IncorrectCookieValue else: exc_type = stem.connection.CookieAuthRejected
self.assertRaises(exc_type, self._check_auth, auth_type, auth_value) - self._assert_auth_rejected_msg(auth_type, auth_value)
os.remove(auth_value)
@@ -215,7 +254,7 @@ class TestAuthenticate(unittest.TestCase):
auth_type = stem.connection.AuthMethod.COOKIE auth_value = "/if/this/exists/then/they're/asking/for/a/failure" - self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value) + self.assertRaises(stem.connection.UnreadableCookieFile, self._check_auth, auth_type, auth_value, False)
def test_authenticate_cookie_wrong_size(self): """ @@ -231,139 +270,44 @@ class TestAuthenticate(unittest.TestCase): # Weird coincidence? Fail so we can pick another file to check against. self.fail("Our torrc is 32 bytes, preventing the test_authenticate_cookie_wrong_size test from running.") else: - self.assertRaises(stem.connection.IncorrectCookieSize, self._check_auth, auth_type, auth_value) - - def _get_socket_auth(self): - """ - Provides the types of authentication that our current test socket accepts. - - Returns: - bool tuple of the form (password_auth, cookie_auth) - """ - - tor_options = test.runner.get_runner().get_options() - password_auth = test.runner.Torrc.PASSWORD in tor_options - cookie_auth = test.runner.Torrc.COOKIE in tor_options - - return password_auth, cookie_auth - - def _can_authenticate(self, auth_type): - """ - Checks if the given authentication type should be able to authenticate to - our current socket. - - Arguments: - auth_type (stem.connection.AuthMethod) - authentication method to check - - Returns: - bool that's True if we should be able to authenticate and False otherwise - """ - - password_auth, cookie_auth = self._get_socket_auth() - - # If the control socket is open then all authentication methods will be - # accepted. Otherwise check if our auth type matches what the socket - # accepts. - - if not password_auth and not cookie_auth: return True - elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth - elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth - else: return False - - def _get_auth_function(self, control_socket, auth_type, *auth_args): - """ - Constructs a functor that performs the given authentication without - additional arguments. - - Arguments: - control_socket (stem.socket.ControlSocket) - socket for the function to - authenticate to - auth_type (stem.connection.AuthMethod) - method by which we should - authentiate to the control socket - auth_args (str) - arguments to be passed to the authentication function - """ - - if auth_type == stem.connection.AuthMethod.NONE: - auth_function = stem.connection.authenticate_none - elif auth_type == stem.connection.AuthMethod.PASSWORD: - auth_function = stem.connection.authenticate_password - elif auth_type == stem.connection.AuthMethod.COOKIE: - auth_function = stem.connection.authenticate_cookie - else: - raise ValueError("unexpected auth type: %s" % auth_type) - - if auth_args: - return functools.partial(auth_function, control_socket, *auth_args) - else: - return functools.partial(auth_function, control_socket) - - def _assert_auth_rejected_msg(self, auth_type, *auth_args): - """ - This asserts that authentication will fail with the rejection message given - by tor. Note that this test will need to be updated if tor changes its - rejection reponse. - - Arguments: - auth_type (stem.connection.AuthMethod) - method by which we should - authentiate to the control socket - auth_args (str) - arguments to be passed to the authentication function - """ - - control_socket = test.runner.get_runner().get_tor_socket(False) - auth_function = self._get_auth_function(control_socket, auth_type, *auth_args) - password_auth, cookie_auth = self._get_socket_auth() - - if cookie_auth and password_auth: - failure_msg = MULTIPLE_AUTH_FAIL - elif cookie_auth: - if auth_type == stem.connection.AuthMethod.COOKIE: - failure_msg = INCORRECT_COOKIE_FAIL - else: - failure_msg = COOKIE_AUTH_FAIL - elif password_auth: - if auth_type == stem.connection.AuthMethod.PASSWORD: - failure_msg = INCORRECT_PASSWORD_FAIL - else: - failure_msg = PASSWORD_AUTH_FAIL - else: - # shouldn't happen, if so then the test has a bug - raise ValueError("No methods of authentication. If this is an open socket then auth shoulnd't fail.") - - try: - auth_function() - control_socket.close() - self.fail() - except stem.connection.AuthenticationFailure, exc: - self.assertTrue(control_socket.is_alive()) - self.assertEqual(failure_msg, str(exc)) - control_socket.close() + self.assertRaises(stem.connection.IncorrectCookieSize, self._check_auth, auth_type, auth_value, False)
- def _check_auth(self, auth_type, *auth_args): + def _check_auth(self, auth_type, auth_arg = None, check_message = True): """ - Attempts to use the given authentication function against our connection. - If this works then checks that we can use the connection. If not then this - raises the exception. + Attempts to use the given type of authentication against tor's control + socket. If it succeeds then we check that the socket can then be used. If + not then we check that this gives a message that we'd expect then raises + the exception.
Arguments: auth_type (stem.connection.AuthMethod) - method by which we should authentiate to the control socket - auth_args (str) - arguments to be passed to the authentication function + auth_arg (str) - argument to be passed to the authentication function + check_message (bool) - checks that failure messages are what we'd expect
Raises: stem.connection.AuthenticationFailure if the authentication fails """
- control_socket = test.runner.get_runner().get_tor_socket(False) - auth_function = self._get_auth_function(control_socket, auth_type, *auth_args) - - # run the authentication, re-raising if there's a problem - try: - auth_function() - except stem.connection.AuthenticationFailure, exc: - self.assertTrue(control_socket.is_alive()) - control_socket.close() - raise exc - - test.runner.exercise_socket(self, control_socket) - control_socket.close() + with test.runner.get_runner().get_tor_socket(False) as control_socket: + # run the authentication, re-raising if there's a problem + try: + if auth_type == stem.connection.AuthMethod.NONE: + stem.connection.authenticate_none(control_socket) + elif auth_type == stem.connection.AuthMethod.PASSWORD: + stem.connection.authenticate_password(control_socket, auth_arg) + elif auth_type == stem.connection.AuthMethod.COOKIE: + stem.connection.authenticate_cookie(control_socket, auth_arg) + + test.runner.exercise_socket(self, control_socket) + except stem.connection.AuthenticationFailure, exc: + # authentication functions should re-attach on failure + self.assertTrue(control_socket.is_alive()) + + # check that we got the failure message that we'd expect + if check_message: + failure_msg = _get_auth_failure_message(auth_type) + self.assertEqual(failure_msg, str(exc)) + + raise exc
tor-commits@lists.torproject.org