commit a7f6990c4ba3cb6348c32084abbd16e4cbadd0c8 Author: Damian Johnson atagar@torproject.org Date: Fri Aug 18 05:13:53 2017 -0700
Add DaemonTask class
To improve our test runtime we started using the multiprocessing module to parallelize. It's clunky, but with this little helper it's easy to split off long-running tasks.
This can improve more than just our tests. First candidate that comes to mind is nyx's resource usage fetches so it doesn't block the draw loop. Either way, having a tested library class is better than keeping this hidden in our test helpers. --- stem/util/system.py | 111 ++++++++++++++++++++++++++++++++++++++++++++++ test/integ/util/system.py | 35 +++++++++++++++ 2 files changed, 146 insertions(+)
diff --git a/stem/util/system.py b/stem/util/system.py index 6d966723..6301c67b 100644 --- a/stem/util/system.py +++ b/stem/util/system.py @@ -45,12 +45,28 @@ best-effort, providing **None** if the lookup fails.
get_process_name - provides our process' name set_process_name - changes our process' name + +.. data:: State (enum) + + State of a subprocess. + + .. versionadded:: 1.6.0 + + ==================== =========== + State Description + ==================== =========== + PENDING not yet started + RUNNING currently being performed + DONE completed successfully + FAILED failed with an exception + ==================== =========== """
import ctypes import ctypes.util import distutils.spawn import mimetypes +import multiprocessing import os import platform import re @@ -59,12 +75,20 @@ import tarfile import threading import time
+import stem.util.enum import stem.util.proc import stem.util.str_tools
from stem import UNDEFINED from stem.util import str_type, log
+State = stem.util.enum.UppercaseEnum( + 'PENDING', + 'RUNNING', + 'DONE', + 'FAILED', +) + # Mapping of commands to if they're available or not.
CMD_AVAILABLE_CACHE = {} @@ -180,6 +204,93 @@ class CallTimeoutError(CallError): self.timeout = timeout
+class DaemonTask(object): + """ + Invokes the given function in a subprocess, returning the value. + + .. versionadded:: 1.6.0 + + :var function runner: function to be invoked by the subprocess + :var tuple args: arguments to provide to the subprocess + :var int priority: subprocess nice priority + + :var stem.util.system.State status: state of the subprocess + :var float runtime: seconds subprocess took to complete + :var object result: return value of subprocess if successful + :var exception error: exception raised by subprocess if it failed + """ + + def __init__(self, runner, args = None, priority = 15, start = False): + self.runner = runner + self.args = args + self.priority = priority + + self.status = State.PENDING + self.runtime = None + self.result = None + self.error = None + + self._process = None + self._pipe = None + + if start: + self.run() + + def run(self): + """ + Invokes the task if it hasn't already been started. If it has this is a + no-op. + """ + + if self.status == State.PENDING: + self._pipe, child_pipe = multiprocessing.Pipe() + self._process = multiprocessing.Process(target = DaemonTask._run_wrapper, args = (child_pipe, self.priority, self.runner, self.args)) + self._process.start() + self.status = State.RUNNING + + def join(self): + """ + Provides the result of the daemon task. If still running this blocks until + the task is completed. 
+ + :returns: response of the function we ran + + :raises: exception raised by the function if it failed with one + """ + + if self.status == State.RUNNING: + response = self._pipe.recv() + self._process.join() + + self.status = response[0] + self.runtime = response[1] + + if self.status == State.DONE: + self.result = response[2] + elif self.status == State.FAILED: + self.error = response[2] + + if self.status == State.DONE: + return self.result + elif self.status == State.FAILED: + raise self.error + else: + raise RuntimeError('BUG: unexpected status from daemon task, %s' % self.status) + + @staticmethod + def _run_wrapper(conn, priority, runner, args): + start_time = time.time() + os.nice(priority) + + try: + result = runner(*args) if args else runner() + conn.send((State.DONE, time.time() - start_time, result)) + except Exception as exc: + conn.send((State.FAILED, time.time() - start_time, exc)) + finally: + conn.close() + + def is_windows(): """ Checks if we are running on Windows. diff --git a/test/integ/util/system.py b/test/integ/util/system.py index 818b6392..6a310faa 100644 --- a/test/integ/util/system.py +++ b/test/integ/util/system.py @@ -13,6 +13,8 @@ import stem.util.system import test.require import test.runner
+from stem.util.system import State, DaemonTask + try: # added in python 3.3 from unittest.mock import Mock, patch @@ -70,6 +72,39 @@ require_path = test.require.needs(lambda: 'PATH' in os.environ, 'requires PATH')
class TestSystem(unittest.TestCase): + def test_daemon_task_when_successful(self): + """ + Checks a simple, successful DaemonTask that simply echoes a value. + """ + + task = DaemonTask(lambda arg: arg, ('hello world',)) + + self.assertEqual(None, task.result) + self.assertEqual(State.PENDING, task.status) + + task.run() + self.assertEqual('hello world', task.join()) + self.assertEqual(State.DONE, task.status) + self.assertTrue(0 < task.runtime < 1.0) + + def test_daemon_task_on_failure(self): + """ + Checks DaemonTask when an exception is raised. + """ + + def _test_task(arg): + raise RuntimeError(arg) + + task = DaemonTask(_test_task, ('hello world',)) + + self.assertEqual(None, task.result) + self.assertEqual(State.PENDING, task.status) + + task.run() + self.assertRaisesRegexp(RuntimeError, 'hello world', task.join) + self.assertEqual(State.FAILED, task.status) + self.assertTrue(0 < task.runtime < 1.0) + @require_path def test_is_available(self): """