commit e94e48ecf3209273b6de13bd405fa87d3f1203b4 Author: Megan mchang01@wesleyan.edu Date: Mon Jun 25 18:05:44 2012 -0400
First complete draft of proc unit testing code.
Most mocked functions utilize mock_fn(), which checks for expected input and either returns the appropriate output given the mocking or lets the original function be called. This is necessary in cases where outside code calls these functions and the mocked version is not able to supply them with the needed output. Private methods in the stem/util/proc.py file are not tested, nor is is_available(). This decision was based on the conventions set in system.py testing code. --- run_tests.py | 4 +- test/unit/util/proc.py | 262 ++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 220 insertions(+), 46 deletions(-)
diff --git a/run_tests.py b/run_tests.py index 670947a..a93b20e 100755 --- a/run_tests.py +++ b/run_tests.py @@ -29,6 +29,7 @@ import test.unit.util.connection import test.unit.util.enum import test.unit.util.system import test.unit.util.tor_tools +import test.unit.util.proc import test.unit.version import test.integ.connection.authentication import test.integ.connection.connect @@ -44,7 +45,6 @@ import test.integ.util.conf import test.integ.util.system import test.integ.process import test.integ.version -import test.unit.util.proc
import stem.prereq import stem.util.conf @@ -99,6 +99,7 @@ UNIT_TESTS = ( test.unit.util.enum.TestEnum, test.unit.util.connection.TestConnection, test.unit.util.conf.TestConf, + test.unit.util.proc.TestProc, test.unit.util.system.TestSystem, test.unit.util.tor_tools.TestTorTools, test.unit.descriptor.reader.TestDescriptorReader, @@ -111,7 +112,6 @@ UNIT_TESTS = ( test.unit.response.protocolinfo.TestProtocolInfoResponse, test.unit.response.authchallenge.TestAuthChallengeResponse, test.unit.connection.authentication.TestAuthenticate, - test.unit.util.proc.TestProc, )
INTEG_TESTS = ( diff --git a/test/unit/util/proc.py b/test/unit/util/proc.py index a474800..2e3dd67 100644 --- a/test/unit/util/proc.py +++ b/test/unit/util/proc.py @@ -1,70 +1,244 @@ """ -Unit testing code for proc utilities located in /stem/util/proc.py +Unit testing code for the stem.util.proc functions. + """ import os import time import unittest +import operator import functools +import cStringIO
import test.mocking as mocking import stem.util.proc as proc
-def mock_get_lines(file_path, line_prefixes, return_values): +class TargetError(Exception): """ - Provides mocking for the proc module's _get_line function. + Raised when a function that needs to be mocked cannot be called. + + This occurs when the caller's arguments don't match the expected arguments + and no target is given. """ - if isinstance(line_prefixes, tuple): - prefix_list = sorted(list(line_prefixes)) - else: - # Only one line prefix given. - prefix_list = line_prefixes - def _mock_get_lines(path, prefixes, return_values, caller_path, - caller_prefixes, caller_parameter): - if isinstance(caller_prefixes, tuple): - caller_prefix_list = sorted(list(caller_prefixes)) - else: - #Only one line prefix given. - caller_prefix_list = caller_prefixes - if path == caller_path and prefixes == caller_prefix_list: - return return_values + def __init__(self, value): + self.value = value + def __str__(self): + return repr(self.value) + +def mock_fn(exp_args, return_vals, target=None): + """ + Provides a lambda function that may be used to mock another function. + + :param list of tuples exp_args: expected input value(s) to be used for comparison + :param list return_vals: value(s) to be returned if expected input matches given input + :param function target: target function to be called if mocking doesn't cover this input + + :precondition: len(exp_args) = len(return_vals) + + return_vals[i] a=exp_args[i] + :returns: function _mocker such that: f(*a) = target(*a) a != exp_args[i] and target != None + raise TargetError a != exp_args[i] and target = None + """ + + def _mocked(tgt, exp_args, return_vals, *arg): + try: + # + i = operator.indexOf(exp_args, arg) + except ValueError: + i = -1 + if i == -1: + if tgt: + return tgt(*arg) else: - return None + raise TargetError("A relevant function could not be applied") + else: + return return_vals[i]
- return functools.partial(_mock_get_lines, file_path, prefix_list, - return_values) + return functools.partial(_mocked, target, exp_args, return_vals) + +def find_subsets(xs): + """ + Used with the builtin zip function to create all possible combinations + of two lists. Called in test_get_stats(). + + :param xs: list of tuples given by calling zip on two lists + + :returns: a list of lists of tuples + """ + + # Base case is the empty list. + if len(xs) == 0: + return [[]] + else: + subs = find_subsets(xs[1:]) + subs_w_x0 = [] + for s in subs: + subs_w_x0 = subs_w_x0 + [s + [xs[0]]] + return subs + subs_w_x0
class TestProc(unittest.TestCase): + + def setUp(self): + mocking.mock(time.time, mocking.return_value(3.14159)) + def tearDown(self): - mocking.revert_mocking() + mocking.revert_mocking()
def test_get_system_start_time(self): - """ - Tests the get_system_start_time function. - """ - mocking.mock(proc._get_line, mock_get_lines("/proc/stat", "btime", - "btime 1001001")) - - self.assertEquals(1001001, proc.get_system_start_time()) + """ + Tests the get_system_start_time function. + """ + + mocking.mock(proc._get_line, mock_fn([("/proc/stat", "btime", 'system start time')], + ["btime 1001001"])) + + # Single test as get_system_start_time takes no arguments + self.assertEquals(1001001, proc.get_system_start_time())
def test_get_physical_memory(self): - """ - Tests the get_physical_memory function. - """ - mocking.mock(proc._get_line, mock_get_lines("/proc/meminfo", - "MemTotal:", "MemTotal: 12345 kB")) + """ + Tests the get_physical_memory function. + """ + + mocking.mock(proc._get_line, mock_fn([("/proc/meminfo", + "MemTotal:", 'system physical memory')], ["MemTotal: 12345 kB"])) + + self.assertEquals((12345*1024), proc.get_physical_memory()) + + def test_get_cwd(self): + """ + Tests the get_cwd function with a given pid. + """ + + mocking.mock(os.readlink, mock_fn([('/proc/24019/cwd',)], + ['/home/directory/TEST'], os.listdir), os) + + # Test edge case of pid = 0 and a standard pid. + self.assertEquals('', proc.get_cwd(0)) + self.assertEquals('/home/directory/TEST', proc.get_cwd(24019)) + + + def test_get_uid(self): + """ + Tests the get_uid function with a given pid. + """ + + pid_list = [(24019, 11111), (0, 22222)] + + for pid in pid_list: + pid_id, user_id = pid + status_path = "/proc/%s/status" % pid_id + mocking.mock(proc._get_line, mock_fn([(status_path, "Uid:", "uid")], + ["Uid: %s" % user_id])) + self.assertEquals(user_id, proc.get_uid(pid_id))
- self.assertEquals((12345*1024), proc.get_physical_memory()) - def test_get_memory_usage(self): - """ - Tests the get_memory_usage function. + """ + Tests the get_memory_usage function with a given pid. + + This is the only function in proc.py that calls _get_lines explicitly. + """ + + mocking.mock(proc._get_lines, mock_fn([("/proc/1111/status", + ("VmRSS:", "VmSize:"), 'memory usage')], [{"VmRSS:": "VmRSS: 100 kB", + "VmSize:":"VmSize: 1800 kB"}])) + + # Test edge case of pid = 0 and a standard pid + self.assertEqual((0,0), proc.get_memory_usage(0)) + self.assertEqual((100*1024, 1800*1024), proc.get_memory_usage(1111)) + + def test_get_stats(self): + """ + Tests get_stats() with all combinations of stat_type arguments. + """ + + # Need to bypass proc.Stat.<command> calls as they aren't in this scope. + args = ['command', 'utime', 'stime', 'start time'] + responses = ['test_program', '0.13', '0.14', '10.21'] + + stat_path = "/proc/24062/stat" + stat = '1 (test_program) 2 3 4 5 6 7 8 9 10 11 12 13.0 14.0 15 16 17 18 19 20 21.0 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43' + + # List of all combinations of args with respective return values. + subsets = find_subsets(zip(args, responses)) + + mocking.mock(proc.get_system_start_time, mocking.return_value(10)) + + + # Tests the case where no stat_types are specified. + mocking.mock(proc._get_line, mock_fn( + [(stat_path, str(24062), 'process ')], [stat])) + self.assertEquals((), proc.get_stats(24062)) + + # Don't handle empty set of commands here. (just did above) + subsets.remove([]) + + for sub in subsets: + # sub = [(arg1, resp1), (arg2, resp2),...]. We need (arg1,arg2,...) + # and (resp1,resp2,...). + arg, response = zip(*sub) + + mocking.mock(proc._get_line, mock_fn( + [(stat_path, str(24062), 'process %s' % ", ".join(arg))], [stat])) + # Iterates through each combination of commands. + self.assertEquals(response, proc.get_stats(24062, *arg)) + + # Tests the case where pid = 0. + for sub in subsets: + arg, response = zip(*sub) + if 'start time' in arg: + response = 10 + else: + response = () + for a in arg: + if a == 'command': + response += ('sched',) + elif a == 'utime': + response += ('0',) + elif a == 'stime': + response += ('0',) + + mocking.mock(proc._get_line, mock_fn( + [('/proc/0/stat', str(0), 'process %s' % ", ".join(arg))], [stat]))
- This is the only function in proc.py that utilizes - _get_lines explicitly. - """ + self.assertEquals(response, proc.get_stats(0, *arg)) + + def test_get_connections(self): + """ + Tests the get_connections function along with a given pid. + """ + + pid = 1111 + fd_list = ['1', '2', '3', '4'] + fd_paths = [] + for fd in fd_list: + fd_paths.append(('/proc/%s/fd/%s' % (str(pid), fd),)) + readlink_results = ['socket:[99999999]', 'socket:[IIIIIIII]', 'pipe:[30303]', 'pipe:[40404]'] + + # Will be put in cStringIO wrappers and treated as files. + tcp = '\n 0: 11111111:1111 22222222:2222 01 44444444:44444444 55:55555555 66666666 1111 8 99999999' + udp = '\n A: BBBBBBBB:BBBB CCCCCCCC:CCCC DD EEEEEEEE:EEEEEEEE FF:FFFFFFFF GGGGGGGG 1111 H IIIIIIII' + + file_list = [cStringIO.StringIO(tcp), cStringIO.StringIO(udp)] + contents_list = [tcp, udp] + + # Mock os.listdir, os.readlink, and open. + mocking.mock(os.listdir, mock_fn([('/proc/%s/fd' % str(pid),)], [fd_list], os.listdir), os) + mocking.mock(os.readlink, mock_fn(fd_paths, readlink_results, os.readlink), os) + + mocking.mock(open, mock_fn([('/proc/net/tcp',), ('/proc/net/udp',)], file_list, open)) + + # Tests the edge case of pid = 0. + self.assertEquals([], proc.get_connections(0)) + + # First build expected responses, then call the function to be tested. + results = [] + for proc_file in file_list: + contents = proc_file.getvalue()
- mocking.mock(proc._get_lines, mock_get_lines("/proc/1111/status", - ("VmRSS:", "VmSize:"), {"VmRSS:": "VmRSS: 100 kB", - "VmSize:":"VmSize: 1800 kB"})) + _, l_addr, f_addr, status, _, _, _, _, _, inode = contents.split()[:10]
- self.assertEqual((100*1024, 1800*1024), proc.get_memory_usage(1111)) + local_ip, local_port = proc._decode_proc_address_encoding(l_addr) + foreign_ip, foreign_port = proc._decode_proc_address_encoding(f_addr) + results.append((local_ip, local_port, foreign_ip, foreign_port)) + + self.assertEquals(results, proc.get_connections(1111))