commit 14087a70e2c0e40541d610ce41c4b08c8fab6b3e
Author: Damian Johnson <atagar(a)torproject.org>
Date: Fri Jun 29 08:42:08 2012 -0700
Revisions for proc unit and integ tests
Lots 'o changes, most just nitpicks to satisfy my OCD. The main functional
changes are just...
* The mock_fn() and find_subsets() helpers are useful for other tests so moving
them to the mocking module. The authentication unit tests, for instance, had
a helper that this has replaced.
* The default target for the mock_fn() calls doesn't seem to be needed, and
could mask issues that prevents proper system abstraction (ie, if we're
passing through to os.readlink() then we probably won't notice that the test
only works on Linux).
* The stem.util.proc.is_available() check was broken since we were using the
function as a boolean, which is always True...
>>> def foo():
... return False
...
>>> bool(foo)
True
* We created a controller to spawn a control connection, but forgot to close
the controller afterward.
* Test broken if we were running without a control port (for instance, with the
RUN_SOCKET target).
---
run_tests.py | 4 +-
test/integ/util/proc.py | 103 +++++++--------
test/mocking.py | 58 ++++++++
test/unit/connection/authentication.py | 31 ++---
test/unit/util/proc.py | 232 ++++++++++++--------------------
5 files changed, 201 insertions(+), 227 deletions(-)
diff --git a/run_tests.py b/run_tests.py
index 69cf441..7ccedd3 100755
--- a/run_tests.py
+++ b/run_tests.py
@@ -27,9 +27,9 @@ import test.unit.response.authchallenge
import test.unit.util.conf
import test.unit.util.connection
import test.unit.util.enum
+import test.unit.util.proc
import test.unit.util.system
import test.unit.util.tor_tools
-import test.unit.util.proc
import test.unit.version
import test.integ.connection.authentication
import test.integ.connection.connect
@@ -42,8 +42,8 @@ import test.integ.descriptor.server_descriptor
import test.integ.descriptor.extrainfo_descriptor
import test.integ.response.protocolinfo
import test.integ.util.conf
-import test.integ.util.system
import test.integ.util.proc
+import test.integ.util.system
import test.integ.process
import test.integ.version
diff --git a/test/integ/util/proc.py b/test/integ/util/proc.py
index 21cd5d8..502d13f 100644
--- a/test/integ/util/proc.py
+++ b/test/integ/util/proc.py
@@ -1,106 +1,97 @@
"""
-Integration tests for stem.util.proc functions against the tor process that we're running.
+Integration tests for stem.util.proc functions against the tor process that
+we're running.
"""
+
+from __future__ import with_statement
+
import os
-import socket
import unittest
import test.runner
-import stem.socket
-import stem.control
import stem.util.proc as proc
class TestProc(unittest.TestCase):
def test_get_cwd(self):
"""
- Tests the stem.util.proc.get_cwd function.
+ Checks that stem.util.proc.get_cwd matches our tor instance's cwd.
"""
- # Skips test if proc utilities are unavailable on this platform.
- # This is repeated at the beginning of every proc integration test.
- if not proc.is_available:
- test.runner.skip(self, "(Unavailable on this platform)")
+ if not proc.is_available():
+ test.runner.skip(self, "(proc unavailable)")
return
runner = test.runner.get_runner()
-
runner_pid, tor_cwd = runner.get_pid(), runner.get_tor_cwd()
self.assertEquals(tor_cwd, proc.get_cwd(runner_pid))
-
+
def test_get_uid(self):
"""
- Tests the stem.util.proc.get_uid function.
+ Checks that stem.util.proc.get_uid matches our tor instance's uid.
"""
- if not proc.is_available:
- test.runner.skip(self, "(Unavailable on this platform)")
+ if not proc.is_available():
+ test.runner.skip(self, "(proc unavailable)")
return
tor_pid = test.runner.get_runner().get_pid()
-
self.assertEquals(os.geteuid(), proc.get_uid(tor_pid))
-
+
def test_get_memory_usage(self):
"""
- Tests the stem.util.proc.get_memory_usage function.
+ Checks that stem.util.proc.get_memory_usage looks somewhat reasonable.
"""
- if not proc.is_available:
- test.runner.skip(self, "(Unavailable on this platform)")
+ if not proc.is_available():
+ test.runner.skip(self, "(proc unavailable)")
return
tor_pid = test.runner.get_runner().get_pid()
- res_size, vir_size = (proc.get_memory_usage(tor_pid))
- # Checks if get_memory_usage() is greater than a kilobyte.
- res_bool, vir_bool = res_size > 1024, vir_size > 1024
-
- self.assertTrue(res_bool)
- self.assertTrue(vir_bool)
+ res_size, vir_size = proc.get_memory_usage(tor_pid)
+ # checks that they're larger than a kilobyte
+ self.assertTrue(res_size > 1024)
+ self.assertTrue(vir_size > 1024)
+
def test_get_stats(self):
"""
- Tests the stem.util.proc.get_memory_usage function.
+ Checks that stem.util.proc.get_stats looks somewhat reasonable.
"""
- if not proc.is_available:
- test.runner.skip(self, "(Unavailable on this platform)")
+ if not proc.is_available():
+ test.runner.skip(self, "(proc unavailable)")
return
tor_pid = test.runner.get_runner().get_pid()
command, utime, stime, start_time = proc.get_stats(tor_pid, 'command', 'utime', 'stime', 'start time')
- # Checks if utime and stime are greater than 0.
- utime_bool = utime > 0
- stime_bool = stime > 0
- # Checks if start time is greater than get_system_start_time().
- start_time_bool = start_time > proc.get_system_start_time()
-
self.assertEquals('tor', command)
- self.assertTrue(utime_bool)
- self.assertTrue(stime_bool)
- self.assertTrue(start_time_bool)
-
+ self.assertTrue(utime > 0)
+ self.assertTrue(stime > 0)
+ self.assertTrue(start_time > proc.get_system_start_time())
+
def test_get_connections(self):
"""
- Tests the stem.util.proc.get_connections function.
-
- Checks that get_connections() provides the control connection.
+ Checks for our control port in the stem.util.proc.get_connections output if
+ we have one.
"""
- if not proc.is_available:
- test.runner.skip(self, "(Unavailable on this platform)")
- return
+ runner = test.runner.get_runner()
- tor_pid = test.runner.get_runner().get_pid()
- test.runner.get_runner().get_tor_controller(test.runner.CONTROL_PASSWORD)
- ip_bool, socket_bool = False, False
- for tup in proc.get_connections(tor_pid):
- if '127.0.0.1' in tup:
- ip_bool = True
- if test.runner.CONTROL_PORT in tup:
- socket_bool = True
- if ip_bool and socket_bool:
- continue
+ if not proc.is_available():
+ test.runner.skip(self, "(proc unavailable)")
+ return
+ elif not test.runner.Torrc.PORT in runner.get_options():
+ test.runner.skip(self, "(no control port)")
+ return
- self.assertTrue(ip_bool)
- self.assertTrue(socket_bool)
+ # making a controller connection so that we have something to query for
+ with runner.get_tor_socket():
+ tor_pid = test.runner.get_runner().get_pid()
+
+ for conn in proc.get_connections(tor_pid):
+ if ("127.0.0.1", test.runner.CONTROL_PORT) == conn[:2]:
+ return
+
+ self.fail()
+
diff --git a/test/mocking.py b/test/mocking.py
index e4245a9..94d519e 100644
--- a/test/mocking.py
+++ b/test/mocking.py
@@ -9,6 +9,7 @@ calling :func:`test.mocking.revert_mocking`.
mock - replaces a function with an alternative implementation
revert_mocking - reverts any changes made by the mock function
get_real_function - provides the non-mocked version of a function
+ get_all_combinations - provides all combinations of attributes
Mocking Functions
no_op - does nothing
@@ -16,6 +17,7 @@ calling :func:`test.mocking.revert_mocking`.
return_true - returns True
return_false - returns False
return_none - returns None
+ return_for_args - return based on the input arguments
raise_exception - raises an exception when called
Instance Constructors
@@ -55,6 +57,27 @@ def return_true(): return return_value(True)
def return_false(): return return_value(False)
def return_none(): return return_value(None)
+def return_for_args(args_to_return_value, default = None):
+ """
+ Returns a value if the arguments to it match something in a given
+ 'argument => return value' mapping. Otherwise, a default function
+ is called with the arguments.
+
+ :param dict args_to_return_value: mapping of arguments to the value we should provide
+ :param functor default: returns the value of this function if the args don't match something that we have, we raise a ValueError by default
+ """
+
+ def _return_value(*args):
+ if args in args_to_return_value:
+ return args_to_return_value[args]
+ elif default is None:
+ arg_label = ", ".join([str(v) for v in args])
+ raise ValueError("Unrecognized argument sent for return_for_args(): %s" % arg_label)
+ else:
+ return default(args)
+
+ return _return_value
+
def raise_exception(exception):
def _raise(*args): raise exception
return _raise
@@ -180,6 +203,41 @@ def get_real_function(function):
else:
return function
+def get_all_combinations(attr, include_empty = False):
+ """
+ Provides an iterator for all combinations of a set of attributes. For
+ instance...
+
+ ::
+
+ >>> list(test.mocking.get_all_combinations(["a", "b", "c"]))
+ [('a',), ('b',), ('c',), ('a', 'b'), ('a', 'c'), ('b', 'c'), ('a', 'b', 'c')]
+
+ :param list attr: attributes to provide combinations for
+ :param bool include_empty: includes an entry with zero items if True
+ :returns: iterator for all combinations
+ """
+
+ # Makes an itertools.product() call for 'i' copies of attr...
+ #
+ # * itertools.product(attr) => all one-element combinations
+ # * itertools.product(attr, attr) => all two-element combinations
+ # * ... etc
+
+ if include_empty: yield ()
+
+ seen = set()
+ for i in xrange(1, len(attr) + 1):
+ product_arg = [attr for j in xrange(i)]
+
+ for item in itertools.product(*product_arg):
+ # deduplicate, sort, and only provide if we haven't seen it yet
+ item = tuple(sorted(set(item)))
+
+ if not item in seen:
+ seen.add(item)
+ yield item
+
def get_message(content, reformat = True):
"""
Provides a ControlMessage with content modified to be parsable. This makes
diff --git a/test/unit/connection/authentication.py b/test/unit/connection/authentication.py
index 085eae6..2efcab8 100644
--- a/test/unit/connection/authentication.py
+++ b/test/unit/connection/authentication.py
@@ -15,27 +15,6 @@ import stem.connection
import stem.util.log as log
import test.mocking as mocking
-def _get_all_auth_method_combinations():
- """
- Enumerates all types of authentication that a PROTOCOLINFO response may
- provide, returning a tuple with the AuthMethod enums.
- """
-
- for is_none in (False, True):
- for is_password in (False, True):
- for is_cookie in (False, True):
- for is_safecookie in (False, True):
- for is_unknown in (False, True):
- auth_methods = []
-
- if is_none: auth_methods.append(stem.connection.AuthMethod.NONE)
- if is_password: auth_methods.append(stem.connection.AuthMethod.PASSWORD)
- if is_cookie: auth_methods.append(stem.connection.AuthMethod.COOKIE)
- if is_safecookie: auth_methods.append(stem.connection.AuthMethod.SAFECOOKIE)
- if is_unknown: auth_methods.append(stem.connection.AuthMethod.UNKNOWN)
-
- yield tuple(auth_methods)
-
class TestAuthenticate(unittest.TestCase):
def setUp(self):
mocking.mock(stem.connection.get_protocolinfo, mocking.no_op())
@@ -111,7 +90,15 @@ class TestAuthenticate(unittest.TestCase):
all_auth_password_exc += control_exc
all_auth_cookie_exc += control_exc
- for protocolinfo_auth_methods in _get_all_auth_method_combinations():
+ auth_method_combinations = mocking.get_all_combinations([
+ stem.connection.AuthMethod.NONE,
+ stem.connection.AuthMethod.PASSWORD,
+ stem.connection.AuthMethod.COOKIE,
+ stem.connection.AuthMethod.SAFECOOKIE,
+ stem.connection.AuthMethod.UNKNOWN,
+ ], include_empty = True)
+
+ for protocolinfo_auth_methods in auth_method_combinations:
# protocolinfo input for the authenticate() call we'll be making
protocolinfo_arg = mocking.get_protocolinfo_response(
auth_methods = protocolinfo_auth_methods,
diff --git a/test/unit/util/proc.py b/test/unit/util/proc.py
index 7ae3865..e1aaec7 100644
--- a/test/unit/util/proc.py
+++ b/test/unit/util/proc.py
@@ -1,83 +1,15 @@
"""
Unit testing code for the stem.util.proc functions.
-
"""
+
import os
-import time
import unittest
-import operator
-import functools
-import itertools
-import cStringIO
+import StringIO
import test.mocking as mocking
import stem.util.proc as proc
-class TargetError(Exception):
- """
- Raised when a function that needs to be mocked cannot be called.
-
- This occurs when the caller's arguments don't match the expected arguments
- and no target is given.
- """
- def __init__(self, value):
- self.value = value
- def __str__(self):
- return repr(self.value)
-
-def mock_fn(arguments_returns, target=None):
- """
- Provides a lambda function that may be used to mock another function.
-
- This function operates under the precondition that len(exp_args) = len(return_vals)
-
- :param dict args_rets: expected input value(s) as tuples are the key and return values are the values of the dictionary.
- :param function target: target function to be called if mocking doesn't cover this input
-
- :returns: function _mocker such that...
- *return_vals[i] a = exp_args[i]
- *target(*a) a != exp_args[i] and target != None
- *raise TargetError a != exp_args[i] and target = None
- """
-
- def _mocked(args_rets, tgt, *arg):
- try:
- # First check if given input matches one of the inputs to be mocked.
- return args_rets[arg]
- except KeyError:
- if tgt:
- return tgt(*arg)
- else:
- raise TargetError("A relevant function could not be applied")
- return functools.partial(_mocked, arguments_returns, target)
-
-def find_subsets(xs):
- """
- Used with the builtin zip function to create all possible combinations
- of the elements of two lists.
-
- Called in test_get_stats().
-
- :param xs: list of tuples given by calling zip on two lists
-
- :returns: a list of lists of tuples containing all possible combinations of arguments. Will be of the form [[(arg1, resp1),(arg2, resp2),...], [(arg1, resp1),(arg3, resp3),...],...]
- """
-
- # Base case is the empty list.
- if len(xs) == 0:
- return [[]]
- else:
- subs = find_subsets(xs[1:])
- subs_w_x0 = []
- for s in subs:
- subs_w_x0 = subs_w_x0 + [s + [xs[0]]]
- return subs + subs_w_x0
-
class TestProc(unittest.TestCase):
-
- def setUp(self):
- mocking.mock(time.time, mocking.return_value(3.14159))
-
def tearDown(self):
mocking.revert_mocking()
@@ -86,9 +18,10 @@ class TestProc(unittest.TestCase):
Tests the get_system_start_time function.
"""
- mocking.mock(proc._get_line, mock_fn({('/proc/stat', 'btime', 'system start time'):'btime 1001001'}))
+ mocking.mock(proc._get_line, mocking.return_for_args({
+ ('/proc/stat', 'btime', 'system start time'): 'btime 1001001',
+ }))
- # Single test as get_system_start_time takes no arguments
self.assertEquals(1001001, proc.get_system_start_time())
def test_get_physical_memory(self):
@@ -96,138 +29,143 @@ class TestProc(unittest.TestCase):
Tests the get_physical_memory function.
"""
- mocking.mock(proc._get_line, mock_fn({('/proc/meminfo', 'MemTotal:', 'system physical memory'):'MemTotal: 12345 kB'}))
+ mocking.mock(proc._get_line, mocking.return_for_args({
+ ('/proc/meminfo', 'MemTotal:', 'system physical memory'): 'MemTotal: 12345 kB',
+ }))
self.assertEquals((12345*1024), proc.get_physical_memory())
-
+
def test_get_cwd(self):
"""
Tests the get_cwd function with a given pid.
"""
- mocking.mock(os.readlink, mock_fn({('/proc/24019/cwd',):'/home/directory/TEST'}, os.listdir), os)
+ mocking.mock(os.readlink, mocking.return_for_args({
+ ('/proc/24019/cwd',): '/home/directory/TEST'
+ }), os)
- # Test edge case of pid = 0 and a standard pid.
- self.assertEquals('', proc.get_cwd(0))
self.assertEquals('/home/directory/TEST', proc.get_cwd(24019))
-
-
+
def test_get_uid(self):
"""
Tests the get_uid function with a given pid.
"""
- pid_list = [(24019, 11111), (0, 22222)]
-
- for pid in pid_list:
- pid_id, user_id = pid
- status_path = "/proc/%s/status" % pid_id
- mocking.mock(proc._get_line, mock_fn({(status_path, 'Uid:', 'uid'):'Uid: %s' % user_id}))
-
- self.assertEquals(user_id, proc.get_uid(pid_id))
+ for test_value in [(24019, 11111), (0, 22222)]:
+ pid, uid = test_value
+ mocking.mock(proc._get_line, mocking.return_for_args({
+ ("/proc/%s/status" % pid, 'Uid:', 'uid'): 'Uid: %s' % uid
+ }))
+ self.assertEquals(uid, proc.get_uid(pid))
+
def test_get_memory_usage(self):
"""
Tests the get_memory_usage function with a given pid.
-
- This is the only function in proc.py that calls _get_lines explicitly.
"""
- mocking.mock(proc._get_lines, mock_fn({('/proc/1111/status', ('VmRSS:', 'VmSize:'), 'memory usage'):{'VmRSS:':'VmRSS: 100 kB', 'VmSize:':'VmSize: 1800 kB'}}))
+ mocking.mock(proc._get_lines, mocking.return_for_args({
+ ('/proc/1111/status', ('VmRSS:', 'VmSize:'), 'memory usage'):
+ {'VmRSS:':'VmRSS: 100 kB', 'VmSize:':'VmSize: 1800 kB'}
+ }))
- # Test edge case of pid = 0 and a standard pid
- self.assertEqual((0,0), proc.get_memory_usage(0))
+ self.assertEqual((0, 0), proc.get_memory_usage(0))
self.assertEqual((100*1024, 1800*1024), proc.get_memory_usage(1111))
-
+
def test_get_stats(self):
"""
Tests get_stats() with all combinations of stat_type arguments.
"""
- # Need to bypass proc.Stat.<command> calls as they aren't in this scope.
- args = ['command', 'utime', 'stime', 'start time']
- responses = ['test_program', '0.13', '0.14', '10.21']
+ # list of all combinations of args with respective return values
+ stat_combinations = mocking.get_all_combinations([
+ ('command', 'test_program'),
+ ('utime', '0.13'),
+ ('stime', '0.14'),
+ ('start time', '10.21'),
+ ])
stat_path = "/proc/24062/stat"
stat = '1 (test_program) 2 3 4 5 6 7 8 9 10 11 12 13.0 14.0 15 16 17 18 19 20 21.0 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43'
- # List of all combinations of args with respective return values.
- subsets = find_subsets(zip(args, responses))
-
mocking.mock(proc.get_system_start_time, mocking.return_value(10))
+ # tests the case where no stat_types are specified
+ mocking.mock(proc._get_line, mocking.return_for_args({
+ (stat_path, '24062', 'process '): stat
+ }))
- # Tests the case where no stat_types are specified.
- mocking.mock(proc._get_line, mock_fn({(stat_path, str(24062), 'process '):stat}))
self.assertEquals((), proc.get_stats(24062))
- # Don't handle empty set of commands here. (just did above)
- subsets.remove([])
-
- for sub in subsets:
- # sub = [(arg1, resp1), (arg2, resp2),...]. We need (arg1,arg2,...)
- # and (resp1,resp2,...).
- arg, response = zip(*sub)
+ for stats in stat_combinations:
+ # the stats variable is...
+ # [(arg1, resp1), (arg2, resp2)...]
+ #
+ # but we need...
+ # (arg1, arg2...), (resp1, resp2...).
- mocking.mock(proc._get_line, mock_fn({(stat_path, str(24062), 'process %s' % ', '.join(arg)):stat}))
+ args, response = zip(*stats)
- # Iterates through each combination of commands.
- self.assertEquals(response, proc.get_stats(24062, *arg))
+ mocking.mock(proc._get_line, mocking.return_for_args({
+ (stat_path, '24062', 'process %s' % ', '.join(args)): stat
+ }))
- # Tests the case where pid = 0.
- for sub in subsets:
- arg, response = zip(*sub)
- if 'start time' in arg:
+ self.assertEquals(response, proc.get_stats(24062, *args))
+
+ # tests the case where pid = 0
+
+ if 'start time' in args:
response = 10
else:
response = ()
- for a in arg:
- if a == 'command':
+
+ for arg in args:
+ if arg == 'command':
response += ('sched',)
- elif a == 'utime':
+ elif arg == 'utime':
response += ('0',)
- elif a == 'stime':
+ elif arg == 'stime':
response += ('0',)
- mocking.mock(proc._get_line, mock_fn({('/proc/0/stat', str(0), 'process %s' % ', '.join(arg)):stat}))
- self.assertEquals(response, proc.get_stats(0, *arg))
+ mocking.mock(proc._get_line, mocking.return_for_args({
+ ('/proc/0/stat', '0', 'process %s' % ', '.join(args)): stat
+ }))
+
+ self.assertEquals(response, proc.get_stats(0, *args))
def test_get_connections(self):
"""
- Tests the get_connections function along with a given pid.
+ Tests the get_connections function.
"""
pid = 1111
- fd_list = ['1', '2', '3', '4']
- readlink_results = ['socket:[99999999]', 'socket:[IIIIIIII]', 'pipe:[30303]', 'pipe:[40404]']
- input_vals = {}
- for i in range(len(fd_list)):
- input_vals[('/proc/%s/fd/%s' % (str(pid), fd_list[i]),)] = readlink_results[i]
+ mocking.mock(os.listdir, mocking.return_for_args({
+ ('/proc/%s/fd' % pid,): ['1', '2', '3', '4'],
+ }), os)
+
+ mocking.mock(os.readlink, mocking.return_for_args({
+ ('/proc/%s/fd/1' % pid,): 'socket:[99999999]',
+ ('/proc/%s/fd/2' % pid,): 'socket:[IIIIIIII]',
+ ('/proc/%s/fd/3' % pid,): 'pipe:[30303]',
+ ('/proc/%s/fd/4' % pid,): 'pipe:[40404]',
+ }), os)
- # Will be put in cStringIO wrappers and treated as files.
tcp = '\n 0: 11111111:1111 22222222:2222 01 44444444:44444444 55:55555555 66666666 1111 8 99999999'
udp = '\n A: BBBBBBBB:BBBB CCCCCCCC:CCCC DD EEEEEEEE:EEEEEEEE FF:FFFFFFFF GGGGGGGG 1111 H IIIIIIII'
- file_vals = {('/proc/net/tcp',):cStringIO.StringIO(tcp), ('/proc/net/udp',):cStringIO.StringIO(udp)}
-
- # Mock os.listdir, os.readlink, and open with mock_fn.
- mocking.mock(os.listdir, mock_fn({('/proc/%s/fd' % str(pid),):fd_list}, os.listdir), os)
- mocking.mock(os.readlink, mock_fn(input_vals, os.readlink), os)
- mocking.mock(open, mock_fn(file_vals, open))
+ mocking.mock(open, mocking.return_for_args({
+ ('/proc/net/tcp',): StringIO.StringIO(tcp),
+ ('/proc/net/udp',): StringIO.StringIO(udp)
+ }))
- # Tests the edge case of pid = 0.
+ # tests the edge case of pid = 0
self.assertEquals([], proc.get_connections(0))
- # First build expected responses, then call the function to be tested.
- results = []
- for keys, files in file_vals.iteritems():
- contents = files.getvalue()
-
- _, l_addr, f_addr, status, _, _, _, _, _, inode = contents.split()[:10]
-
- local_ip, local_port = proc._decode_proc_address_encoding(l_addr)
- foreign_ip, foreign_port = proc._decode_proc_address_encoding(f_addr)
- results.append((local_ip, local_port, foreign_ip, foreign_port))
- print "results: %s" % results
- self.assertEquals(results, proc.get_connections(1111))
+ expected_results = [
+ ('17.17.17.17', 4369, '34.34.34.34', 8738),
+ ('187.187.187.187', 48059, '204.204.204.204', 52428),
+ ]
+
+ self.assertEquals(expected_results, proc.get_connections(pid))
+