commit 341dc930bf89aed8567cad255b333b36576a5f06 Author: Damian Johnson atagar@torproject.org Date: Sat Mar 21 17:12:29 2020 -0700
Prototype Controller asyncio usage
Still very much an experiment. Several things are broken but this adjusts our Controller successfully wrap an asyncio socket.
from stem.control import Controller
controller = Controller.from_port() controller.connect() controller.authenticate() print("Tor is running version %s" % controller.get_version()) controller.close()
----------------------------------------
% python3.7 demo.py Tor is running version 0.4.3.0-alpha-dev (git-5e70c27e8560ac18)
Essentially this replaces our reader daemon with an asyncio loop. Synchronous code can invoke this loop in the following ways...
* Non-blocking call of a coroutine
self._asyncio_loop.create_task(my_coroutine())
* Blocking call of a coroutine
result = asyncio.run_coroutine_threadsafe(my_coroutine(), self._asyncio_loop).result(timeout)
* Non-blocking call of a function
self._asyncio_loop.call_soon_threadsafe(my_function)
I'm not quite sold yet on if we'll go this direction. An asynchronous socket has two potential advantages...
* Simplify multi-threading * Offer asynchronous variants of our methods
Either or both of these might be dead ends. I'll need to experiment more, but now that we've seen this work in this limited way we have something to build off of. --- stem/control.py | 56 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 24 deletions(-)
diff --git a/stem/control.py b/stem/control.py index ec88aa38..2d33bd54 100644 --- a/stem/control.py +++ b/stem/control.py @@ -240,13 +240,13 @@ If you're fine with allowing your script to raise exceptions then this can be mo =============== =========== """
+import asyncio import calendar import collections import functools import inspect import io import os -import queue import threading import time
@@ -544,14 +544,21 @@ class BaseController(object):
def __init__(self, control_socket: stem.socket.ControlSocket, is_authenticated: bool = False) -> None: self._socket = control_socket + + self._asyncio_loop = asyncio.get_event_loop() + + self._asyncio_thread = threading.Thread(target = self._asyncio_loop.run_forever, name = 'asyncio') + self._asyncio_thread.setDaemon(True) + self._asyncio_thread.start() + self._msg_lock = threading.RLock()
self._status_listeners = [] # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread) self._status_listeners_lock = threading.RLock()
# queues where incoming messages are directed - self._reply_queue = queue.Queue() # type: queue.Queue[Union[stem.response.ControlMessage, stem.ControllerError]] - self._event_queue = queue.Queue() # type: queue.Queue[stem.response.ControlMessage] + self._reply_queue = asyncio.Queue() # type: asyncio.Queue[Union[stem.response.ControlMessage, stem.ControllerError]] + self._event_queue = asyncio.Queue() # type: asyncio.Queue[stem.response.ControlMessage]
# thread to continually pull from the control socket self._reader_thread = None # type: Optional[threading.Thread] @@ -637,15 +644,16 @@ class BaseController(object): log.info('Socket experienced a problem (%s)' % response) elif isinstance(response, stem.response.ControlMessage): log.info('Failed to deliver a response: %s' % response) - except queue.Empty: + except asyncio.QueueEmpty: # the empty() method is documented to not be fully reliable so this # isn't entirely surprising
break
try: - self._socket.send(message) - response = self._reply_queue.get() + self._asyncio_loop.create_task(self._socket.send(message)) + + response = asyncio.run_coroutine_threadsafe(self._reply_queue.get(), self._asyncio_loop).result()
# If the message we received back had an exception then re-raise it to the # caller. Otherwise return the response. @@ -716,7 +724,7 @@ class BaseController(object): :raises: :class:`stem.SocketError` if unable to make a socket """
- self._socket.connect() + asyncio.run_coroutine_threadsafe(self._socket.connect(), self._asyncio_loop).result()
def close(self) -> None: """ @@ -724,7 +732,7 @@ class BaseController(object): :func:`~stem.socket.BaseSocket.close` method. """
- self._socket.close() + asyncio.run_coroutine_threadsafe(self._socket.close(), self._asyncio_loop).result()
# Join on any outstanding state change listeners. Closing is a state change # of its own, so if we have any listeners it's quite likely there's some @@ -737,6 +745,9 @@ class BaseController(object): if t.is_alive() and threading.current_thread() != t: t.join()
+ self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop) + self._asyncio_thread.join() + def get_socket(self) -> stem.socket.ControlSocket: """ Provides the socket used to speak with the tor process. Communicating with @@ -825,13 +836,13 @@ class BaseController(object):
pass
- def _connect(self) -> None: + async def _connect(self) -> None: self._launch_threads() self._notify_status_listeners(State.INIT) - self._socket_connect() + await self._socket_connect() self._is_authenticated = False
- def _close(self) -> None: + async def _close(self) -> None: # Our is_alive() state is now false. Our reader thread should already be # awake from recv() raising a closure exception. Wake up the event thread # too so it can end. @@ -847,7 +858,7 @@ class BaseController(object):
self._notify_status_listeners(State.CLOSED)
- self._socket_close() + await self._socket_close()
def _post_authentication(self) -> None: # actions to be taken after we have a newly authenticated connection @@ -904,21 +915,18 @@ class BaseController(object): them if we're restarted. """
+ self._asyncio_loop.create_task(self._reader_loop()) + # In theory concurrent calls could result in multiple start() calls on a # single thread, which would cause an unexpected exception. Best be safe.
with self._socket._get_send_lock(): - if not self._reader_thread or not self._reader_thread.is_alive(): - self._reader_thread = threading.Thread(target = self._reader_loop, name = 'Tor listener') - self._reader_thread.setDaemon(True) - self._reader_thread.start() - if not self._event_thread or not self._event_thread.is_alive(): self._event_thread = threading.Thread(target = self._event_loop, name = 'Event notifier') self._event_thread.setDaemon(True) self._event_thread.start()
- def _reader_loop(self) -> None: + async def _reader_loop(self) -> None: """ Continually pulls from the control socket, directing the messages into queues based on their type. Controller messages come in two varieties... @@ -929,23 +937,23 @@ class BaseController(object):
while self.is_alive(): try: - control_message = self._socket.recv() + control_message = await self._socket.recv() self._last_heartbeat = time.time()
if control_message.content()[-1][0] == '650': # asynchronous message, adds to the event queue and wakes up its handler - self._event_queue.put(control_message) + await self._event_queue.put(control_message) self._event_notice.set() else: # response to a msg() call - self._reply_queue.put(control_message) + await self._reply_queue.put(control_message) except stem.ControllerError as exc: # Assume that all exceptions belong to the reader. This isn't always # true, but the msg() call can do a better job of sorting it out. # # Be aware that the msg() method relies on this to unblock callers.
- self._reply_queue.put(exc) + await self._reply_queue.put(exc)
def _event_loop(self) -> None: """ @@ -970,7 +978,7 @@ class BaseController(object): socket_closed_at = time.time() elif time.time() - socket_closed_at > EVENTS_LISTENING_TIMEOUT: break - except queue.Empty: + except asyncio.queues.QueueEmpty: if not self.is_alive(): break
@@ -3972,7 +3980,7 @@ def _get_with_timeout(event_queue: queue.Queue, timeout: float, start_time: floa
try: return event_queue.get(True, time_left) - except queue.Empty: + except asyncio.queues.QueueEmpty: raise stem.Timeout('Reached our %0.1f second timeout' % timeout) else: return event_queue.get()