commit 007cf1ae5654ac057cc56dc06364561ce1d25c58 Author: Damian Johnson atagar@torproject.org Date: Mon Jul 13 16:09:35 2020 -0700
Match loop scope to thread
Asyncio threads can be restarted, but doing so lacks a significant benefit and can get complicated. For instance, when we're stopped from an async method our loop is closed asynchronously (because we cannot join our own thread). This is fine, except that start() can subsequently fail because we cannot resume a running loop...
Traceback (most recent call last): File "/home/atagar/Python-3.7.0/Lib/threading.py", line 917, in _bootstrap_inner self.run() File "/home/atagar/Python-3.7.0/Lib/threading.py", line 865, in run self._target(*self._args, **self._kwargs) File "/home/atagar/Python-3.7.0/Lib/asyncio/base_events.py", line 510, in run_forever raise RuntimeError('This event loop is already running') RuntimeError: This event loop is already running
By creating a new loop for each thread we not only sidestep this but simplify asynchronicity because each run of our class will have its own event queue. --- stem/util/__init__.py | 40 +++++++++++++++++++--------------------- test/unit/util/synchronous.py | 9 +++++++-- 2 files changed, 26 insertions(+), 23 deletions(-)
diff --git a/stem/util/__init__.py b/stem/util/__init__.py index fbd844f8..c28b0b27 100644 --- a/stem/util/__init__.py +++ b/stem/util/__init__.py @@ -204,17 +204,19 @@ class Synchronous(object): def __init__(self) -> None: self._loop = None # type: Optional[asyncio.AbstractEventLoop] self._loop_thread = None # type: Optional[threading.Thread] - self._loop_thread_lock = threading.RLock() + self._loop_lock = threading.RLock()
- # this class is a no-op when created from an asyncio context + # this class is a no-op when created within an asyncio context
self._no_op = Synchronous.is_asyncio_context()
- if not self._no_op: - self._loop = asyncio.new_event_loop() + if self._no_op: + self.__ainit__() # this is already an asyncio context + else: Synchronous.start(self)
- # call any coroutines through our loop + # Run coroutines through our loop. This calls methods by name rather than + # reference so runtime replacements (like mocks) work.
for name, func in inspect.getmembers(self): if name in ('__aiter__', '__aenter__', '__aexit__'): @@ -224,9 +226,6 @@ class Synchronous(object): elif inspect.ismethod(func) and inspect.iscoroutinefunction(func): setattr(self, name, functools.partial(self._run_async_method, name))
- if self._no_op: - self.__ainit__() # this is already an asyncio context - else: asyncio.run_coroutine_threadsafe(asyncio.coroutine(self.__ainit__)(), self._loop).result()
def __ainit__(self): @@ -277,8 +276,9 @@ class Synchronous(object): Initiate resources to make this object callable from synchronous contexts. """
- with self._loop_thread_lock: - if not self._no_op and self._loop_thread is None: + with self._loop_lock: + if not self._no_op and self._loop is None: + self._loop = asyncio.new_event_loop() self._loop_thread = threading.Thread( name = '%s asyncio' % type(self).__name__, target = self._loop.run_forever, @@ -294,13 +294,14 @@ class Synchronous(object): **RuntimeError**. """
- with self._loop_thread_lock: - if not self._no_op and self._loop_thread is not None: + with self._loop_lock: + if not self._no_op and self._loop is not None: self._loop.call_soon_threadsafe(self._loop.stop)
if threading.current_thread() != self._loop_thread: self._loop_thread.join()
+ self._loop = None self._loop_thread = None
@staticmethod @@ -330,19 +331,16 @@ class Synchronous(object): :raises: **AttributeError** if this method doesn't exist """
- # Retrieving methods by name (rather than keeping a reference) so runtime - # replacements like test mocks work. - - func = getattr(type(self), method_name) + func = getattr(type(self), method_name, None)
- if self._no_op or Synchronous.is_asyncio_context(): + if not func: + raise AttributeError("'%s' does not have a %s method" % (type(self).__name__, method_name)) + elif self._no_op or Synchronous.is_asyncio_context(): return func(self, *args, **kwargs)
- with self._loop_thread_lock: - if self._loop_thread is None: + with self._loop_lock: + if self._loop is None: raise RuntimeError('%s has been stopped' % type(self).__name__) - elif not func: - raise TypeError("'%s' does not have a %s method" % (type(self).__name__, method_name))
# convert iterator if indicated by this method's name or type hint
diff --git a/test/unit/util/synchronous.py b/test/unit/util/synchronous.py index bbf91d18..bfe6113c 100644 --- a/test/unit/util/synchronous.py +++ b/test/unit/util/synchronous.py @@ -120,15 +120,20 @@ class TestSynchronous(unittest.TestCase):
def test_stop_from_async(self): """ - Ensure we can stop our instance from within an async method without - deadlock. + Ensure we can start and stop our instance from within an async method + without deadlock. """
class AsyncStop(Synchronous): + async def restart(self): + self.stop() + self.start() + async def call_stop(self): self.stop()
instance = AsyncStop() + instance.restart() instance.call_stop() self.assertRaises(RuntimeError, instance.call_stop)