commit c1bad9e9b249388ee9c6e29b0cb75da0ad6eea64 Author: Damian Johnson atagar@torproject.org Date: Sat Jul 25 15:59:54 2020 -0700
Move Synchronous to its own module
This class has grown sophisticated enough that it deserves its own module. Also, I'd like to discuss this with the wider Python community, and a dedicated module will make it easier to cite. --- stem/control.py | 3 +- stem/descriptor/remote.py | 3 +- stem/util/__init__.py | 232 +------------------------ stem/util/asyncio.py | 238 ++++++++++++++++++++++++++ test/settings.cfg | 2 +- test/unit/util/__init__.py | 1 + test/unit/util/{synchronous.py => asyncio.py} | 8 +- 7 files changed, 250 insertions(+), 237 deletions(-)
diff --git a/stem/control.py b/stem/control.py index 47b8a37c..578a471d 100644 --- a/stem/control.py +++ b/stem/control.py @@ -269,7 +269,8 @@ import stem.util.tor_tools import stem.version
from stem import UNDEFINED, CircStatus, Signal -from stem.util import Synchronous, log +from stem.util import log +from stem.util.asyncio import Synchronous from types import TracebackType from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Mapping, Optional, Sequence, Set, Tuple, Type, Union
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py index ff4b1403..2e9ec641 100644 --- a/stem/descriptor/remote.py +++ b/stem/descriptor/remote.py @@ -99,7 +99,8 @@ import stem.util.enum import stem.util.tor_tools
from stem.descriptor import Compression -from stem.util import Synchronous, log, str_tools +from stem.util import log, str_tools +from stem.util.asyncio import Synchronous from typing import Any, AsyncIterator, Dict, List, Optional, Sequence, Tuple, Union
# Tor has a limited number of descriptors we can fetch explicitly by their diff --git a/stem/util/__init__.py b/stem/util/__init__.py index c525dd6f..7d5ca853 100644 --- a/stem/util/__init__.py +++ b/stem/util/__init__.py @@ -5,18 +5,12 @@ Utility functions used by the stem library. """
-import asyncio import datetime -import functools -import inspect -import threading -import typing -import unittest.mock
-from types import TracebackType -from typing import Any, AsyncIterator, Iterator, Optional, Type, Union +from typing import Any, Union
__all__ = [ + 'asyncio', 'conf', 'connection', 'enum', @@ -148,225 +142,3 @@ def _hash_attr(obj: Any, *attributes: str, **kwargs: Any) -> int: setattr(obj, '_cached_hash', my_hash)
return my_hash - - -class Synchronous(object): - """ - Mixin that lets a class run within both synchronous and asynchronous - contexts. - - :: - - class Example(Synchronous): - async def hello(self): - return 'hello' - - def sync_demo(): - instance = Example() - print('%s from a synchronous context' % instance.hello()) - instance.stop() - - async def async_demo(): - instance = Example() - print('%s from an asynchronous context' % await instance.hello()) - instance.stop() - - sync_demo() - asyncio.run(async_demo()) - - Our async methods always run within a loop. For asyncio users this class has - no affect, but otherwise we transparently create an async context to run - within. - - Class initialization and any non-async methods should assume they're running - within an synchronous context. If our class supplies an **__ainit__()** - method it is invoked within our loop during initialization... - - :: - - class Example(Synchronous): - def __init__(self): - super(Example, self).__init__() - - # Synchronous part of our initialization. Avoid anything - # that must run within an asyncio loop. - - def __ainit__(self): - # Asychronous part of our initialization. You can call - # asyncio.get_running_loop(), and construct objects that - # require it (like asyncio.Queue and asyncio.Lock). - - Users are responsible for calling :func:`~stem.util.Synchronous.stop` when - finished to clean up underlying resources. - """ - - def __init__(self) -> None: - self._loop = None # type: Optional[asyncio.AbstractEventLoop] - self._loop_thread = None # type: Optional[threading.Thread] - self._loop_lock = threading.RLock() - - # this class is a no-op when created within an asyncio context - - self._no_op = Synchronous.is_asyncio_context() - - if self._no_op: - # TODO: replace with get_running_loop() when we remove python 3.6 support - - self._loop = asyncio.get_event_loop() - self.__ainit__() # this is already an asyncio context - else: - # Run coroutines through our loop. 
This calls methods by name rather than - # reference so runtime replacements (like mocks) work. - - for name, func in inspect.getmembers(self): - if name in ('__aiter__', '__aenter__', '__aexit__'): - pass # async object methods with synchronous counterparts - elif isinstance(func, unittest.mock.Mock) and inspect.iscoroutinefunction(func.side_effect): - setattr(self, name, functools.partial(self._run_async_method, name)) - elif inspect.ismethod(func) and inspect.iscoroutinefunction(func): - setattr(self, name, functools.partial(self._run_async_method, name)) - - Synchronous.start(self) - asyncio.run_coroutine_threadsafe(asyncio.coroutine(self.__ainit__)(), self._loop).result() - - def __ainit__(self): - """ - Implicitly called during construction. This method is assured to have an - asyncio loop during its execution. - """ - - # This method should be async (so 'await' works), but apparently that - # is not possible. - # - # When our object is constructed our __init__() can be called from a - # synchronous or asynchronous context. If synchronous, it's trivial to - # run an asynchronous variant of this method because we fully control - # the execution of our loop... - # - # asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result() - # - # However, when constructed from an asynchronous context the above will - # likely hang because our loop is already processing a task (namely, - # whatever is constructing us). While we can schedule a follow-up task, we - # cannot invoke it during our construction. - # - # Finally, when this method is simple we could directly invoke it... 
- # - # class Synchronous(object): - # def __init__(self): - # if Synchronous.is_asyncio_context(): - # try: - # self.__ainit__().send(None) - # except StopIteration: - # pass - # else: - # asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result() - # - # async def __ainit__(self): - # # asynchronous construction - # - # However, this breaks if any 'await' suspends our execution. For more - # information see... - # - # https://stackoverflow.com/questions/52783605/how-to-run-a-coroutine-outside-... - - pass - - def start(self) -> None: - """ - Initiate resources to make this object callable from synchronous contexts. - """ - - with self._loop_lock: - if not self._no_op and self._loop is None: - self._loop = asyncio.new_event_loop() - self._loop_thread = threading.Thread( - name = '%s asyncio' % type(self).__name__, - target = self._loop.run_forever, - daemon = True, - ) - - self._loop_thread.start() - - def stop(self) -> None: - """ - Terminate resources that permits this from being callable from synchronous - contexts. Calling either :func:`~stem.util.Synchronous.start` or any async - method will resume us. - """ - - with self._loop_lock: - if not self._no_op and self._loop is not None: - self._loop.call_soon_threadsafe(self._loop.stop) - - if threading.current_thread() != self._loop_thread: - self._loop_thread.join() - - self._loop = None - self._loop_thread = None - - @staticmethod - def is_asyncio_context() -> bool: - """ - Check if running within a synchronous or asynchronous context. 
- - :returns: **True** if within an asyncio conext, **False** otherwise - """ - - try: - asyncio.get_running_loop() - return True - except RuntimeError: - return False - except AttributeError: - # TODO: drop when we remove python 3.6 support - - try: - return asyncio._get_running_loop() is not None - except AttributeError: - return False # python 3.5.3 or below - - def _run_async_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any: - """ - Run this async method from either a synchronous or asynchronous context. - - :param method_name: name of the method to invoke - :param args: positional arguments - :param kwargs: keyword arguments - - :returns: method's return value - - :raises: **AttributeError** if this method doesn't exist - """ - - func = getattr(type(self), method_name, None) - - if not func: - raise AttributeError("'%s' does not have a %s method" % (type(self).__name__, method_name)) - elif self._no_op or Synchronous.is_asyncio_context(): - return func(self, *args, **kwargs) - - with self._loop_lock: - if self._loop is None: - Synchronous.start(self) - - # convert iterator if indicated by this method's name or type hint - - if method_name == '__aiter__' or (inspect.ismethod(func) and typing.get_type_hints(func).get('return') == AsyncIterator): - async def convert_generator(generator: AsyncIterator) -> Iterator: - return iter([d async for d in generator]) - - future = asyncio.run_coroutine_threadsafe(convert_generator(func(self, *args, **kwargs)), self._loop) - else: - future = asyncio.run_coroutine_threadsafe(func(self, *args, **kwargs), self._loop) - - return future.result() - - def __iter__(self) -> Iterator: - return self._run_async_method('__aiter__') - - def __enter__(self): - return self._run_async_method('__aenter__') - - def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]): - return self._run_async_method('__aexit__', exit_type, value, traceback) diff --git 
a/stem/util/asyncio.py b/stem/util/asyncio.py new file mode 100644 index 00000000..4ee8df3f --- /dev/null +++ b/stem/util/asyncio.py @@ -0,0 +1,238 @@ +# Copyright 2020, Damian Johnson and The Tor Project +# See LICENSE for licensing information + +""" +Utilities for working with asyncio. +""" + +import asyncio +import functools +import inspect +import threading +import typing +import unittest.mock + +from types import TracebackType +from typing import Any, AsyncIterator, Iterator, Optional, Type + + +class Synchronous(object): + """ + Mixin that lets a class run within both synchronous and asynchronous + contexts. + + :: + + class Example(Synchronous): + async def hello(self): + return 'hello' + + def sync_demo(): + instance = Example() + print('%s from a synchronous context' % instance.hello()) + instance.stop() + + async def async_demo(): + instance = Example() + print('%s from an asynchronous context' % await instance.hello()) + instance.stop() + + sync_demo() + asyncio.run(async_demo()) + + Our async methods always run within a loop. For asyncio users this class has + no affect, but otherwise we transparently create an async context to run + within. + + Class initialization and any non-async methods should assume they're running + within an synchronous context. If our class supplies an **__ainit__()** + method it is invoked within our loop during initialization... + + :: + + class Example(Synchronous): + def __init__(self): + super(Example, self).__init__() + + # Synchronous part of our initialization. Avoid anything + # that must run within an asyncio loop. + + def __ainit__(self): + # Asychronous part of our initialization. You can call + # asyncio.get_running_loop(), and construct objects that + # require it (like asyncio.Queue and asyncio.Lock). + + Users are responsible for calling :func:`~stem.util.Synchronous.stop` when + finished to clean up underlying resources. 
+ """ + + def __init__(self) -> None: + self._loop = None # type: Optional[asyncio.AbstractEventLoop] + self._loop_thread = None # type: Optional[threading.Thread] + self._loop_lock = threading.RLock() + + # this class is a no-op when created within an asyncio context + + self._no_op = Synchronous.is_asyncio_context() + + if self._no_op: + # TODO: replace with get_running_loop() when we remove python 3.6 support + + self._loop = asyncio.get_event_loop() + self.__ainit__() # this is already an asyncio context + else: + # Run coroutines through our loop. This calls methods by name rather than + # reference so runtime replacements (like mocks) work. + + for name, func in inspect.getmembers(self): + if name in ('__aiter__', '__aenter__', '__aexit__'): + pass # async object methods with synchronous counterparts + elif isinstance(func, unittest.mock.Mock) and inspect.iscoroutinefunction(func.side_effect): + setattr(self, name, functools.partial(self._run_async_method, name)) + elif inspect.ismethod(func) and inspect.iscoroutinefunction(func): + setattr(self, name, functools.partial(self._run_async_method, name)) + + Synchronous.start(self) + asyncio.run_coroutine_threadsafe(asyncio.coroutine(self.__ainit__)(), self._loop).result() + + def __ainit__(self): + """ + Implicitly called during construction. This method is assured to have an + asyncio loop during its execution. + """ + + # This method should be async (so 'await' works), but apparently that + # is not possible. + # + # When our object is constructed our __init__() can be called from a + # synchronous or asynchronous context. If synchronous, it's trivial to + # run an asynchronous variant of this method because we fully control + # the execution of our loop... 
+ # + # asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result() + # + # However, when constructed from an asynchronous context the above will + # likely hang because our loop is already processing a task (namely, + # whatever is constructing us). While we can schedule a follow-up task, we + # cannot invoke it during our construction. + # + # Finally, when this method is simple we could directly invoke it... + # + # class Synchronous(object): + # def __init__(self): + # if Synchronous.is_asyncio_context(): + # try: + # self.__ainit__().send(None) + # except StopIteration: + # pass + # else: + # asyncio.run_coroutine_threadsafe(self.__ainit__(), self._loop).result() + # + # async def __ainit__(self): + # # asynchronous construction + # + # However, this breaks if any 'await' suspends our execution. For more + # information see... + # + # https://stackoverflow.com/questions/52783605/how-to-run-a-coroutine-outside-... + + pass + + def start(self) -> None: + """ + Initiate resources to make this object callable from synchronous contexts. + """ + + with self._loop_lock: + if not self._no_op and self._loop is None: + self._loop = asyncio.new_event_loop() + self._loop_thread = threading.Thread( + name = '%s asyncio' % type(self).__name__, + target = self._loop.run_forever, + daemon = True, + ) + + self._loop_thread.start() + + def stop(self) -> None: + """ + Terminate resources that permits this from being callable from synchronous + contexts. Calling either :func:`~stem.util.Synchronous.start` or any async + method will resume us. + """ + + with self._loop_lock: + if not self._no_op and self._loop is not None: + self._loop.call_soon_threadsafe(self._loop.stop) + + if threading.current_thread() != self._loop_thread: + self._loop_thread.join() + + self._loop = None + self._loop_thread = None + + @staticmethod + def is_asyncio_context() -> bool: + """ + Check if running within a synchronous or asynchronous context. 
+ + :returns: **True** if within an asyncio conext, **False** otherwise + """ + + try: + asyncio.get_running_loop() + return True + except RuntimeError: + return False + except AttributeError: + # TODO: drop when we remove python 3.6 support + + try: + return asyncio._get_running_loop() is not None + except AttributeError: + return False # python 3.5.3 or below + + def _run_async_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any: + """ + Run this async method from either a synchronous or asynchronous context. + + :param method_name: name of the method to invoke + :param args: positional arguments + :param kwargs: keyword arguments + + :returns: method's return value + + :raises: **AttributeError** if this method doesn't exist + """ + + func = getattr(type(self), method_name, None) + + if not func: + raise AttributeError("'%s' does not have a %s method" % (type(self).__name__, method_name)) + elif self._no_op or Synchronous.is_asyncio_context(): + return func(self, *args, **kwargs) + + with self._loop_lock: + if self._loop is None: + Synchronous.start(self) + + # convert iterator if indicated by this method's name or type hint + + if method_name == '__aiter__' or (inspect.ismethod(func) and typing.get_type_hints(func).get('return') == AsyncIterator): + async def convert_generator(generator: AsyncIterator) -> Iterator: + return iter([d async for d in generator]) + + future = asyncio.run_coroutine_threadsafe(convert_generator(func(self, *args, **kwargs)), self._loop) + else: + future = asyncio.run_coroutine_threadsafe(func(self, *args, **kwargs), self._loop) + + return future.result() + + def __iter__(self) -> Iterator: + return self._run_async_method('__aiter__') + + def __enter__(self): + return self._run_async_method('__aenter__') + + def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]): + return self._run_async_method('__aexit__', exit_type, value, traceback) diff --git 
a/test/settings.cfg b/test/settings.cfg index ef543a18..e49835ea 100644 --- a/test/settings.cfg +++ b/test/settings.cfg @@ -258,7 +258,7 @@ test.unit_tests |test.unit.util.system.TestSystem |test.unit.util.term.TestTerminal |test.unit.util.tor_tools.TestTorTools -|test.unit.util.synchronous.TestSynchronous +|test.unit.util.asyncio.TestSynchronous |test.unit.util.__init__.TestBaseUtil |test.unit.installation.TestInstallation |test.unit.descriptor.descriptor.TestDescriptor diff --git a/test/unit/util/__init__.py b/test/unit/util/__init__.py index c29901b1..ffbc0bc4 100644 --- a/test/unit/util/__init__.py +++ b/test/unit/util/__init__.py @@ -9,6 +9,7 @@ import unittest from stem.util import datetime_to_unix
__all__ = [ + 'asyncio', 'conf', 'connection', 'enum', diff --git a/test/unit/util/synchronous.py b/test/unit/util/asyncio.py similarity index 94% rename from test/unit/util/synchronous.py rename to test/unit/util/asyncio.py index 9485e09a..aef5c63f 100644 --- a/test/unit/util/synchronous.py +++ b/test/unit/util/asyncio.py @@ -1,5 +1,5 @@ """ -Unit tests for the stem.util.Synchronous class. +Unit tests for the stem.util.asyncio module. """
import asyncio @@ -8,7 +8,7 @@ import unittest
from unittest.mock import patch, Mock
-from stem.util import Synchronous +from stem.util.asyncio import Synchronous from stem.util.test_tools import coro_func_returning_value
EXAMPLE_OUTPUT = """\ @@ -231,7 +231,7 @@ class TestSynchronous(unittest.TestCase):
pre_constructed = Demo()
- with patch('test.unit.util.synchronous.Demo.async_method', Mock(side_effect = coro_func_returning_value('mocked call'))): + with patch('test.unit.util.asyncio.Demo.async_method', Mock(side_effect = coro_func_returning_value('mocked call'))): post_constructed = Demo()
self.assertEqual('mocked call', pre_constructed.async_method()) @@ -242,7 +242,7 @@ class TestSynchronous(unittest.TestCase):
# synchronous methods are unaffected
- with patch('test.unit.util.synchronous.Demo.sync_method', Mock(return_value = 'mocked call')): + with patch('test.unit.util.asyncio.Demo.sync_method', Mock(return_value = 'mocked call')): self.assertEqual('mocked call', pre_constructed.sync_method())
self.assertEqual('sync call', pre_constructed.sync_method())
tor-commits@lists.torproject.org