commit 7f760f86414ee0bfbd050480e1753555c66e9a5b Author: Damian Johnson atagar@torproject.org Date: Tue Nov 29 10:02:24 2011 -0800
Function and testing for password authentication
Adding a function for password authentication. This included escaping quotes but otherwise is trivial - most of the effort was refactoring the authentication integ tests. --- stem/connection.py | 30 +++++++ test/integ/connection/authentication.py | 130 +++++++++++++++++++++++++++---- 2 files changed, 145 insertions(+), 15 deletions(-)
diff --git a/stem/connection.py b/stem/connection.py index bef8d50..3ea5747 100644 --- a/stem/connection.py +++ b/stem/connection.py @@ -62,6 +62,36 @@ def authenticate_none(control_socket): if str(auth_response) != "OK": raise ValueError(str(auth_response))
+def authenticate_password(control_socket, password): + """ + Authenticates to a control socket that uses a password (via the + HashedControlPassword torrc option). Quotes in the password are escaped. + + If authentication fails then tor will close the control socket. + + Arguments: + control_socket (stem.socket.ControlSocket) - socket to be authenticated + password (str) - passphrase to present to the socket + + Raises: + ValueError if the authentication credentials aren't accepted + stem.socket.ProtocolError the content from the socket is malformed + stem.socket.SocketError if problems arise in using the socket + """ + + # Escapes quotes. Tor can include those in the password hash, in which case + # it expects escaped quotes from the controller. For more information see... + # https://trac.torproject.org/projects/tor/ticket/4600 + + password = password.replace('"', '\\"') + + control_socket.send("AUTHENTICATE \"%s\"" % password) + auth_response = control_socket.recv() + + # if we got anything but an OK response then error + if str(auth_response) != "OK": + raise ValueError(str(auth_response)) + def get_protocolinfo_by_port(control_addr = "127.0.0.1", control_port = 9051, get_socket = False): """ Issues a PROTOCOLINFO query to a control port, getting information about the diff --git a/test/integ/connection/authentication.py b/test/integ/connection/authentication.py index 4ba0bfa..c3759e6 100644 --- a/test/integ/connection/authentication.py +++ b/test/integ/connection/authentication.py @@ -4,6 +4,7 @@ stem.connection.authenticate_* functions. """
import unittest +import functools
import test.runner import stem.connection @@ -15,6 +16,9 @@ COOKIE_AUTH_FAIL = "Authentication failed: Wrong length on authentication cookie PASSWORD_AUTH_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration. Maybe you tried a plain text password? If so, the standard requires that you put it in double quotes." MULTIPLE_AUTH_FAIL = "Authentication failed: Password did not match HashedControlPassword *or* authentication cookie."
+# this only arises in password-only auth when we authenticate by password +INCORRECT_PASSWORD_FAIL = "Authentication failed: Password did not match HashedControlPassword value from configuration" + class TestAuthenticate(unittest.TestCase): """ Tests the authentication methods. This should be run with the 'CONN_ALL' @@ -32,31 +36,127 @@ class TestAuthenticate(unittest.TestCase): if connection_type == test.runner.TorConnection.NONE: self.skipTest("(no connection)")
- # If the connection has authentication then this will fail with a message - # based on the authentication type. If not then this will succeed. + expect_success = self._is_authenticateable(stem.connection.AuthMethod.NONE) + self._check_auth(stem.connection.AuthMethod.NONE, None, expect_success) + + def test_authenticate_password(self): + """ + Tests the authenticate_password function. + """ + + runner = test.runner.get_runner() + connection_type = runner.get_connection_type() + + if connection_type == test.runner.TorConnection.NONE: + self.skipTest("(no connection)") + + expect_success = self._is_authenticateable(stem.connection.AuthMethod.PASSWORD) + self._check_auth(stem.connection.AuthMethod.PASSWORD, test.runner.CONTROL_PASSWORD, expect_success) + + # Check with an empty, invalid, and quoted password. These should work if + # we have no authentication, and fail otherwise. + + expect_success = self._is_authenticateable(stem.connection.AuthMethod.NONE) + self._check_auth(stem.connection.AuthMethod.PASSWORD, "", expect_success) + self._check_auth(stem.connection.AuthMethod.PASSWORD, "blarg", expect_success) + self._check_auth(stem.connection.AuthMethod.PASSWORD, "this has a \" in it", expect_success) + + def _get_socket_auth(self): + """ + Provides the types of authentication that our current test socket accepts.
- control_socket = test.runner.get_runner().get_tor_socket(False) + Returns: + bool tuple of the form (password_auth, cookie_auth) + """
+ connection_type = test.runner.get_runner().get_connection_type() connection_options = test.runner.CONNECTION_OPTS[connection_type] - cookie_auth = test.runner.OPT_COOKIE in connection_options password_auth = test.runner.OPT_PASSWORD in connection_options + cookie_auth = test.runner.OPT_COOKIE in connection_options
- if cookie_auth or password_auth: - if cookie_auth and password_auth: failure_msg = MULTIPLE_AUTH_FAIL - elif cookie_auth: failure_msg = COOKIE_AUTH_FAIL - else: failure_msg = PASSWORD_AUTH_FAIL - - try: - stem.connection.authenticate_none(control_socket) - self.fail() - except ValueError, exc: - self.assertEqual(failure_msg, str(exc)) + return password_auth, cookie_auth + + def _is_authenticateable(self, auth_type): + """ + Checks if the given authentication type should be able to authenticate to + our current socket. + + Arguments: + auth_type (stem.connection.AuthMethod) - authentication method to check + + Returns: + bool that's True if we should be able to authenticate and False otherwise + """ + + password_auth, cookie_auth = self._get_socket_auth() + + # If the control socket is open then all authentication methods will be + # accepted. Otherwise check if our auth type matches what the socket + # accepts. + + if not password_auth and not cookie_auth: return True + elif auth_type == stem.connection.AuthMethod.PASSWORD: return password_auth + elif auth_type == stem.connection.AuthMethod.COOKIE: return cookie_auth + else: return False + + def _check_auth(self, auth_type, auth_value, expect_success): + """ + Attempts to use the given authentication function against our connection. + If this works then checks that we can use the connection. If not then we + check that the error message is what we'd expect. 
+ + Arguments: + auth_type (stem.connection.AuthMethod) - method by which we should + authenticate to the control socket + auth_value (str) - value to be provided to the authentication function + expect_success (bool) - true if the authentication should succeed, false + otherwise + """ + + runner = test.runner.get_runner() + control_socket = runner.get_tor_socket(False) + password_auth, cookie_auth = self._get_socket_auth() + + # construct the function call + + if auth_type == stem.connection.AuthMethod.NONE: + auth_function = stem.connection.authenticate_none + elif auth_type == stem.connection.AuthMethod.PASSWORD: + auth_function = stem.connection.authenticate_password + elif auth_type == stem.connection.AuthMethod.COOKIE: + auth_function = None # TODO: fill in + else: + raise ValueError("unexpected auth type: %s" % auth_type) + + if auth_value != None: + auth_function = functools.partial(auth_function, control_socket, auth_value) else: - stem.connection.authenticate_none(control_socket) + auth_function = functools.partial(auth_function, control_socket) + + if expect_success: + auth_function()
# issues a 'GETINFO config-file' query to confirm that we can use the socket
control_socket.send("GETINFO config-file") config_file_response = control_socket.recv() self.assertEquals("config-file=%s\nOK" % runner.get_torrc_path(), str(config_file_response)) + control_socket.close() + else: + if cookie_auth and password_auth: failure_msg = MULTIPLE_AUTH_FAIL + elif cookie_auth: failure_msg = COOKIE_AUTH_FAIL + else: + # if we're attempting to authenticate with a password then it's a + # truncated message + + if auth_type == stem.connection.AuthMethod.PASSWORD: + failure_msg = INCORRECT_PASSWORD_FAIL + else: + failure_msg = PASSWORD_AUTH_FAIL + + try: + auth_function() + self.fail() + except ValueError, exc: + self.assertEqual(failure_msg, str(exc))
tor-commits@lists.torproject.org