[tor-commits] [stem/master] Breath asynchrony into the `BaseController`

atagar at torproject.org atagar at torproject.org
Thu Jul 16 01:28:58 UTC 2020


commit 21ee7d288bdc1eacb7a9be805f5fcfcc28da8fc5
Author: Illia Volochii <illia.volochii at gmail.com>
Date:   Sun Apr 12 19:42:40 2020 +0300

    Breath asynchrony into the `BaseController`
---
 stem/control.py | 25 +++++++++----------------
 1 file changed, 9 insertions(+), 16 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index a3b7d253..d592c4af 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -547,10 +547,6 @@ class BaseController(object):
 
     self._asyncio_loop = asyncio.get_event_loop()
 
-    self._asyncio_thread = threading.Thread(target = self._asyncio_loop.run_forever, name = 'asyncio')
-    self._asyncio_thread.setDaemon(True)
-    self._asyncio_thread.start()
-
     self._msg_lock = threading.RLock()
 
     self._status_listeners = []  # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread)
@@ -582,7 +578,7 @@ class BaseController(object):
     if is_authenticated:
       self._post_authentication()
 
-  def msg(self, message: str) -> stem.response.ControlMessage:
+  async def msg(self, message: str) -> stem.response.ControlMessage:
     """
     Sends a message to our control socket and provides back its reply.
 
@@ -648,7 +644,7 @@ class BaseController(object):
       try:
         self._asyncio_loop.create_task(self._socket.send(message))
 
-        response = asyncio.run_coroutine_threadsafe(self._reply_queue.get(), self._asyncio_loop).result()
+        response = await self._reply_queue.get()
 
         # If the message we received back had an exception then re-raise it to the
         # caller. Otherwise return the response.
@@ -711,7 +707,7 @@ class BaseController(object):
 
     return self._is_authenticated if self.is_alive() else False
 
-  def connect(self) -> None:
+  async def connect(self) -> None:
     """
     Reconnects our control socket. This is a pass-through for our socket's
     :func:`~stem.socket.ControlSocket.connect` method.
@@ -719,15 +715,15 @@ class BaseController(object):
     :raises: :class:`stem.SocketError` if unable to make a socket
     """
 
-    asyncio.run_coroutine_threadsafe(self._socket.connect(), self._asyncio_loop).result()
+    await self._socket.connect()
 
-  def close(self) -> None:
+  async def close(self) -> None:
     """
     Closes our socket connection. This is a pass-through for our socket's
     :func:`~stem.socket.BaseSocket.close` method.
     """
 
-    asyncio.run_coroutine_threadsafe(self._socket.close(), self._asyncio_loop).result()
+    await self._socket.close()
 
     # Join on any outstanding state change listeners. Closing is a state change
     # of its own, so if we have any listeners it's quite likely there's some
@@ -740,9 +736,6 @@ class BaseController(object):
       if t.is_alive() and threading.current_thread() != t:
         t.join()
 
-    self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop)
-    self._asyncio_thread.join()
-
   def get_socket(self) -> stem.socket.ControlSocket:
     """
     Provides the socket used to speak with the tor process. Communicating with
@@ -815,11 +808,11 @@ class BaseController(object):
       self._status_listeners = new_listeners
       return is_changed
 
-  def __enter__(self) -> 'stem.control.BaseController':
+  async def __aenter__(self) -> 'stem.control.BaseController':
     return self
 
-  def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
-    self.close()
+  await def __aexit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
+    await self.close()
 
   def _handle_event(self, event_message: stem.response.ControlMessage) -> None:
     """





More information about the tor-commits mailing list