commit 8d18e6bb83c02ecb1a98e4cafbde7113f4e55730 Author: Illia Volochii <illia.volochii@gmail.com> Date: Mon Apr 27 18:33:21 2020 +0300
Fix message synchronization --- stem/control.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/stem/control.py b/stem/control.py index 3eb706e0..d3e074b7 100644 --- a/stem/control.py +++ b/stem/control.py @@ -553,6 +553,27 @@ def event_description(event: str) -> str: return EVENT_DESCRIPTIONS.get(event.lower())
+class _MsgLock: + def __init__(self): + self._r_lock = threading.RLock() + self._async_lock = asyncio.Lock() + + async def acquire(self): + await self._async_lock.acquire() + self._r_lock.acquire() + + def release(self): + self._r_lock.release() + self._async_lock.release() + + async def __aenter__(self): + await self.acquire() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + self.release() + + class _BaseControllerSocketMixin: def is_alive(self) -> bool: """ @@ -622,7 +643,7 @@ class BaseController(_BaseControllerSocketMixin): self._asyncio_loop = asyncio.get_event_loop() self._asyncio_loop_tasks = []
- self._msg_lock = threading.RLock() + self._msg_lock = _MsgLock()
self._status_listeners = [] # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread) self._status_listeners_lock = threading.RLock() @@ -669,7 +690,7 @@ class BaseController(_BaseControllerSocketMixin): * :class:`stem.SocketClosed` if the socket is shut down """
- with self._msg_lock: + async with self._msg_lock: # If our _reply_queue isn't empty then one of a few things happened... # # - Our connection was closed and probably re-restablished. This was @@ -1128,7 +1149,7 @@ class AsyncController(_ControllerClassMethodMixin, BaseController): * :class:`stem.connection.AuthenticationFailure` if unable to authenticate """
- with self._msg_lock: + async with self._msg_lock: await self.connect() self.clear_cache() await self.authenticate(*args, **kwargs)