[tor-commits] [stem/master] Add DaemonTask class

atagar at torproject.org atagar at torproject.org
Sat Aug 19 18:47:37 UTC 2017


commit a7f6990c4ba3cb6348c32084abbd16e4cbadd0c8
Author: Damian Johnson <atagar at torproject.org>
Date:   Fri Aug 18 05:13:53 2017 -0700

    Add DaemonTask class
    
    To improve our test runtime we started using the multiprocessing module to
    parallelize. It's clunky, but with this little helper it's easy to split
    off long-running tasks.
    
    This can improve more than just our tests. First candidate that comes to
    mind is nyx's resource usage fetches so it doesn't block the draw loop.
    Either way, having a tested library class is better than keeping this
    hidden in our test helpers.
---
 stem/util/system.py       | 111 ++++++++++++++++++++++++++++++++++++++++++++++
 test/integ/util/system.py |  35 +++++++++++++++
 2 files changed, 146 insertions(+)

diff --git a/stem/util/system.py b/stem/util/system.py
index 6d966723..6301c67b 100644
--- a/stem/util/system.py
+++ b/stem/util/system.py
@@ -45,12 +45,28 @@ best-effort, providing **None** if the lookup fails.
 
   get_process_name - provides our process' name
   set_process_name - changes our process' name
+
+.. data:: State (enum)
+
+  State of a subprocess.
+
+  .. versionadded:: 1.6.0
+
+  ====================  ===========
+  State                 Description
+  ====================  ===========
+  PENDING               not yet started
+  RUNNING               currently being performed
+  DONE                  completed successfully
+  FAILED                failed with an exception
+  ====================  ===========
 """
 
 import ctypes
 import ctypes.util
 import distutils.spawn
 import mimetypes
+import multiprocessing
 import os
 import platform
 import re
@@ -59,12 +75,20 @@ import tarfile
 import threading
 import time
 
+import stem.util.enum
 import stem.util.proc
 import stem.util.str_tools
 
 from stem import UNDEFINED
 from stem.util import str_type, log
 
+# States a DaemonTask moves through: PENDING until it is started, RUNNING
+# while its subprocess works, then either DONE or FAILED.
+
+State = stem.util.enum.UppercaseEnum('PENDING', 'RUNNING', 'DONE', 'FAILED')
+
 # Mapping of commands to if they're available or not.
 
 CMD_AVAILABLE_CACHE = {}
@@ -180,6 +204,93 @@ class CallTimeoutError(CallError):
     self.timeout = timeout
 
 
+class DaemonTask(object):
+  """
+  Invokes the given function in a subprocess, returning the value.
+
+  The function's return value (or the exception it raised) is sent back to us
+  over a pipe, so both need to be picklable.
+
+  .. versionadded:: 1.6.0
+
+  :var function runner: function to be invoked by the subprocess
+  :var tuple args: arguments to provide to the subprocess
+  :var int priority: subprocess nice priority
+
+  :var stem.util.system.State status: state of the subprocess
+  :var float runtime: seconds subprocess took to complete
+  :var object result: return value of subprocess if successful
+  :var exception error: exception raised by subprocess if it failed
+  """
+
+  def __init__(self, runner, args = None, priority = 15, start = False):
+    """
+    :param function runner: function to be invoked by the subprocess
+    :param tuple args: positional arguments for the runner, **None** to
+      invoke it without any
+    :param int priority: niceness increment applied to the subprocess
+    :param bool start: begins running the task right away if **True**
+    """
+
+    self.runner = runner
+    self.args = args
+    self.priority = priority
+
+    self.status = State.PENDING
+    self.runtime = None
+    self.result = None
+    self.error = None
+
+    self._process = None
+    self._pipe = None
+
+    if start:
+      self.run()
+
+  def run(self):
+    """
+    Invokes the task if it hasn't already been started. If it has this is a
+    no-op.
+    """
+
+    if self.status == State.PENDING:
+      self._pipe, child_pipe = multiprocessing.Pipe()
+      self._process = multiprocessing.Process(target = DaemonTask._run_wrapper, args = (child_pipe, self.priority, self.runner, self.args))
+      self._process.start()
+      self.status = State.RUNNING
+
+      # The subprocess has its own handle for this end of the pipe. Releasing
+      # ours means recv() gets an EOFError rather than blocking forever if the
+      # subprocess dies without replying.
+
+      child_pipe.close()
+
+  def join(self):
+    """
+    Provides the result of the daemon task. If it hasn't been started yet this
+    runs it, and if still running this blocks until the task is completed.
+
+    :returns: response of the function we ran
+
+    :raises: exception raised by the function if it failed with one, or an
+      **EOFError** if the subprocess exited without reporting a result
+    """
+
+    if self.status == State.PENDING:
+      self.run()
+
+    if self.status == State.RUNNING:
+      # Read before joining the process. A subprocess with a large reply
+      # can't exit until we've drained the pipe.
+
+      try:
+        response = self._pipe.recv()
+      except EOFError:
+        # subprocess went away (killed, crashed, or its reply could not be
+        # pickled) without sending us anything
+
+        response = (State.FAILED, None, EOFError('daemon task exited without providing a result'))
+      finally:
+        self._pipe.close()
+
+      self._process.join()
+
+      self.status = response[0]
+      self.runtime = response[1]
+
+      if self.status == State.DONE:
+        self.result = response[2]
+      elif self.status == State.FAILED:
+        self.error = response[2]
+
+    if self.status == State.DONE:
+      return self.result
+    elif self.status == State.FAILED:
+      raise self.error
+    else:
+      raise RuntimeError('BUG: unexpected status from daemon task, %s' % self.status)
+
+  @staticmethod
+  def _run_wrapper(conn, priority, runner, args):
+    """
+    Entry point of the subprocess. Runs the function and sends a
+    (status, runtime, result or exception) tuple back through the pipe.
+
+    :param multiprocessing.Connection conn: pipe to send our reply through
+    :param int priority: niceness increment to apply to ourselves
+    :param function runner: function to invoke
+    :param tuple args: positional arguments for the runner, if any
+    """
+
+    start_time = time.time()
+
+    try:
+      # Within the try block so a failure to renice (unsupported platform,
+      # lack of permissions, etc) is reported to our parent as well.
+
+      os.nice(priority)
+
+      result = runner(*args) if args else runner()
+      conn.send((State.DONE, time.time() - start_time, result))
+    except Exception as exc:
+      conn.send((State.FAILED, time.time() - start_time, exc))
+    finally:
+      conn.close()
+
+
 def is_windows():
   """
   Checks if we are running on Windows.
