commit 21ee7d288bdc1eacb7a9be805f5fcfcc28da8fc5 Author: Illia Volochii illia.volochii@gmail.com Date: Sun Apr 12 19:42:40 2020 +0300
Breathe asynchrony into the `BaseController` --- stem/control.py | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-)
diff --git a/stem/control.py b/stem/control.py index a3b7d253..d592c4af 100644 --- a/stem/control.py +++ b/stem/control.py @@ -547,10 +547,6 @@ class BaseController(object):
self._asyncio_loop = asyncio.get_event_loop()
- self._asyncio_thread = threading.Thread(target = self._asyncio_loop.run_forever, name = 'asyncio') - self._asyncio_thread.setDaemon(True) - self._asyncio_thread.start() - self._msg_lock = threading.RLock()
self._status_listeners = [] # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread) @@ -582,7 +578,7 @@ class BaseController(object): if is_authenticated: self._post_authentication()
- def msg(self, message: str) -> stem.response.ControlMessage: + async def msg(self, message: str) -> stem.response.ControlMessage: """ Sends a message to our control socket and provides back its reply.
@@ -648,7 +644,7 @@ class BaseController(object): try: self._asyncio_loop.create_task(self._socket.send(message))
- response = asyncio.run_coroutine_threadsafe(self._reply_queue.get(), self._asyncio_loop).result() + response = await self._reply_queue.get()
# If the message we received back had an exception then re-raise it to the # caller. Otherwise return the response. @@ -711,7 +707,7 @@ class BaseController(object):
return self._is_authenticated if self.is_alive() else False
- def connect(self) -> None: + async def connect(self) -> None: """ Reconnects our control socket. This is a pass-through for our socket's :func:`~stem.socket.ControlSocket.connect` method. @@ -719,15 +715,15 @@ class BaseController(object): :raises: :class:`stem.SocketError` if unable to make a socket """
- asyncio.run_coroutine_threadsafe(self._socket.connect(), self._asyncio_loop).result() + await self._socket.connect()
- def close(self) -> None: + async def close(self) -> None: """ Closes our socket connection. This is a pass-through for our socket's :func:`~stem.socket.BaseSocket.close` method. """
- asyncio.run_coroutine_threadsafe(self._socket.close(), self._asyncio_loop).result() + await self._socket.close()
# Join on any outstanding state change listeners. Closing is a state change # of its own, so if we have any listeners it's quite likely there's some @@ -740,9 +736,6 @@ class BaseController(object): if t.is_alive() and threading.current_thread() != t: t.join()
- self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop) - self._asyncio_thread.join() - def get_socket(self) -> stem.socket.ControlSocket: """ Provides the socket used to speak with the tor process. Communicating with @@ -815,11 +808,11 @@ class BaseController(object): self._status_listeners = new_listeners return is_changed
- def __enter__(self) -> 'stem.control.BaseController': + async def __aenter__(self) -> 'stem.control.BaseController': return self
- def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None: - self.close() + async def __aexit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None: + await self.close()
def _handle_event(self, event_message: stem.response.ControlMessage) -> None: """