commit ac9311c275b1b81904f1d0f72efff919ee590330 Author: Illia Volochii illia.volochii@gmail.com Date: Sun Apr 12 19:20:48 2020 +0300
Stop using a thread for processing asynchronous events --- stem/control.py | 34 ++++++++++------------------------ test/unit/control/controller.py | 2 +- 2 files changed, 11 insertions(+), 25 deletions(-)
diff --git a/stem/control.py b/stem/control.py index 2d33bd54..a3b7d253 100644 --- a/stem/control.py +++ b/stem/control.py @@ -560,12 +560,7 @@ class BaseController(object): self._reply_queue = asyncio.Queue() # type: asyncio.Queue[Union[stem.response.ControlMessage, stem.ControllerError]] self._event_queue = asyncio.Queue() # type: asyncio.Queue[stem.response.ControlMessage]
- # thread to continually pull from the control socket - self._reader_thread = None # type: Optional[threading.Thread] - - # thread to pull from the _event_queue and call handle_event - self._event_notice = threading.Event() - self._event_thread = None # type: Optional[threading.Thread] + self._event_notice = asyncio.Event()
# saves our socket's prior _connect() and _close() methods so they can be # called along with ours @@ -582,7 +577,7 @@ class BaseController(object): self._state_change_threads = [] # type: List[threading.Thread] # threads we've spawned to notify of state changes
if self._socket.is_alive(): - self._launch_threads() + self._create_loop_tasks()
if is_authenticated: self._post_authentication() @@ -837,7 +832,7 @@ class BaseController(object): pass
async def _connect(self) -> None: - self._launch_threads() + self._create_loop_tasks() self._notify_status_listeners(State.INIT) await self._socket_connect() self._is_authenticated = False @@ -852,10 +847,6 @@ class BaseController(object):
# joins on our threads if it's safe to do so
- for t in (self._reader_thread, self._event_thread): - if t and t.is_alive() and threading.current_thread() != t: - t.join() - self._notify_status_listeners(State.CLOSED)
await self._socket_close() @@ -909,22 +900,15 @@ class BaseController(object): else: listener(self, state, change_timestamp)
- def _launch_threads(self) -> None: + def _create_loop_tasks(self) -> None: """ Initializes daemon threads. Threads can't be reused so we need to recreate them if we're restarted. """
self._asyncio_loop.create_task(self._reader_loop()) + self._asyncio_loop.create_task(self._event_loop())
- # In theory concurrent calls could result in multiple start() calls on a - # single thread, which would cause an unexpected exception. Best be safe. - - with self._socket._get_send_lock(): - if not self._event_thread or not self._event_thread.is_alive(): - self._event_thread = threading.Thread(target = self._event_loop, name = 'Event notifier') - self._event_thread.setDaemon(True) - self._event_thread.start()
async def _reader_loop(self) -> None: """ @@ -955,7 +939,7 @@ class BaseController(object):
await self._reply_queue.put(exc)
- def _event_loop(self) -> None: + async def _event_loop(self) -> None: """ Continually pulls messages from the _event_queue and sends them to our handle_event callback. This is done via its own thread so subclasses with a @@ -982,8 +966,10 @@ class BaseController(object): if not self.is_alive(): break
- self._event_notice.wait(0.05) - self._event_notice.clear() + try: + await asyncio.wait_for(self._event_notice.wait(), timeout=0.05) + except asyncio.TimeoutError: + self._event_notice.clear()
class Controller(BaseController): diff --git a/test/unit/control/controller.py b/test/unit/control/controller.py index 02ed2774..c0a07e2a 100644 --- a/test/unit/control/controller.py +++ b/test/unit/control/controller.py @@ -707,7 +707,7 @@ class TestControl(unittest.TestCase): with patch('time.time', Mock(return_value = TEST_TIMESTAMP)): with patch('stem.control.Controller.is_alive') as is_alive_mock: is_alive_mock.return_value = True - self.controller._launch_threads() + self.controller._create_loop_tasks()
try: # Converting an event back into an uncast ControlMessage, then feeding it