commit 1cbf3397ccbb77ea7df35109839a522dcf03556b Author: Damian Johnson atagar@torproject.org Date: Sat Jun 20 16:31:17 2020 -0700
Synchronous mixin
Illia's AsyncClassWrapper does the trick but I think we can make this more transparent. Lets try a mixin that overwrites asyncio methods dynamically.
Earlier I added a AsyncClassWrapper.__del__() method to clean itself up, but doing so was a mistake. When our Python interpreter shuts down asyncio closes its scheduler *before* this method invokes, which makes join() hang because loop.stop() never runs.
To avoid these deadlocks we need Synchronous (or AsyncClassWrapper) users to explicitly close the class themself. --- stem/util/__init__.py | 89 +++++++++++++++++++++++++++++++++++++++++++ test/settings.cfg | 1 + test/unit/util/synchronous.py | 54 ++++++++++++++++++++++++++ 3 files changed, 144 insertions(+)
diff --git a/stem/util/__init__.py b/stem/util/__init__.py index 25282b99..72239273 100644 --- a/stem/util/__init__.py +++ b/stem/util/__init__.py @@ -7,6 +7,8 @@ Utility functions used by the stem library.
import asyncio import datetime +import functools +import inspect import threading from concurrent.futures import Future
@@ -144,6 +146,93 @@ def _hash_attr(obj: Any, *attributes: str, **kwargs: Any): return my_hash
+class Synchronous(object): + """ + Mixin that lets a class be called from both synchronous and asynchronous + contexts. + + :: + + class Example(Synchronous): + async def hello(self): + return 'hello' + + def sync_demo(): + instance = Example() + print('%s from a synchronous context' % instance.hello()) + instance.close() + + async def async_demo(): + instance = Example() + print('%s from an asynchronous context' % await instance.hello()) + instance.close() + + sync_demo() + asyncio.run(async_demo()) + + Users are responsible for calling :func:`~stem.util.Synchronous.close` when + finished to clean up underlying resources. + """ + + def __init__(self): + self._loop = asyncio.new_event_loop() + self._loop_lock = threading.RLock() + self._loop_thread = threading.Thread( + name = '%s asyncio' % self.__class__.__name__, + target = self._loop.run_forever, + daemon = True, + ) + + self._is_closed = False + + # overwrite asynchronous class methods with instance methods that can be + # called from either context + + def wrap(func, *args, **kwargs): + if Synchronous.is_asyncio_context(): + return func(*args, **kwargs) + else: + with self._loop_lock: + if self._is_closed: + raise RuntimeError('%s has been closed' % type(self).__name__) + elif not self._loop_thread.is_alive(): + self._loop_thread.start() + + return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop).result() + + for method_name, func in inspect.getmembers(self, predicate = inspect.ismethod): + if inspect.iscoroutinefunction(func): + setattr(self, method_name, functools.partial(wrap, func)) + + def close(self): + """ + Terminate resources that permits this from being callable from synchronous + contexts. Once called any further synchronous invocations will fail with a + **RuntimeError**. + """ + + with self._loop_lock: + if self._loop_thread.is_alive(): + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop_thread.join() + + self._is_closed = True + + @staticmethod + def is_asyncio_context(): + """ + Check if running within a synchronous or asynchronous context. + + :returns: **True** if within an asyncio conext, **False** otherwise + """ + + try: + asyncio.get_running_loop() + return True + except RuntimeError: + return False + + class AsyncClassWrapper: _loop: asyncio.AbstractEventLoop _loop_thread: threading.Thread diff --git a/test/settings.cfg b/test/settings.cfg index 51109f96..fcef5ec1 100644 --- a/test/settings.cfg +++ b/test/settings.cfg @@ -248,6 +248,7 @@ test.unit_tests |test.unit.util.system.TestSystem |test.unit.util.term.TestTerminal |test.unit.util.tor_tools.TestTorTools +|test.unit.util.synchronous.TestSynchronous |test.unit.util.__init__.TestBaseUtil |test.unit.installation.TestInstallation |test.unit.descriptor.descriptor.TestDescriptor diff --git a/test/unit/util/synchronous.py b/test/unit/util/synchronous.py new file mode 100644 index 00000000..26dad98d --- /dev/null +++ b/test/unit/util/synchronous.py @@ -0,0 +1,54 @@ +""" +Unit tests for the stem.util.Synchronous class. +""" + +import asyncio +import io +import unittest + +from unittest.mock import patch + +from stem.util import Synchronous + +EXAMPLE_OUTPUT = """\ +hello from a synchronous context +hello from an asynchronous context +""" + + +class Example(Synchronous): + async def hello(self): + return 'hello' + + +class TestSynchronous(unittest.TestCase): + @patch('sys.stdout', new_callable = io.StringIO) + def test_example(self, stdout_mock): + def sync_demo(): + instance = Example() + print('%s from a synchronous context' % instance.hello()) + instance.close() + + async def async_demo(): + instance = Example() + print('%s from an asynchronous context' % await instance.hello()) + instance.close() + + sync_demo() + asyncio.run(async_demo()) + + self.assertEqual(EXAMPLE_OUTPUT, stdout_mock.getvalue()) + + def test_after_close(self): + # close a used instance + + instance = Example() + self.assertEqual('hello', instance.hello()) + instance.close() + self.assertRaises(RuntimeError, instance.hello) + + # close an unused instance + + instance = Example() + instance.close() + self.assertRaises(RuntimeError, instance.hello)