commit a7f6990c4ba3cb6348c32084abbd16e4cbadd0c8
Author: Damian Johnson <atagar(a)torproject.org>
Date: Fri Aug 18 05:13:53 2017 -0700
Add DaemonTask class
To improve our test runtime we started using the multiprocessing module to
parallelize. It's clunky, but with this little helper it's easy to split
off long-running tasks.
This can improve more than just our tests. First candidate that comes to
mind is nyx's resource usage fetches so it doesn't block the draw loop.
Either way, having a tested library class is better than keeping this
hidden in our test helpers.
---
stem/util/system.py | 111 ++++++++++++++++++++++++++++++++++++++++++++++
test/integ/util/system.py | 35 +++++++++++++++
2 files changed, 146 insertions(+)
diff --git a/stem/util/system.py b/stem/util/system.py
index 6d966723..6301c67b 100644
--- a/stem/util/system.py
+++ b/stem/util/system.py
@@ -45,12 +45,28 @@ best-effort, providing **None** if the lookup fails.
get_process_name - provides our process' name
set_process_name - changes our process' name
+
+.. data:: State (enum)
+
+ State of a subprocess.
+
+ .. versionadded:: 1.6.0
+
+ ==================== ===========
+ State Description
+ ==================== ===========
+ PENDING not yet started
+ RUNNING currently being performed
+ DONE completed successfully
+ FAILED failed with an exception
+ ==================== ===========
"""
import ctypes
import ctypes.util
import distutils.spawn
import mimetypes
+import multiprocessing
import os
import platform
import re
@@ -59,12 +75,20 @@ import tarfile
import threading
import time
+import stem.util.enum
import stem.util.proc
import stem.util.str_tools
from stem import UNDEFINED
from stem.util import str_type, log
+State = stem.util.enum.UppercaseEnum(
+ 'PENDING',
+ 'RUNNING',
+ 'DONE',
+ 'FAILED',
+)
+
# Mapping of commands to if they're available or not.
CMD_AVAILABLE_CACHE = {}
@@ -180,6 +204,93 @@ class CallTimeoutError(CallError):
self.timeout = timeout
+class DaemonTask(object):
+ """
+ Invokes the given function in a subprocess, returning the value.
+
+ .. versionadded:: 1.6.0
+
+ :var function runner: function to be invoked by the subprocess
+ :var tuple args: arguments to provide to the subprocess
+ :var int priority: subprocess nice priority
+
+ :var stem.util.system.State status: state of the subprocess
+ :var float runtime: seconds subprocess took to complete
+ :var object result: return value of subprocess if successful
+ :var exception error: exception raised by subprocess if it failed
+ """
+
+ def __init__(self, runner, args = None, priority = 15, start = False):
+ self.runner = runner
+ self.args = args
+ self.priority = priority
+
+ self.status = State.PENDING
+ self.runtime = None
+ self.result = None
+ self.error = None
+
+ self._process = None
+ self._pipe = None
+
+ if start:
+ self.run()
+
+ def run(self):
+ """
+ Invokes the task if it hasn't already been started. If it has, this is a
+ no-op.
+ """
+
+ if self.status == State.PENDING:
+ self._pipe, child_pipe = multiprocessing.Pipe()
+ self._process = multiprocessing.Process(target = DaemonTask._run_wrapper, args = (child_pipe, self.priority, self.runner, self.args))
+ self._process.start()
+ self.status = State.RUNNING
+
+ def join(self):
+ """
+ Provides the result of the daemon task. If still running this blocks until
+ the task is completed.
+
+ :returns: response of the function we ran
+
+ :raises: exception raised by the function if it failed with one
+ """
+
+ if self.status == State.RUNNING:
+ response = self._pipe.recv()
+ self._process.join()
+
+ self.status = response[0]
+ self.runtime = response[1]
+
+ if self.status == State.DONE:
+ self.result = response[2]
+ elif self.status == State.FAILED:
+ self.error = response[2]
+
+ if self.status == State.DONE:
+ return self.result
+ elif self.status == State.FAILED:
+ raise self.error
+ else:
+ raise RuntimeError('BUG: unexpected status from daemon task, %s' % self.status)
+
+ @staticmethod
+ def _run_wrapper(conn, priority, runner, args):
+ start_time = time.time()
+ os.nice(priority)
+
+ try:
+ result = runner(*args) if args else runner()
+ conn.send((State.DONE, time.time() - start_time, result))
+ except Exception as exc:
+ conn.send((State.FAILED, time.time() - start_time, exc))
+ finally:
+ conn.close()
+
+
def is_windows():
"""
Checks if we are running on Windows.
diff --git a/test/integ/util/system.py b/test/integ/util/system.py
index 818b6392..6a310faa 100644
--- a/test/integ/util/system.py
+++ b/test/integ/util/system.py
@@ -13,6 +13,8 @@ import stem.util.system
import test.require
import test.runner
+from stem.util.system import State, DaemonTask
+
try:
# added in python 3.3
from unittest.mock import Mock, patch
@@ -70,6 +72,39 @@ require_path = test.require.needs(lambda: 'PATH' in os.environ, 'requires PATH')
class TestSystem(unittest.TestCase):
+ def test_daemon_task_when_successful(self):
+ """
+ Checks a simple, successful DaemonTask that simply echoes a value.
+ """
+
+ task = DaemonTask(lambda arg: arg, ('hello world',))
+
+ self.assertEqual(None, task.result)
+ self.assertEqual(State.PENDING, task.status)
+
+ task.run()
+ self.assertEqual('hello world', task.join())
+ self.assertEqual(State.DONE, task.status)
+ self.assertTrue(0 < task.runtime < 1.0)
+
+ def test_daemon_task_on_failure(self):
+ """
+ Checks DaemonTask when an exception is raised.
+ """
+
+ def _test_task(arg):
+ raise RuntimeError(arg)
+
+ task = DaemonTask(_test_task, ('hello world',))
+
+ self.assertEqual(None, task.result)
+ self.assertEqual(State.PENDING, task.status)
+
+ task.run()
+ self.assertRaisesRegexp(RuntimeError, 'hello world', task.join)
+ self.assertEqual(State.FAILED, task.status)
+ self.assertTrue(0 < task.runtime < 1.0)
+
@require_path
def test_is_available(self):
"""