[tor-commits] [stem/master] Move Synchronous to its own module

atagar at torproject.org atagar at torproject.org
Sat Jul 25 23:13:20 UTC 2020


commit c1bad9e9b249388ee9c6e29b0cb75da0ad6eea64
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Jul 25 15:59:54 2020 -0700

    Move Synchronous to its own module
    
    This class has grown sophisticated enough that it deserves its own module.
    Also, I'd like to discuss this with the wider python community and this will
    make it easier to cite.
---
 stem/control.py                               |   3 +-
 stem/descriptor/remote.py                     |   3 +-
 stem/util/__init__.py                         | 232 +------------------------
 stem/util/asyncio.py                          | 238 ++++++++++++++++++++++++++
 test/settings.cfg                             |   2 +-
 test/unit/util/__init__.py                    |   1 +
 test/unit/util/{synchronous.py => asyncio.py} |   8 +-
 7 files changed, 250 insertions(+), 237 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index 47b8a37c..578a471d 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -269,7 +269,8 @@ import stem.util.tor_tools
 import stem.version
 
 from stem import UNDEFINED, CircStatus, Signal
-from stem.util import Synchronous, log
+from stem.util import log
+from stem.util.asyncio import Synchronous
 from types import TracebackType
 from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple, Type, Union
 
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index ff4b1403..2e9ec641 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -99,7 +99,8 @@ import stem.util.enum
 import stem.util.tor_tools
 
 from stem.descriptor import Compression
-from stem.util import Synchronous, log, str_tools
+from stem.util import log, str_tools
+from stem.util.asyncio import Synchronous
 from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple, Union
 
 # Tor has a limited number of descriptors we can fetch explicitly by their
diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index c525dd6f..7d5ca853 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -5,18 +5,12 @@
 Utility functions used by the stem library.
 """
 
-import asyncio
 import datetime
-import functools
-import inspect
-import threading
-import typing
-import unittest.mock
 
-from types import TracebackType
-from typing import Any, AsyncIterator, Iterator, Optional, Type, Union
+from typing import Any, Union
 
 __all__ = [
+  'asyncio',
   'conf',
   'connection',
   'enum',
@@ -148,225 +142,3 @@ def _hash_attr(obj: Any, *attributes: str, **kwargs: Any) -> int:
     setattr(obj, '_cached_hash', my_hash)
 
   return my_hash
-
-
-class Synchronous(object):
-  """
-  Mixin that lets a class run within both synchronous and asynchronous
-  contexts.
-
-  ::
-
-    class Example(Synchronous):
-      async def hello(self):
-        return 'hello'
-
-    def sync_demo():
-      instance = Example()
-      print('%s from a synchronous context' % instance.hello())
-      instance.stop()
-
-    async def async_demo():
-      instance = Example()
-      print('%s from an asynchronous context' % await instance.hello())
-      instance.stop()
-
-    sync_demo()
-    asyncio.run(async_demo())
-
-  Our async methods always run within a loop. For asyncio users this class has
-  no affect, but otherwise we transparently create an async context to run
-  within.
-
-  Class initialization and any non-async methods should assume they're running
-  within an synchronous context. If our class supplies an **__ainit__()**
-  method it is invoked within our loop during initialization...
-
-  ::
-
-    class Example(Synchronous):
-      def __init__(self):
-        super(Example, self).__init__()
-
-        # Synchronous part of our initialization. Avoid anything
-        # that must run within an asyncio loop.
-
-      def __ainit__(self):
-        # Asychronous part of our initialization. You can call
-        # asyncio.get_running_loop(), and construct objects that
-        # require it (like asyncio.Queue and asyncio.Lock).
-
-  Users are responsible for calling :func:`~stem.util.Synchronous.stop` when
-  finished to clean up underlying resources.
-  """
-
-  def __init__(self) -> None:
-    self._loop = None  # type: Optional[asyncio.AbstractEventLoop]
-    self._loop_thread = None  # type: Optional[threading.Thread]
-    self._loop_lock = threading.RLock()
-
-    # this class is a no-op when created within an asyncio context
-
-    self._no_op = Synchronous.is_asyncio_context()
-
-    if self._no_op:
-      # TODO: replace with get_running_loop() when we remove python 3.6 support
-
-      self._loop = asyncio.get_event_loop()
-      self.__ainit__()  # this is already an asyncio context
-    else:
-      # Run coroutines through our loop. This calls methods by name rather than
-      # reference so runtime replacements (like mocks) work.
-
-      for name, func in inspect.getmembers(self):
-        if name in ('__aiter__', '__aenter__', '__aexit__'):
-          pass  # async object methods with synchronous counterparts
-        elif isinstance(func, unittest.mock.Mock) and inspect.iscoroutinefunction(func.side_effect):
-          setattr(self, name, functools.partial(self._run_async_method, name))
-        elif inspect.ismethod(func) and inspect.iscoroutinefunction(func):
-          setattr(self, name, functools.partial(self._run_async_method, name))
-
-      Synchronous.start(self)
-      asyncio.run_coroutine_threadsafe(asyncio.coroutine(self.__ainit__)(), self._loop).result()
-
-  def __ainit__(self):
-    """
-    Implicitly called during construction. This method is assured to have an
-    asyncio loop during its execution.
-    """
-
-    # This method should be async (so 'await' works), but apparently that
-    # is not possible.
-    #
-    # When our object is constructed our __init__() can be called from a
-    # synchronous or asynchronous context. If synchronous, it's trivial to
-    # run an asynchronous variant of this method because we fully control
-    # the execution of our loop...
-    #
-    #   asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result()
-    #
-    # However, when constructed from an asynchronous context the above will
-    # likely hang because our loop is already processing a task (namely,
-    # whatever is constructing us). While we can schedule a follow-up task, we
-    # cannot invoke it during our construction.
-    #
-    # Finally, when this method is simple we could directly invoke it...
-    #
-    #   class Synchronous(object):
-    #     def __init__(self):
-    #       if Synchronous.is_asyncio_context():
-    #         try:
-    #           self.__ainit__().send(None)
-    #         except StopIteration:
-    #           pass
-    #       else:
-    #         asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result()
-    #
-    #     async def __ainit__(self):
-    #       # asynchronous construction
-    #
-    # However, this breaks if any 'await' suspends our execution. For more
-    # information see...
-    #
-    #   https://stackoverflow.com/questions/52783605/how-to-run-a-coroutine-outside-of-an-event-loop/52829325#52829325
-
-    pass
-
-  def start(self) -> None:
-    """
-    Initiate resources to make this object callable from synchronous contexts.
-    """
-
-    with self._loop_lock:
-      if not self._no_op and self._loop is None:
-        self._loop = asyncio.new_event_loop()
-        self._loop_thread = threading.Thread(
-          name = '%s asyncio' % type(self).__name__,
-          target = self._loop.run_forever,
-          daemon = True,
-        )
-
-        self._loop_thread.start()
-
-  def stop(self) -> None:
-    """
-    Terminate resources that permits this from being callable from synchronous
-    contexts. Calling either :func:`~stem.util.Synchronous.start` or any async
-    method will resume us.
-    """
-
-    with self._loop_lock:
-      if not self._no_op and self._loop is not None:
-        self._loop.call_soon_threadsafe(self._loop.stop)
-
-        if threading.current_thread() != self._loop_thread:
-          self._loop_thread.join()
-
-        self._loop = None
-        self._loop_thread = None
-
-  @staticmethod
-  def is_asyncio_context() -> bool:
-    """
-    Check if running within a synchronous or asynchronous context.
-
-    :returns: **True** if within an asyncio conext, **False** otherwise
-    """
-
-    try:
-      asyncio.get_running_loop()
-      return True
-    except RuntimeError:
-      return False
-    except AttributeError:
-      # TODO: drop when we remove python 3.6 support
-
-      try:
-        return asyncio._get_running_loop() is not None
-      except AttributeError:
-        return False  # python 3.5.3 or below
-
-  def _run_async_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
-    """
-    Run this async method from either a synchronous or asynchronous context.
-
-    :param method_name: name of the method to invoke
-    :param args: positional arguments
-    :param kwargs: keyword arguments
-
-    :returns: method's return value
-
-    :raises: **AttributeError** if this method doesn't exist
-    """
-
-    func = getattr(type(self), method_name, None)
-
-    if not func:
-      raise AttributeError("'%s' does not have a %s method" % (type(self).__name__, method_name))
-    elif self._no_op or Synchronous.is_asyncio_context():
-      return func(self, *args, **kwargs)
-
-    with self._loop_lock:
-      if self._loop is None:
-        Synchronous.start(self)
-
-      # convert iterator if indicated by this method's name or type hint
-
-      if method_name == '__aiter__' or (inspect.ismethod(func) and typing.get_type_hints(func).get('return') == AsyncIterator):
-        async def convert_generator(generator: AsyncIterator) -> Iterator:
-          return iter([d async for d in generator])
-
-        future = asyncio.run_coroutine_threadsafe(convert_generator(func(self, *args, **kwargs)), self._loop)
-      else:
-        future = asyncio.run_coroutine_threadsafe(func(self, *args, **kwargs), self._loop)
-
-    return future.result()
-
-  def __iter__(self) -> Iterator:
-    return self._run_async_method('__aiter__')
-
-  def __enter__(self):
-    return self._run_async_method('__aenter__')
-
-  def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]):
-    return self._run_async_method('__aexit__', exit_type, value, traceback)
diff --git a/stem/util/asyncio.py b/stem/util/asyncio.py
new file mode 100644
index 00000000..4ee8df3f
--- /dev/null
+++ b/stem/util/asyncio.py
@@ -0,0 +1,238 @@
+# Copyright 2020, Damian Johnson and The Tor Project
+# See LICENSE for licensing information
+
+"""
+Utilities for working with asyncio.
+"""
+
+import asyncio
+import functools
+import inspect
+import threading
+import typing
+import unittest.mock
+
+from types import TracebackType
+from typing import Any, AsyncIterator, Iterator, Optional, Type
+
+
+class Synchronous(object):
+  """
+  Mixin that lets a class run within both synchronous and asynchronous
+  contexts.
+
+  ::
+
+    class Example(Synchronous):
+      async def hello(self):
+        return 'hello'
+
+    def sync_demo():
+      instance = Example()
+      print('%s from a synchronous context' % instance.hello())
+      instance.stop()
+
+    async def async_demo():
+      instance = Example()
+      print('%s from an asynchronous context' % await instance.hello())
+      instance.stop()
+
+    sync_demo()
+    asyncio.run(async_demo())
+
+  Our async methods always run within a loop. For asyncio users this class has
+  no effect, but otherwise we transparently create an async context to run
+  within.
+
+  Class initialization and any non-async methods should assume they're running
+  within a synchronous context. If our class supplies an **__ainit__()**
+  method it is invoked within our loop during initialization...
+
+  ::
+
+    class Example(Synchronous):
+      def __init__(self):
+        super(Example, self).__init__()
+
+        # Synchronous part of our initialization. Avoid anything
+        # that must run within an asyncio loop.
+
+      def __ainit__(self):
+        # Asynchronous part of our initialization. You can call
+        # asyncio.get_running_loop(), and construct objects that
+        # require it (like asyncio.Queue and asyncio.Lock).
+
+  Users are responsible for calling :func:`~stem.util.asyncio.Synchronous.stop`
+  when finished to clean up underlying resources.
+  """
+
+  def __init__(self) -> None:
+    self._loop = None  # type: Optional[asyncio.AbstractEventLoop]
+    self._loop_thread = None  # type: Optional[threading.Thread]
+    self._loop_lock = threading.RLock()
+
+    # this class is a no-op when created within an asyncio context
+
+    self._no_op = Synchronous.is_asyncio_context()
+
+    if self._no_op:
+      # TODO: replace with get_running_loop() when we remove python 3.6 support
+
+      self._loop = asyncio.get_event_loop()
+      self.__ainit__()  # this is already an asyncio context
+    else:
+      # Run coroutines through our loop. This calls methods by name rather than
+      # reference so runtime replacements (like mocks) work.
+
+      for name, func in inspect.getmembers(self):
+        if name in ('__aiter__', '__aenter__', '__aexit__'):
+          pass  # async object methods with synchronous counterparts
+        elif isinstance(func, unittest.mock.Mock) and inspect.iscoroutinefunction(func.side_effect):
+          setattr(self, name, functools.partial(self._run_async_method, name))
+        elif inspect.ismethod(func) and inspect.iscoroutinefunction(func):
+          setattr(self, name, functools.partial(self._run_async_method, name))
+
+      Synchronous.start(self)
+      asyncio.run_coroutine_threadsafe(asyncio.coroutine(self.__ainit__)(), self._loop).result()
+
+  def __ainit__(self):
+    """
+    Implicitly called during construction. This method is assured to have an
+    asyncio loop during its execution.
+    """
+
+    # This method should be async (so 'await' works), but apparently that
+    # is not possible.
+    #
+    # When our object is constructed our __init__() can be called from a
+    # synchronous or asynchronous context. If synchronous, it's trivial to
+    # run an asynchronous variant of this method because we fully control
+    # the execution of our loop...
+    #
+    #   asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result()
+    #
+    # However, when constructed from an asynchronous context the above will
+    # likely hang because our loop is already processing a task (namely,
+    # whatever is constructing us). While we can schedule a follow-up task, we
+    # cannot invoke it during our construction.
+    #
+    # Finally, when this method is simple we could directly invoke it...
+    #
+    #   class Synchronous(object):
+    #     def __init__(self):
+    #       if Synchronous.is_asyncio_context():
+    #         try:
+    #           self.__ainit__().send(None)
+    #         except StopIteration:
+    #           pass
+    #       else:
+    #         asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result()
+    #
+    #     async def __ainit__(self):
+    #       # asynchronous construction
+    #
+    # However, this breaks if any 'await' suspends our execution. For more
+    # information see...
+    #
+    #   https://stackoverflow.com/questions/52783605/how-to-run-a-coroutine-outside-of-an-event-loop/52829325#52829325
+
+    pass
+
+  def start(self) -> None:
+    """
+    Initiate resources to make this object callable from synchronous contexts.
+    """
+
+    with self._loop_lock:
+      if not self._no_op and self._loop is None:
+        self._loop = asyncio.new_event_loop()
+        self._loop_thread = threading.Thread(
+          name = '%s asyncio' % type(self).__name__,
+          target = self._loop.run_forever,
+          daemon = True,
+        )
+
+        self._loop_thread.start()
+
+  def stop(self) -> None:
+    """
+    Terminate resources that permit this to be callable from synchronous
+    contexts. Calling either :func:`~stem.util.asyncio.Synchronous.start` or
+    any async method will resume us.
+    """
+
+    with self._loop_lock:
+      if not self._no_op and self._loop is not None:
+        self._loop.call_soon_threadsafe(self._loop.stop)
+
+        if threading.current_thread() != self._loop_thread:
+          self._loop_thread.join()
+
+        self._loop = None
+        self._loop_thread = None
+
+  @staticmethod
+  def is_asyncio_context() -> bool:
+    """
+    Check if running within a synchronous or asynchronous context.
+
+    :returns: **True** if within an asyncio context, **False** otherwise
+    """
+
+    try:
+      asyncio.get_running_loop()
+      return True
+    except RuntimeError:
+      return False
+    except AttributeError:
+      # TODO: drop when we remove python 3.6 support
+
+      try:
+        return asyncio._get_running_loop() is not None
+      except AttributeError:
+        return False  # python 3.5.3 or below
+
+  def _run_async_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
+    """
+    Run this async method from either a synchronous or asynchronous context.
+
+    :param method_name: name of the method to invoke
+    :param args: positional arguments
+    :param kwargs: keyword arguments
+
+    :returns: method's return value
+
+    :raises: **AttributeError** if this method doesn't exist
+    """
+
+    func = getattr(type(self), method_name, None)
+
+    if not func:
+      raise AttributeError("'%s' does not have a %s method" % (type(self).__name__, method_name))
+    elif self._no_op or Synchronous.is_asyncio_context():
+      return func(self, *args, **kwargs)
+
+    with self._loop_lock:
+      if self._loop is None:
+        Synchronous.start(self)
+
+      # convert iterator if indicated by this method's name or type hint
+
+      if method_name == '__aiter__' or (inspect.ismethod(func) and typing.get_type_hints(func).get('return') == AsyncIterator):
+        async def convert_generator(generator: AsyncIterator) -> Iterator:
+          return iter([d async for d in generator])
+
+        future = asyncio.run_coroutine_threadsafe(convert_generator(func(self, *args, **kwargs)), self._loop)
+      else:
+        future = asyncio.run_coroutine_threadsafe(func(self, *args, **kwargs), self._loop)
+
+    return future.result()
+
+  def __iter__(self) -> Iterator:
+    return self._run_async_method('__aiter__')
+
+  def __enter__(self):
+    return self._run_async_method('__aenter__')
+
+  def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]):
+    return self._run_async_method('__aexit__', exit_type, value, traceback)
diff --git a/test/settings.cfg b/test/settings.cfg
index ef543a18..e49835ea 100644
--- a/test/settings.cfg
+++ b/test/settings.cfg
@@ -258,7 +258,7 @@ test.unit_tests
 |test.unit.util.system.TestSystem
 |test.unit.util.term.TestTerminal
 |test.unit.util.tor_tools.TestTorTools
-|test.unit.util.synchronous.TestSynchronous
+|test.unit.util.asyncio.TestSynchronous
 |test.unit.util.__init__.TestBaseUtil
 |test.unit.installation.TestInstallation
 |test.unit.descriptor.descriptor.TestDescriptor
diff --git a/test/unit/util/__init__.py b/test/unit/util/__init__.py
index c29901b1..ffbc0bc4 100644
--- a/test/unit/util/__init__.py
+++ b/test/unit/util/__init__.py
@@ -9,6 +9,7 @@ import unittest
 from stem.util import datetime_to_unix
 
 __all__ = [
+  'asyncio',
   'conf',
   'connection',
   'enum',
diff --git a/test/unit/util/synchronous.py b/test/unit/util/asyncio.py
similarity index 94%
rename from test/unit/util/synchronous.py
rename to test/unit/util/asyncio.py
index 9485e09a..aef5c63f 100644
--- a/test/unit/util/synchronous.py
+++ b/test/unit/util/asyncio.py
@@ -1,5 +1,5 @@
 """
