[tor-commits] [stem/master] Prototype Controller asyncio usage

atagar at torproject.org atagar at torproject.org
Thu Jul 16 01:28:58 UTC 2020


commit 341dc930bf89aed8567cad255b333b36576a5f06
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Mar 21 17:12:29 2020 -0700

    Prototype Controller asyncio usage
    
    Still very much an experiment. Several things are broken but this adjusts our
    Controller to successfully wrap an asyncio socket.
    
      from stem.control import Controller
    
      controller = Controller.from_port()
      controller.connect()
      controller.authenticate()
      print("Tor is running version %s" % controller.get_version())
      controller.close()
    
      ----------------------------------------
    
      % python3.7 demo.py
      Tor is running version 0.4.3.0-alpha-dev (git-5e70c27e8560ac18)
    
    Essentially this replaces our reader daemon with an asyncio loop. Synchronous
    code can invoke this loop in the following ways...
    
      * Non-blocking call of a coroutine
    
        self._asyncio_loop.create_task(my_coroutine())
    
      * Blocking call of a coroutine
    
        result = asyncio.run_coroutine_threadsafe(my_coroutine(), self._asyncio_loop).result(timeout)
    
      * Non-blocking call of a function
    
        self._asyncio_loop.call_soon_threadsafe(my_function)
    
    I'm not quite sold yet on whether we'll go in this direction. An asynchronous
    socket has two potential advantages...
    
      * Simplify multi-threading
      * Offer asynchronous variants of our methods
    
    Either or both of these might be dead ends. I'll need to experiment more, but
    now that we've seen this work in this limited way we have something to build
    off of.
---
 stem/control.py | 56 ++++++++++++++++++++++++++++++++------------------------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index ec88aa38..2d33bd54 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -240,13 +240,13 @@ If you're fine with allowing your script to raise exceptions then this can be mo
   =============== ===========
 """
 
+import asyncio
 import calendar
 import collections
 import functools
 import inspect
 import io
 import os
-import queue
 import threading
 import time
 
@@ -544,14 +544,21 @@ class BaseController(object):
 
   def __init__(self, control_socket: stem.socket.ControlSocket, is_authenticated: bool = False) -> None:
     self._socket = control_socket
+
+    self._asyncio_loop = asyncio.get_event_loop()
+
+    self._asyncio_thread = threading.Thread(target = self._asyncio_loop.run_forever, name = 'asyncio')
+    self._asyncio_thread.setDaemon(True)
+    self._asyncio_thread.start()
+
     self._msg_lock = threading.RLock()
 
     self._status_listeners = []  # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread)
     self._status_listeners_lock = threading.RLock()
 
     # queues where incoming messages are directed
-    self._reply_queue = queue.Queue()  # type: queue.Queue[Union[stem.response.ControlMessage, stem.ControllerError]]
-    self._event_queue = queue.Queue()  # type: queue.Queue[stem.response.ControlMessage]
+    self._reply_queue = asyncio.Queue()  # type: asyncio.Queue[Union[stem.response.ControlMessage, stem.ControllerError]]
+    self._event_queue = asyncio.Queue()  # type: asyncio.Queue[stem.response.ControlMessage]
 
     # thread to continually pull from the control socket
     self._reader_thread = None  # type: Optional[threading.Thread]
@@ -637,15 +644,16 @@ class BaseController(object):
             log.info('Socket experienced a problem (%s)' % response)
           elif isinstance(response, stem.response.ControlMessage):
             log.info('Failed to deliver a response: %s' % response)
-        except queue.Empty:
+        except asyncio.QueueEmpty:
           # the empty() method is documented to not be fully reliable so this
           # isn't entirely surprising
 
           break
 
       try:
-        self._socket.send(message)
-        response = self._reply_queue.get()
+        self._asyncio_loop.create_task(self._socket.send(message))
+
+        response = asyncio.run_coroutine_threadsafe(self._reply_queue.get(), self._asyncio_loop).result()
 
         # If the message we received back had an exception then re-raise it to the
         # caller. Otherwise return the response.
@@ -716,7 +724,7 @@ class BaseController(object):
     :raises: :class:`stem.SocketError` if unable to make a socket
     """
 
