commit 5d29a263130c47ec399dcdb5ef9994792a854cff Author: Damian Johnson atagar@torproject.org Date: Wed Jul 22 18:11:41 2020 -0700
All client usage failed with ValueErrors
When our Relay class sends a message it first drains its socket of unread data...
  async def _msg(self, cell):
    await self._orport.recv(timeout = 0)
    await self._orport.send(cell.pack(self.link_protocol))
    response = await self._orport.recv(timeout = 1)
    yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
This in turn called asyncio.wait_for() with a timeout value of zero which returns immediately, leaving our socket undrained.
Our following recv() is then polluted with unexpected data. For instance, this caused anything that uses create_circuit() (such as descriptor downloads) to fail with confusing exceptions such as...
====================================================================== ERROR: test_downloading_via_orport ---------------------------------------------------------------------- Traceback (most recent call last): File "/home/atagar/Desktop/stem/test/require.py", line 60, in wrapped return func(self, *args, **kwargs) File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped return func(self, *args, **kwargs) File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped return func(self, *args, **kwargs) File "/home/atagar/Desktop/stem/test/integ/descriptor/remote.py", line 27, in test_downloading_via_orport fall_back_to_authority = False, File "/home/atagar/Desktop/stem/stem/util/__init__.py", line 363, in _run_async_method return future.result() File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 432, in result return self.__get_result() File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 384, in __get_result raise self._exception File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in run return [desc async for desc in self._run(suppress)] File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in <listcomp> return [desc async for desc in self._run(suppress)] File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 482, in _run raise self.error File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 549, in _download_descriptors response = await asyncio.wait_for(self._download_from(endpoint), time_remaining) File "/home/atagar/Python-3.7.0/Lib/asyncio/tasks.py", line 384, in wait_for return await fut File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 588, in _download_from async with await relay.create_circuit() as circ: File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 270, in create_circuit async for cell in self._msg(create_fast_cell): File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 226, in _msg yield 
stem.client.cell.Cell.pop(response, self.link_protocol)[0] File "/home/atagar/Desktop/stem/stem/client/cell.py", line 182, in pop cls = Cell.by_value(command) File "/home/atagar/Desktop/stem/stem/client/cell.py", line 139, in by_value raise ValueError("'%s' isn't a valid cell value" % value) ValueError: '65' isn't a valid cell value
This also reverts our Relay's '_orport_lock' back to a threaded RLock because asyncio locks are not reentrant, causing methods such as directory() (which call _send()) to deadlock upon themselves. We might drop this lock entirely in the future (thread safety should be moot now that the stem.client module is fully asynchronous). --- stem/client/__init__.py | 18 +++++++++--------- stem/socket.py | 7 +++---- test/unit/descriptor/remote.py | 2 +- 3 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/stem/client/__init__.py b/stem/client/__init__.py index 8c8da923..877d60e2 100644 --- a/stem/client/__init__.py +++ b/stem/client/__init__.py @@ -25,8 +25,8 @@ a wrapper for :class:`~stem.socket.RelaySocket`, much the same way as +- close - closes this circuit """
-import asyncio import hashlib +import threading
import stem import stem.client.cell @@ -71,7 +71,7 @@ class Relay(object): self.link_protocol = LinkProtocol(link_protocol) self._orport = orport self._orport_buffer = b'' # unread bytes - self._orport_lock = asyncio.Lock() + self._orport_lock = threading.RLock() self._circuits = {} # type: Dict[int, stem.client.Circuit]
@staticmethod @@ -162,7 +162,7 @@ class Relay(object): :returns: next :class:`~stem.client.cell.Cell` """
- async with self._orport_lock: + with self._orport_lock: # cells begin with [circ_id][cell_type][...]
circ_id_size = self.link_protocol.circ_id_size.size @@ -253,7 +253,7 @@ class Relay(object): :func:`~stem.socket.BaseSocket.close` method. """
- async with self._orport_lock: + with self._orport_lock: return await self._orport.close()
async def create_circuit(self) -> 'stem.client.Circuit': @@ -261,7 +261,7 @@ class Relay(object): Establishes a new circuit. """
- async with self._orport_lock: + with self._orport_lock: circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
create_fast_cell = stem.client.cell.CreateFastCell(circ_id) @@ -286,7 +286,7 @@ class Relay(object): return circ
async def __aiter__(self) -> AsyncIterator['stem.client.Circuit']: - async with self._orport_lock: + with self._orport_lock: for circ in self._circuits.values(): yield circ
@@ -338,7 +338,7 @@ class Circuit(object): :returns: **str** with the requested descriptor data """
- async with self.relay._orport_lock: + with self.relay._orport_lock: await self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id) await self._send(RelayCommand.DATA, request, stream_id = stream_id)
@@ -372,7 +372,7 @@ class Circuit(object): :param stream_id: specific stream this concerns """
- async with self.relay._orport_lock: + with self.relay._orport_lock: # Encrypt and send the cell. Our digest/key only updates if the cell is # successfully sent.
@@ -384,7 +384,7 @@ class Circuit(object): self.forward_key = forward_key
async def close(self) -> None: - async with self.relay._orport_lock: + with self.relay._orport_lock: await self.relay._orport.send(stem.client.cell.DestroyCell(self.id).pack(self.relay.link_protocol)) del self.relay._circuits[self.id]
diff --git a/stem/socket.py b/stem/socket.py index a19a8f25..af52fffa 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -362,13 +362,12 @@ class RelaySocket(BaseSocket): """
async def wrapped_recv(reader: asyncio.StreamReader) -> Optional[bytes]: - read_coroutine = reader.read(1024) if timeout is None: - return await read_coroutine + return await reader.read(1024) else: try: - return await asyncio.wait_for(read_coroutine, timeout) - except (asyncio.TimeoutError, ssl.SSLError, ssl.SSLWantReadError): + return await asyncio.wait_for(reader.read(1024), max(timeout, 0.0001)) + except asyncio.TimeoutError: return None
return await self._recv(wrapped_recv) diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py index 6b9ca3ad..7447fd16 100644 --- a/test/unit/descriptor/remote.py +++ b/test/unit/descriptor/remote.py @@ -98,7 +98,7 @@ class TestDescriptorDownloader(unittest.TestCase): self.assertEqual('moria1', desc.nickname) self.assertEqual('128.31.0.34', desc.address) self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint) - self.assertEqual(TEST_DESCRIPTOR, desc.get_bytes()) + self.assertEqual(TEST_DESCRIPTOR.rstrip(), desc.get_bytes())
reply.stop()