commit 5d29a263130c47ec399dcdb5ef9994792a854cff
Author: Damian Johnson <atagar(a)torproject.org>
Date: Wed Jul 22 18:11:41 2020 -0700
All client usage failed with ValueErrors
When our Relay class sends a message it first drains its socket of unread
data...
async def _msg(self, cell):
await self._orport.recv(timeout = 0)
await self._orport.send(cell.pack(self.link_protocol))
response = await self._orport.recv(timeout = 1)
yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
This in turn called asyncio.wait_for() with a timeout value of zero which
returns immediately, leaving our socket undrained.
Our following recv() is then polluted with unexpected data. For instance,
this is caused anything that uses create_circuit() (such as descriptor
downloads) to fail with confusing exceptions such as...
======================================================================
ERROR: test_downloading_via_orport
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/atagar/Desktop/stem/test/require.py", line 60, in wrapped
return func(self, *args, **kwargs)
File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped
return func(self, *args, **kwargs)
File "/home/atagar/Desktop/stem/test/require.py", line 75, in wrapped
return func(self, *args, **kwargs)
File "/home/atagar/Desktop/stem/test/integ/descriptor/remote.py", line 27, in test_downloading_via_orport
fall_back_to_authority = False,
File "/home/atagar/Desktop/stem/stem/util/__init__.py", line 363, in _run_async_method
return future.result()
File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/home/atagar/Python-3.7.0/Lib/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in run
return [desc async for desc in self._run(suppress)]
File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 469, in <listcomp>
return [desc async for desc in self._run(suppress)]
File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 482, in _run
raise self.error
File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 549, in _download_descriptors
response = await asyncio.wait_for(self._download_from(endpoint), time_remaining)
File "/home/atagar/Python-3.7.0/Lib/asyncio/tasks.py", line 384, in wait_for
return await fut
File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 588, in _download_from
async with await relay.create_circuit() as circ:
File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 270, in create_circuit
async for cell in self._msg(create_fast_cell):
File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 226, in _msg
yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
File "/home/atagar/Desktop/stem/stem/client/cell.py", line 182, in pop
cls = Cell.by_value(command)
File "/home/atagar/Desktop/stem/stem/client/cell.py", line 139, in by_value
raise ValueError("'%s' isn't a valid cell value" % value)
ValueError: '65' isn't a valid cell value
This also reverts our Relay's '_orport_lock' back to a threaded RLock because
asyncio locks are not reentrant, causing methods such as directory() (which
call _send()) to deadlock upon themselves. We might drop this lock entirely in
the future (thread safety should be moot now that the stem.client module is
fully asynchronous).
---
stem/client/__init__.py | 18 +++++++++---------
stem/socket.py | 7 +++----
test/unit/descriptor/remote.py | 2 +-
3 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/stem/client/__init__.py b/stem/client/__init__.py
index 8c8da923..877d60e2 100644
--- a/stem/client/__init__.py
+++ b/stem/client/__init__.py
@@ -25,8 +25,8 @@ a wrapper for :class:`~stem.socket.RelaySocket`, much the same way as
+- close - closes this circuit
"""
-import asyncio
import hashlib
+import threading
import stem
import stem.client.cell
@@ -71,7 +71,7 @@ class Relay(object):
self.link_protocol = LinkProtocol(link_protocol)
self._orport = orport
self._orport_buffer = b'' # unread bytes
- self._orport_lock = asyncio.Lock()
+ self._orport_lock = threading.RLock()
self._circuits = {} # type: Dict[int, stem.client.Circuit]
@staticmethod
@@ -162,7 +162,7 @@ class Relay(object):
:returns: next :class:`~stem.client.cell.Cell`
"""
- async with self._orport_lock:
+ with self._orport_lock:
# cells begin with [circ_id][cell_type][...]
circ_id_size = self.link_protocol.circ_id_size.size
@@ -253,7 +253,7 @@ class Relay(object):
:func:`~stem.socket.BaseSocket.close` method.
"""
- async with self._orport_lock:
+ with self._orport_lock:
return await self._orport.close()
async def create_circuit(self) -> 'stem.client.Circuit':
@@ -261,7 +261,7 @@ class Relay(object):
Establishes a new circuit.
"""
- async with self._orport_lock:
+ with self._orport_lock:
circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
create_fast_cell = stem.client.cell.CreateFastCell(circ_id)
@@ -286,7 +286,7 @@ class Relay(object):
return circ
async def __aiter__(self) -> AsyncIterator['stem.client.Circuit']:
- async with self._orport_lock:
+ with self._orport_lock:
for circ in self._circuits.values():
yield circ
@@ -338,7 +338,7 @@ class Circuit(object):
:returns: **str** with the requested descriptor data
"""
- async with self.relay._orport_lock:
+ with self.relay._orport_lock:
await self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id)
await self._send(RelayCommand.DATA, request, stream_id = stream_id)
@@ -372,7 +372,7 @@ class Circuit(object):
:param stream_id: specific stream this concerns
"""
- async with self.relay._orport_lock:
+ with self.relay._orport_lock:
# Encrypt and send the cell. Our digest/key only updates if the cell is
# successfully sent.
@@ -384,7 +384,7 @@ class Circuit(object):
self.forward_key = forward_key
async def close(self) -> None:
- async with self.relay._orport_lock:
+ with self.relay._orport_lock:
await self.relay._orport.send(stem.client.cell.DestroyCell(self.id).pack(self.relay.link_protocol))
del self.relay._circuits[self.id]
diff --git a/stem/socket.py b/stem/socket.py
index a19a8f25..af52fffa 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -362,13 +362,12 @@ class RelaySocket(BaseSocket):
"""
async def wrapped_recv(reader: asyncio.StreamReader) -> Optional[bytes]:
- read_coroutine = reader.read(1024)
if timeout is None:
- return await read_coroutine
+ return await reader.read(1024)
else:
try:
- return await asyncio.wait_for(read_coroutine, timeout)
- except (asyncio.TimeoutError, ssl.SSLError, ssl.SSLWantReadError):
+ return await asyncio.wait_for(reader.read(1024), max(timeout, 0.0001))
+ except asyncio.TimeoutError:
return None
return await self._recv(wrapped_recv)
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index 6b9ca3ad..7447fd16 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -98,7 +98,7 @@ class TestDescriptorDownloader(unittest.TestCase):
self.assertEqual('moria1', desc.nickname)
self.assertEqual('128.31.0.34', desc.address)
self.assertEqual('9695DFC35FFEB861329B9F1AB04C46397020CE31', desc.fingerprint)
- self.assertEqual(TEST_DESCRIPTOR, desc.get_bytes())
+ self.assertEqual(TEST_DESCRIPTOR.rstrip(), desc.get_bytes())
reply.stop()