commit f9de9a9612d639337090715e0b84d44129a0288a Author: Illia Volochii illia.volochii@gmail.com Date: Sun May 24 01:18:31 2020 +0300
Start awaiting finishing of the loop tasks while closing controllers --- stem/control.py | 17 ++++++++++++++--- test/integ/control/base_controller.py | 11 ++++++++--- 2 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/stem/control.py b/stem/control.py index c26da351..6ca6c23b 100644 --- a/stem/control.py +++ b/stem/control.py @@ -648,6 +648,8 @@ class BaseController(_BaseControllerSocketMixin):
self._state_change_threads = [] # type: List[threading.Thread] # threads we've spawned to notify of state changes
+ self._reader_loop_task = None # type: Optional[asyncio.Task] + self._event_loop_task = None # type: Optional[asyncio.Task] if self._socket.is_alive(): self._create_loop_tasks()
@@ -868,6 +870,15 @@ class BaseController(_BaseControllerSocketMixin): self._event_notice.set() self._is_authenticated = False
+ reader_loop_task = self._reader_loop_task + self._reader_loop_task = None + event_loop_task = self._event_loop_task + self._event_loop_task = None + if reader_loop_task and self.is_alive(): + await reader_loop_task + if event_loop_task: + await event_loop_task + self._notify_status_listeners(State.CLOSED)
await self._socket_close() @@ -923,12 +934,12 @@ class BaseController(_BaseControllerSocketMixin):
def _create_loop_tasks(self) -> None: """ - Initializes daemon threads. Threads can't be reused so we need to recreate + Initializes asyncio tasks. Tasks can't be reused so we need to recreate them if we're restarted. """
- for coroutine in (self._reader_loop(), self._event_loop()): - self._asyncio_loop.create_task(coroutine) + self._reader_loop_task = self._asyncio_loop.create_task(self._reader_loop()) + self._event_loop_task = self._asyncio_loop.create_task(self._event_loop())
async def _reader_loop(self) -> None: """ diff --git a/test/integ/control/base_controller.py b/test/integ/control/base_controller.py index ac5f1e56..8fc5f1a2 100644 --- a/test/integ/control/base_controller.py +++ b/test/integ/control/base_controller.py @@ -161,9 +161,14 @@ class TestBaseController(unittest.TestCase): await controller.msg('SETEVENTS') await controller.msg('RESETCONF NodeFamily')
- await controller.close() - controller.receive_notice.set() - await asyncio.sleep(0) + # We need to set the receive notice and shut down the controller + # concurrently because the controller will block on the event handling, + # which in turn is currently blocking on the receive_notice. + + async def set_receive_notice(): + controller.receive_notice.set() + + await asyncio.gather(controller.close(), set_receive_notice())
self.assertTrue(len(controller.received_events) >= 2)
tor-commits@lists.torproject.org