[tor-commits] [stem/master] First complete draft of proc unit testing code.

atagar at torproject.org atagar at torproject.org
Sun Jul 1 02:49:59 UTC 2012


commit e94e48ecf3209273b6de13bd405fa87d3f1203b4
Author: Megan <mchang01 at wesleyan.edu>
Date:   Mon Jun 25 18:05:44 2012 -0400

    First complete draft of proc unit testing code.
    
    Most mocked functions utilize mock_fn(), which checks for expected input
    and either returns the appropriate output given the mocking or lets the
    original function be called.  This is necessary in cases where outside
    code calls these functions and the mocked version is not able to supply
    them with the needed output.  Private methods in the stem/util/proc.py
    file are not tested, nor is is_available().  This decision was based on
    the conventions set in system.py testing code.
---
 run_tests.py           |    4 +-
 test/unit/util/proc.py |  262 ++++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 220 insertions(+), 46 deletions(-)

diff --git a/run_tests.py b/run_tests.py
index 670947a..a93b20e 100755
--- a/run_tests.py
+++ b/run_tests.py
@@ -29,6 +29,7 @@ import test.unit.util.connection
 import test.unit.util.enum
 import test.unit.util.system
 import test.unit.util.tor_tools
+import test.unit.util.proc
 import test.unit.version
 import test.integ.connection.authentication
 import test.integ.connection.connect
@@ -44,7 +45,6 @@ import test.integ.util.conf
 import test.integ.util.system
 import test.integ.process
 import test.integ.version
-import test.unit.util.proc
 
 import stem.prereq
 import stem.util.conf