-    self._socket.connect()
+    asyncio.run_coroutine_threadsafe(self._socket.connect(), self._asyncio_loop).result()
 
   def close(self) -> None:
     """
@@ -724,7 +732,7 @@ class BaseController(object):
     :func:`~stem.socket.BaseSocket.close` method.
     """
 
-    self._socket.close()
+    asyncio.run_coroutine_threadsafe(self._socket.close(), self._asyncio_loop).result()
 
     # Join on any outstanding state change listeners. Closing is a state change
     # of its own, so if we have any listeners it's quite likely there's some
@@ -737,6 +745,9 @@ class BaseController(object):
       if t.is_alive() and threading.current_thread() != t:
         t.join()
 
+    self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop)
+    self._asyncio_thread.join()
+
   def get_socket(self) -> stem.socket.ControlSocket:
     """
     Provides the socket used to speak with the tor process. Communicating with
@@ -825,13 +836,13 @@ class BaseController(object):
 
     pass
 
-  def _connect(self) -> None:
+  async def _connect(self) -> None:
     self._launch_threads()
     self._notify_status_listeners(State.INIT)
-    self._socket_connect()
+    await self._socket_connect()
     self._is_authenticated = False
 
-  def _close(self) -> None:
+  async def _close(self) -> None:
     # Our is_alive() state is now false. Our reader thread should already be
     # awake from recv() raising a closure exception. Wake up the event thread
     # too so it can end.
@@ -847,7 +858,7 @@ class BaseController(object):
 
     self._notify_status_listeners(State.CLOSED)
 
-    self._socket_close()
+    await self._socket_close()
 
   def _post_authentication(self) -> None:
     # actions to be taken after we have a newly authenticated connection
@@ -904,21 +915,18 @@ class BaseController(object):
     them if we're restarted.
     """
 
+    self._asyncio_loop.create_task(self._reader_loop())
+
     # In theory concurrent calls could result in multiple start() calls on a
     # single thread, which would cause an unexpected exception. Best be safe.
 
     with self._socket._get_send_lock():
-      if not self._reader_thread or not self._reader_thread.is_alive():
-        self._reader_thread = threading.Thread(target = self._reader_loop, name = 'Tor listener')
-        self._reader_thread.setDaemon(True)
-        self._reader_thread.start()
-
       if not self._event_thread or not self._event_thread.is_alive():
         self._event_thread = threading.Thread(target = self._event_loop, name = 'Event notifier')
         self._event_thread.setDaemon(True)
         self._event_thread.start()
 
-  def _reader_loop(self) -> None:
+  async def _reader_loop(self) -> None:
     """
     Continually pulls from the control socket, directing the messages into
     queues based on their type. Controller messages come in two varieties...
@@ -929,23 +937,23 @@ class BaseController(object):
 
     while self.is_alive():
       try:
-        control_message = self._socket.recv()
+        control_message = await self._socket.recv()
         self._last_heartbeat = time.time()
 
         if control_message.content()[-1][0] == '650':
           # asynchronous message, adds to the event queue and wakes up its handler
-          self._event_queue.put(control_message)
+          await self._event_queue.put(control_message)
           self._event_notice.set()
         else:
           # response to a msg() call
-          self._reply_queue.put(control_message)
+          await self._reply_queue.put(control_message)
       except stem.ControllerError as exc:
         # Assume that all exceptions belong to the reader. This isn't always
         # true, but the msg() call can do a better job of sorting it out.
         #
         # Be aware that the msg() method relies on this to unblock callers.
 
-        self._reply_queue.put(exc)
+        await self._reply_queue.put(exc)
 
   def _event_loop(self) -> None:
     """
@@ -970,7 +978,7 @@ class BaseController(object):
             socket_closed_at = time.time()
           elif time.time() - socket_closed_at > EVENTS_LISTENING_TIMEOUT:
             break
-      except queue.Empty:
+      except asyncio.queues.QueueEmpty:
         if not self.is_alive():
           break
 
@@ -3972,7 +3980,7 @@ def _get_with_timeout(event_queue: queue.Queue, timeout: float, start_time: floa
 
     try:
       return event_queue.get(True, time_left)
-    except queue.Empty:
+    except asyncio.queues.QueueEmpty:
       raise stem.Timeout('Reached our %0.1f second timeout' % timeout)
   else:
     return event_queue.get()





More information about the tor-commits mailing list