diff --git a/test/integ/util/system.py b/test/integ/util/system.py
index 818b6392..6a310faa 100644
--- a/test/integ/util/system.py
+++ b/test/integ/util/system.py
@@ -13,6 +13,8 @@ import stem.util.system
 import test.require
 import test.runner
 
+from stem.util.system import State, DaemonTask
+
 try:
   # added in python 3.3
   from unittest.mock import Mock, patch
@@ -70,6 +72,39 @@ require_path = test.require.needs(lambda: 'PATH' in os.environ, 'requires PATH')
 
 
 class TestSystem(unittest.TestCase):
+  def test_daemon_task_when_successful(self):
+    """
+    Checks a simple, successful DaemonTask that just echoes back the value it
+    was given.
+    """
+
+    expected = 'hello world'
+    task = DaemonTask(lambda arg: arg, (expected,))
+
+    # nothing has been started yet
+
+    self.assertEqual(State.PENDING, task.status)
+    self.assertEqual(None, task.result)
+
+    task.run()
+    response = task.join()
+
+    self.assertEqual(expected, response)
+    self.assertEqual(State.DONE, task.status)
+    self.assertTrue(0 < task.runtime < 1.0)
+
+  def test_daemon_task_on_failure(self):
+    """
+    Checks that an exception raised within a DaemonTask is re-raised by
+    join(), and that the task is marked as having failed.
+    """
+
+    def _raise_error(msg):
+      raise RuntimeError(msg)
+
+    task = DaemonTask(_raise_error, ('hello world',))
+
+    # nothing has been started yet
+
+    self.assertEqual(State.PENDING, task.status)
+    self.assertEqual(None, task.result)
+
+    task.run()
+
+    self.assertRaisesRegexp(RuntimeError, 'hello world', task.join)
+    self.assertEqual(State.FAILED, task.status)
+    self.assertTrue(0 < task.runtime < 1.0)
+
   @require_path
   def test_is_available(self):
     """





More information about the tor-commits mailing list