@@ -99,6 +99,7 @@ UNIT_TESTS = (
   test.unit.util.enum.TestEnum,
   test.unit.util.connection.TestConnection,
   test.unit.util.conf.TestConf,
+  test.unit.util.proc.TestProc,
   test.unit.util.system.TestSystem,
   test.unit.util.tor_tools.TestTorTools,
   test.unit.descriptor.reader.TestDescriptorReader,
@@ -111,7 +112,6 @@ UNIT_TESTS = (
   test.unit.response.protocolinfo.TestProtocolInfoResponse,
   test.unit.response.authchallenge.TestAuthChallengeResponse,
   test.unit.connection.authentication.TestAuthenticate,
-  test.unit.util.proc.TestProc,
 )
 
 INTEG_TESTS = (
diff --git a/test/unit/util/proc.py b/test/unit/util/proc.py
index a474800..2e3dd67 100644
--- a/test/unit/util/proc.py
+++ b/test/unit/util/proc.py
@@ -1,70 +1,244 @@
 """
-Unit testing code for proc utilities located in /stem/util/proc.py
+Unit testing code for the stem.util.proc functions.
+
 """
 import os
 import time
 import unittest
+import operator
 import functools
+import cStringIO
 
 import test.mocking as mocking
 import stem.util.proc as proc
 
-def mock_get_lines(file_path, line_prefixes, return_values):
+class TargetError(Exception):
   """
-  Provides mocking for the proc module's _get_line function.
+  Raised when a function that needs to be mocked cannot be called.
+  
+  This occurs when the caller's arguments don't match the expected arguments
+  and no target is given.
   """
-  if isinstance(line_prefixes, tuple):
-      prefix_list = sorted(list(line_prefixes))
-  else:
-      # Only one line prefix given.
-      prefix_list = line_prefixes
-  def _mock_get_lines(path, prefixes, return_values, caller_path,
-  caller_prefixes, caller_parameter):
-      if isinstance(caller_prefixes, tuple):
-          caller_prefix_list = sorted(list(caller_prefixes))
-      else:
-          #Only one line prefix given.
-          caller_prefix_list = caller_prefixes
-      if path == caller_path and prefixes == caller_prefix_list:
-          return return_values
+  def __init__(self, value):
+    self.value = value
+  def __str__(self):
+    return repr(self.value)
+
+def mock_fn(exp_args, return_vals, target=None):
+  """
+  Provides a lambda function that may be used to mock another function.
+  
+  :param list of tuples exp_args: expected input value(s) to be used for comparison
+  :param list return_vals: value(s) to be returned if expected input matches given input
+  :param function target: target function to be called if mocking doesn't cover this input
+  
+  :precondition: len(exp_args) = len(return_vals)
+  
+                                                 return_vals[i]    a=exp_args[i]
+  :returns:  function _mocker such that: f(*a) = target(*a)        a != exp_args[i] and target != None
+                                                 raise TargetError a != exp_args[i] and target = None
+  """
+  
+  def _mocked(tgt, exp_args, return_vals, *arg):
+    try:
+      #
+      i = operator.indexOf(exp_args, arg)
+    except ValueError:
+      i = -1
+    if i == -1:
+      if tgt:
+        return tgt(*arg)
       else:
-          return None
+        raise TargetError("A relevant function could not be applied")
+    else:
+      return return_vals[i]
   
-  return functools.partial(_mock_get_lines, file_path, prefix_list,
-  return_values)
+  return functools.partial(_mocked, target, exp_args, return_vals)
+
+def find_subsets(xs):
+  """
+  Used with the builtin zip function to create all possible combinations
+  of two lists. Called in test_get_stats().
+  
+  :param xs: list of tuples given by calling zip on two lists
+  
+  :returns: a list of lists of tuples
+  """
+  
+  #  Base case is the empty list.
+  if len(xs) == 0:
+    return [[]]
+  else:
+    subs = find_subsets(xs[1:])
+    subs_w_x0 = []
+    for s in subs:
+      subs_w_x0 = subs_w_x0 + [s + [xs[0]]]
+    return subs + subs_w_x0
 
 class TestProc(unittest.TestCase):
+  
+  def setUp(self):
+    mocking.mock(time.time, mocking.return_value(3.14159))
+  
   def tearDown(self):
-      mocking.revert_mocking()
+    mocking.revert_mocking()
   
   def test_get_system_start_time(self):
-      """
-      Tests the get_system_start_time function.
-      """
-      mocking.mock(proc._get_line, mock_get_lines("/proc/stat", "btime",
-      "btime 1001001"))
-      
-      self.assertEquals(1001001, proc.get_system_start_time())
+    """
+    Tests the get_system_start_time function.
+    """
+    
+    mocking.mock(proc._get_line, mock_fn([("/proc/stat", "btime", 'system start time')],
+    ["btime 1001001"]))
+    
+    # Single test as get_system_start_time takes no arguments
+    self.assertEquals(1001001, proc.get_system_start_time())
   
   def test_get_physical_memory(self):
-      """
-      Tests the get_physical_memory function.
-      """
-      mocking.mock(proc._get_line, mock_get_lines("/proc/meminfo",
-      "MemTotal:", "MemTotal:       12345 kB"))
+    """
+    Tests the get_physical_memory function.
+    """
+    
+    mocking.mock(proc._get_line, mock_fn([("/proc/meminfo",
+    "MemTotal:", 'system physical memory')], ["MemTotal:       12345 kB"]))
+    
+    self.assertEquals((12345*1024), proc.get_physical_memory())
+    
+  def test_get_cwd(self):
+    """
+    Tests the get_cwd function with a given pid.
+    """
+    
+    mocking.mock(os.readlink, mock_fn([('/proc/24019/cwd',)],
+                 ['/home/directory/TEST'], os.listdir), os)
+    
+    # Test edge case of pid = 0 and a standard pid.
+    self.assertEquals('', proc.get_cwd(0))
+    self.assertEquals('/home/directory/TEST', proc.get_cwd(24019))
+    
+    
+  def test_get_uid(self):
+    """
+    Tests the get_uid function with a given pid.
+    """
+    
+    pid_list = [(24019, 11111), (0, 22222)]
+    
+    for pid in pid_list:
+      pid_id, user_id = pid
+      status_path = "/proc/%s/status" % pid_id
+      mocking.mock(proc._get_line, mock_fn([(status_path, "Uid:", "uid")],
+                   ["Uid: %s" % user_id]))
+      self.assertEquals(user_id, proc.get_uid(pid_id))
       
-      self.assertEquals((12345*1024), proc.get_physical_memory())
-  
   def test_get_memory_usage(self):
-      """
-      Tests the get_memory_usage function.
+    """
+    Tests the get_memory_usage function with a given pid.
+    
+    This is the only function in proc.py that calls _get_lines explicitly.
+    """
+    
+    mocking.mock(proc._get_lines, mock_fn([("/proc/1111/status",
+    ("VmRSS:", "VmSize:"), 'memory usage')], [{"VmRSS:": "VmRSS: 100 kB",
+    "VmSize:":"VmSize: 1800 kB"}]))
+    
+    # Test edge case of pid = 0 and a standard pid
+    self.assertEqual((0,0), proc.get_memory_usage(0))
+    self.assertEqual((100*1024, 1800*1024), proc.get_memory_usage(1111))
+    
+  def test_get_stats(self):
+    """
+    Tests get_stats() with all combinations of stat_type arguments.
+    """
+    
+    # Need to bypass proc.Stat.<command> calls as they aren't in this scope.
+    args = ['command', 'utime', 'stime', 'start time']
+    responses = ['test_program', '0.13', '0.14', '10.21']
+    
+    stat_path = "/proc/24062/stat"
+    stat = '1 (test_program) 2 3 4 5 6 7 8 9 10 11 12 13.0 14.0 15 16 17 18 19 20 21.0 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43'
+    
+    # List of all combinations of args with respective return values.
+    subsets = find_subsets(zip(args, responses))
+    
+    mocking.mock(proc.get_system_start_time, mocking.return_value(10))
+    
+    
+    # Tests the case where no stat_types are specified.
+    mocking.mock(proc._get_line, mock_fn(
+      [(stat_path, str(24062), 'process ')], [stat]))
+    self.assertEquals((), proc.get_stats(24062))
+    
+    # Don't handle empty set of commands here. (just did above)
+    subsets.remove([])
+    
+    for sub in subsets:
+      # sub = [(arg1, resp1), (arg2, resp2),...].  We need (arg1,arg2,...)
+      # and (resp1,resp2,...).
+      arg, response = zip(*sub)
+      
+      mocking.mock(proc._get_line, mock_fn(
+          [(stat_path, str(24062), 'process %s' % ", ".join(arg))], [stat]))
+      # Iterates through each combination of commands.
+      self.assertEquals(response, proc.get_stats(24062, *arg))
+      
+    # Tests the case where pid = 0.
+    for sub in subsets:
+      arg, response = zip(*sub)
+      if 'start time' in arg:
+        response = 10
+      else:
+        response = ()
+        for a in arg:
+          if a == 'command':
+            response += ('sched',)
+          elif a == 'utime':
+            response += ('0',)
+          elif a == 'stime':
+            response += ('0',)
+      
+      mocking.mock(proc._get_line, mock_fn(
+        [('/proc/0/stat', str(0), 'process %s' % ", ".join(arg))], [stat]))
       
-      This is the only function in proc.py that utilizes
-      _get_lines explicitly.
-      """
+      self.assertEquals(response, proc.get_stats(0, *arg))
+  
+  def test_get_connections(self):
+    """
+    Tests the get_connections function along with a given pid.
+    """
+    
+    pid = 1111
+    fd_list = ['1', '2', '3', '4']
+    fd_paths = []
+    for fd in fd_list:
+      fd_paths.append(('/proc/%s/fd/%s' % (str(pid), fd),))
+    readlink_results = ['socket:[99999999]', 'socket:[IIIIIIII]', 'pipe:[30303]', 'pipe:[40404]']
+    
+    # Will be put in cStringIO wrappers and treated as files.
+    tcp = '\n 0: 11111111:1111 22222222:2222 01 44444444:44444444 55:55555555 66666666 1111 8 99999999'
+    udp = '\n A: BBBBBBBB:BBBB CCCCCCCC:CCCC DD EEEEEEEE:EEEEEEEE FF:FFFFFFFF GGGGGGGG 1111 H IIIIIIII'
+    
+    file_list = [cStringIO.StringIO(tcp), cStringIO.StringIO(udp)]
+    contents_list = [tcp, udp]
+    
+    # Mock os.listdir, os.readlink, and open.
+    mocking.mock(os.listdir, mock_fn([('/proc/%s/fd' % str(pid),)], [fd_list], os.listdir), os)
+    mocking.mock(os.readlink, mock_fn(fd_paths, readlink_results, os.readlink), os)
+    
+    mocking.mock(open, mock_fn([('/proc/net/tcp',), ('/proc/net/udp',)], file_list, open))
+    
+    # Tests the edge case of pid = 0.
+    self.assertEquals([], proc.get_connections(0))
+    
+    # First build expected responses, then call the function to be tested.
+    results = []
+    for proc_file in file_list:
+      contents = proc_file.getvalue()
       
-      mocking.mock(proc._get_lines, mock_get_lines("/proc/1111/status",
-      ("VmRSS:", "VmSize:"), {"VmRSS:": "VmRSS: 100 kB",
-      "VmSize:":"VmSize: 1800 kB"}))
+      _, l_addr, f_addr, status, _, _, _, _, _, inode = contents.split()[:10]
       
-      self.assertEqual((100*1024, 1800*1024), proc.get_memory_usage(1111))
+      local_ip, local_port = proc._decode_proc_address_encoding(l_addr)
+      foreign_ip, foreign_port = proc._decode_proc_address_encoding(f_addr)
+      results.append((local_ip, local_port, foreign_ip, foreign_port))
+    
+    self.assertEquals(results, proc.get_connections(1111))





More information about the tor-commits mailing list