[tor-commits] [stem/master] Match loop scope to thread

atagar at torproject.org atagar at torproject.org
Thu Jul 16 01:29:00 UTC 2020


commit 007cf1ae5654ac057cc56dc06364561ce1d25c58
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Jul 13 16:09:35 2020 -0700

    Match loop scope to thread
    
    Asyncio threads can be restarted, but doing so lacks a significant benefit and
    can get complicated. For instance, when we're stopped from an async method our
    loop is closed asynchronously (because we cannot join our own thread). This is
    fine, except that start() can subsequently fail because we cannot resume a
    running loop...
    
      Traceback (most recent call last):
        File "/home/atagar/Python-3.7.0/Lib/threading.py", line 917, in _bootstrap_inner
          self.run()
        File "/home/atagar/Python-3.7.0/Lib/threading.py", line 865, in run
          self._target(*self._args, **self._kwargs)
        File "/home/atagar/Python-3.7.0/Lib/asyncio/base_events.py", line 510, in run_forever
          raise RuntimeError('This event loop is already running')
      RuntimeError: This event loop is already running
    
    By creating a new loop for each thread we not only sidestep this but simplify
    asynchronicity because each run of our class will have its own event queue.
---
 stem/util/__init__.py         | 40 +++++++++++++++++++---------------------
 test/unit/util/synchronous.py |  9 +++++++--
 2 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index fbd844f8..c28b0b27 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -204,17 +204,19 @@ class Synchronous(object):
   def __init__(self) -> None:
     self._loop = None  # type: Optional[asyncio.AbstractEventLoop]
     self._loop_thread = None  # type: Optional[threading.Thread]
-    self._loop_thread_lock = threading.RLock()
+    self._loop_lock = threading.RLock()
 
-    # this class is a no-op when created from an asyncio context
+    # this class is a no-op when created within an asyncio context
 
     self._no_op = Synchronous.is_asyncio_context()
 
-    if not self._no_op:
-      self._loop = asyncio.new_event_loop()
+    if self._no_op:
+      self.__ainit__()  # this is already an asyncio context
+    else:
       Synchronous.start(self)
 
-      # call any coroutines through our loop
+      # Run coroutines through our loop. This calls methods by name rather than
+      # reference so runtime replacements (like mocks) work.
 
       for name, func in inspect.getmembers(self):
         if name in ('__aiter__', '__aenter__', '__aexit__'):
@@ -224,9 +226,6 @@ class Synchronous(object):
         elif inspect.ismethod(func) and inspect.iscoroutinefunction(func):
           setattr(self, name, functools.partial(self._run_async_method, name))
 
-    if self._no_op:
-      self.__ainit__()  # this is already an asyncio context
-    else:
       asyncio.run_coroutine_threadsafe(asyncio.coroutine(self.__ainit__)(), self._loop).result()
 
   def __ainit__(self):
@@ -277,8 +276,9 @@ class Synchronous(object):
     Initiate resources to make this object callable from synchronous contexts.
     """
 
-    with self._loop_thread_lock:
-      if not self._no_op and self._loop_thread is None:
+    with self._loop_lock:
+      if not self._no_op and self._loop is None:
+        self._loop = asyncio.new_event_loop()
         self._loop_thread = threading.Thread(
           name = '%s asyncio' % type(self).__name__,
           target = self._loop.run_forever,
@@ -294,13 +294,14 @@ class Synchronous(object):
     **RuntimeError**.
     """
 
-    with self._loop_thread_lock:
-      if not self._no_op and self._loop_thread is not None:
+    with self._loop_lock:
+      if not self._no_op and self._loop is not None:
         self._loop.call_soon_threadsafe(self._loop.stop)
 
         if threading.current_thread() != self._loop_thread:
           self._loop_thread.join()
 
+        self._loop = None
         self._loop_thread = None
 
   @staticmethod
@@ -330,19 +331,16 @@ class Synchronous(object):
     :raises: **AttributeError** if this method doesn't exist
     """
 
-    # Retrieving methods by name (rather than keeping a reference) so runtime
-    # replacements like test mocks work.
-
-    func = getattr(type(self), method_name)
+    func = getattr(type(self), method_name, None)
 
-    if self._no_op or Synchronous.is_asyncio_context():
+    if not func:
+      raise AttributeError("'%s' does not have a %s method" % (type(self).__name__, method_name))
+    elif self._no_op or Synchronous.is_asyncio_context():
       return func(self, *args, **kwargs)
 
-    with self._loop_thread_lock:
-      if self._loop_thread is None:
+    with self._loop_lock:
+      if self._loop is None:
         raise RuntimeError('%s has been stopped' % type(self).__name__)
-      elif not func:
-        raise TypeError("'%s' does not have a %s method" % (type(self).__name__, method_name))
 
       # convert iterator if indicated by this method's name or type hint
 
diff --git a/test/unit/util/synchronous.py b/test/unit/util/synchronous.py
index bbf91d18..bfe6113c 100644
--- a/test/unit/util/synchronous.py
+++ b/test/unit/util/synchronous.py
@@ -120,15 +120,20 @@ class TestSynchronous(unittest.TestCase):
 
   def test_stop_from_async(self):
     """
-    Ensure we can stop our instance from within an async method without
-    deadlock.
+    Ensure we can start and stop our instance from within an async method
+    without deadlock.
     """
 
     class AsyncStop(Synchronous):
+      async def restart(self):
+        self.stop()
+        self.start()
+
       async def call_stop(self):
         self.stop()
 
     instance = AsyncStop()
+    instance.restart()
     instance.call_stop()
     self.assertRaises(RuntimeError, instance.call_stop)
 





More information about the tor-commits mailing list