[tor-commits] [stem/master] Synchronous mixin

atagar at torproject.org atagar at torproject.org
Thu Jul 16 01:29:00 UTC 2020


commit 1cbf3397ccbb77ea7df35109839a522dcf03556b
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Jun 20 16:31:17 2020 -0700

    Synchronous mixin
    
    Illia's AsyncClassWrapper does the trick but I think we can make this more
    transparent. Let's try a mixin that overwrites asyncio methods dynamically.
    
    Earlier I added an AsyncClassWrapper.__del__() method to clean itself up,
    but doing so was a mistake. When our Python interpreter shuts down asyncio
    closes its scheduler *before* this method is invoked, which makes join() hang
    because loop.stop() never runs.
    
    To avoid these deadlocks we need Synchronous (or AsyncClassWrapper) users to
    explicitly close the instance themselves.
---
 stem/util/__init__.py         | 89 +++++++++++++++++++++++++++++++++++++++++++
 test/settings.cfg             |  1 +
 test/unit/util/synchronous.py | 54 ++++++++++++++++++++++++++
 3 files changed, 144 insertions(+)

diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index 25282b99..72239273 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -7,6 +7,8 @@ Utility functions used by the stem library.
 
 import asyncio
 import datetime
+import functools
+import inspect
 import threading
 from concurrent.futures import Future
 
@@ -144,6 +146,93 @@ def _hash_attr(obj: Any, *attributes: str, **kwargs: Any):
   return my_hash
 
 
+class Synchronous(object):
+  """
+  Mixin that lets a class be called from both synchronous and asynchronous
+  contexts.
+
+  ::
+
+    class Example(Synchronous):
+      async def hello(self):
+        return 'hello'
+
+    def sync_demo():
+      instance = Example()
+      print('%s from a synchronous context' % instance.hello())
+      instance.close()
+
+    async def async_demo():
+      instance = Example()
+      print('%s from an asynchronous context' % await instance.hello())
+      instance.close()
+
+    sync_demo()
+    asyncio.run(async_demo())
+
+  Users are responsible for calling :func:`~stem.util.Synchronous.close` when
+  finished to clean up underlying resources.
+  """
+
+  def __init__(self):
+    self._loop = asyncio.new_event_loop()
+    self._loop_lock = threading.RLock()
+    self._loop_thread = threading.Thread(
+      name = '%s asyncio' % self.__class__.__name__,
+      target = self._loop.run_forever,
+      daemon = True,
+    )
+
+    self._is_closed = False
+
+    # overwrite asynchronous class methods with instance methods that can be
+    # called from either context
+
+    def wrap(func, *args, **kwargs):
+      if Synchronous.is_asyncio_context():
+        return func(*args, **kwargs)
+      else:
+        with self._loop_lock:
+          if self._is_closed:
+            raise RuntimeError('%s has been closed' % type(self).__name__)
+          elif not self._loop_thread.is_alive():
+            self._loop_thread.start()
+
+          return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), self._loop).result()
+
+    for method_name, func in inspect.getmembers(self, predicate = inspect.ismethod):
+      if inspect.iscoroutinefunction(func):
+        setattr(self, method_name, functools.partial(wrap, func))
+
+  def close(self):
+    """
+    Terminate the resources that permit this to be called from synchronous
+    contexts. Once called any further synchronous invocations will fail with a
+    **RuntimeError**.
+    """
+
+    with self._loop_lock:
+      if self._loop_thread.is_alive():
+        self._loop.call_soon_threadsafe(self._loop.stop)
+        self._loop_thread.join()
+
+      self._is_closed = True
+
+  @staticmethod
+  def is_asyncio_context():
+    """
+    Check if running within a synchronous or asynchronous context.
+
+    :returns: **True** if within an asyncio context, **False** otherwise
+    """
+
+    try:
+      asyncio.get_running_loop()
+      return True
+    except RuntimeError:
+      return False
+
+
 class AsyncClassWrapper:
   _loop: asyncio.AbstractEventLoop
   _loop_thread: threading.Thread
diff --git a/test/settings.cfg b/test/settings.cfg
index 51109f96..fcef5ec1 100644
--- a/test/settings.cfg
+++ b/test/settings.cfg
@@ -248,6 +248,7 @@ test.unit_tests
 |test.unit.util.system.TestSystem
 |test.unit.util.term.TestTerminal
 |test.unit.util.tor_tools.TestTorTools
+|test.unit.util.synchronous.TestSynchronous
 |test.unit.util.__init__.TestBaseUtil
 |test.unit.installation.TestInstallation
 |test.unit.descriptor.descriptor.TestDescriptor
diff --git a/test/unit/util/synchronous.py b/test/unit/util/synchronous.py
new file mode 100644
index 00000000..26dad98d
--- /dev/null
+++ b/test/unit/util/synchronous.py
@@ -0,0 +1,54 @@
+"""
+Unit tests for the stem.util.Synchronous class.
+"""
+
+import asyncio
+import io
+import unittest
+
+from unittest.mock import patch
+
+from stem.util import Synchronous
+
+EXAMPLE_OUTPUT = """\
+hello from a synchronous context
+hello from an asynchronous context
+"""
+
+
+class Example(Synchronous):
+  async def hello(self):
+    return 'hello'
+
+
+class TestSynchronous(unittest.TestCase):
+  @patch('sys.stdout', new_callable = io.StringIO)
+  def test_example(self, stdout_mock):
+    def sync_demo():
+      instance = Example()
+      print('%s from a synchronous context' % instance.hello())
+      instance.close()
+
+    async def async_demo():
+      instance = Example()
+      print('%s from an asynchronous context' % await instance.hello())
+      instance.close()
+
+    sync_demo()
+    asyncio.run(async_demo())
+
+    self.assertEqual(EXAMPLE_OUTPUT, stdout_mock.getvalue())
+
+  def test_after_close(self):
+    # close a used instance
+
+    instance = Example()
+    self.assertEqual('hello', instance.hello())
+    instance.close()
+    self.assertRaises(RuntimeError, instance.hello)
+
+    # close an unused instance
+
+    instance = Example()
+    instance.close()
+    self.assertRaises(RuntimeError, instance.hello)





More information about the tor-commits mailing list