-Unit tests for the stem.util.Synchronous class.
+Unit tests for the stem.util.asyncio module.
 """
 
 import asyncio
@@ -8,7 +8,7 @@ import unittest
 
 from unittest.mock import patch, Mock
 
-from stem.util import Synchronous
+from stem.util.asyncio import Synchronous
 from stem.util.test_tools import coro_func_returning_value
 
 EXAMPLE_OUTPUT = """\
@@ -231,7 +231,7 @@ class TestSynchronous(unittest.TestCase):
 
     pre_constructed = Demo()
 
-    with patch('test.unit.util.synchronous.Demo.async_method', Mock(side_effect = coro_func_returning_value('mocked call'))):
+    with patch('test.unit.util.asyncio.Demo.async_method', Mock(side_effect = coro_func_returning_value('mocked call'))):
       post_constructed = Demo()
 
       self.assertEqual('mocked call', pre_constructed.async_method())
@@ -242,7 +242,7 @@ class TestSynchronous(unittest.TestCase):
 
     # synchronous methods are unaffected
 
-    with patch('test.unit.util.synchronous.Demo.sync_method', Mock(return_value = 'mocked call')):
+    with patch('test.unit.util.asyncio.Demo.sync_method', Mock(return_value = 'mocked call')):
       self.assertEqual('mocked call', pre_constructed.sync_method())
 
     self.assertEqual('sync call', pre_constructed.sync_method())



More information about the tor-commits mailing list