commit e2d8575ce491e40f4412efc6a12db31158c7e7dc Author: Damian Johnson <atagar@torproject.org> Date: Wed Jan 2 12:32:23 2019 -0800
Remove hardcoded buffer size from ORPort sockets
When reading ORPort data that exceeded a hardcoded (and arbitrary) buffer size we cropped the content. This was caught by starlight when attempting to use one of our demo scripts...
https://trac.torproject.org/projects/tor/ticket/28961 https://stem.torproject.org/tutorials/examples/download_descriptor.html
Original traceback: File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 589, in _download_descriptors self.content, self.reply_headers = _download_from_orport(endpoint, self.compression, self.resource) File "/home/atagar/Desktop/stem/stem/descriptor/remote.py", line 998, in _download_from_orport response = b''.join([cell.data for cell in circ.send(RelayCommand.DATA, request, stream_id = 1)]) File "/home/atagar/Desktop/stem/stem/client/__init__.py", line 268, in send decrypted_cell, backward_key, backward_digest = stem.client.cell.RelayCell.decrypt(self.relay.link_protocol, encrypted_cell, self.backward_key, self.backward_digest) File "/home/atagar/Desktop/stem/stem/client/cell.py", line 412, in decrypt raise stem.ProtocolError('RELAY cells should be %i bytes, but received %i' % (link_protocol.fixed_cell_length, len(content))) ProtocolError: RELAY cells should be 512 bytes, but received 464
I'm unhappy with this approach, but after three days of chewing on this it's the least bad approach I've come up with and seems to work. Patches welcome if there's a smarter way of going about this. --- stem/client/__init__.py | 43 ++++++++++++++++++++++++++++++++++++++----- stem/socket.py | 26 ++++++++++++++++++-------- 2 files changed, 56 insertions(+), 13 deletions(-)
diff --git a/stem/client/__init__.py b/stem/client/__init__.py index 3fb68a69..1da008fb 100644 --- a/stem/client/__init__.py +++ b/stem/client/__init__.py @@ -130,6 +130,38 @@ class Relay(object):
return Relay(conn, link_protocol)
+ def _msg(self, cell): + """ + Sends a cell on the ORPort and provides the response we receive in reply. + + Unfortunately unlike control sockets, ORPorts don't have generalized rules + for predictable message IO. With control sockets... + + * Each message we send receives a single reply. + * We may also receive asynchronous events marked with a 650 status. + + ORPorts by contrast receive variable length cells with differing rules on + their arrival. As such making a best effort attempt at a send-and-receive + method in which we do the following... + + * Discard any existing unread data from the socket. + * Send our request. + * Await up to a second for a reply. + + It's quite possible this is a stupid approach. If so, patches welcome. + + :param stem.client.cell.Cell cell: cell to be sent + + :returns: **generator** with the cells received in reply + """ + + self._orport.recv(timeout = 0) # discard unread data + self._orport.send(cell.pack(self.link_protocol)) + response = self._orport.recv(timeout = 1) + + for received_cell in stem.client.cell.Cell.pop(response, self.link_protocol): + yield received_cell + def is_alive(self): """ Checks if our socket is currently connected. This is a pass-through for our @@ -170,15 +202,16 @@ class Relay(object): circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
create_fast_cell = stem.client.cell.CreateFastCell(circ_id) - self._orport.send(create_fast_cell.pack(self.link_protocol)) + created_fast_cell = None
- response = stem.client.cell.Cell.unpack(self._orport.recv(), self.link_protocol) - created_fast_cells = filter(lambda cell: isinstance(cell, stem.client.cell.CreatedFastCell), response) + for cell in self._msg(create_fast_cell): + if isinstance(cell, stem.client.cell.CreatedFastCell): + created_fast_cell = cell + break
- if not created_fast_cells: + if not created_fast_cell: raise ValueError('We should get a CREATED_FAST response from a CREATE_FAST request')
- created_fast_cell = list(created_fast_cells)[0] kdf = KDF.from_value(create_fast_cell.key_material + created_fast_cell.key_material)
if created_fast_cell.derivative_key != kdf.key_hash: diff --git a/stem/socket.py b/stem/socket.py index 2040b950..9290d6c6 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -91,10 +91,6 @@ ERROR_MSG = 'Error while receiving a control message (%s): %s'
TRUNCATE_LOGS = 10
-# maximum number of bytes to read at a time from a relay socket - -MAX_READ_BUFFER_LEN = 10 * 1024 * 1024 -
class BaseSocket(object): """ @@ -389,10 +385,13 @@ class RelaySocket(BaseSocket):
self._send(message, lambda s, sf, msg: _write_to_socket(sf, msg))
- def recv(self): + def recv(self, timeout = None): """ Receives a message from the relay.
+ :param float timeout: maxiumum number of seconds to await a response, this + blocks indefinitely if **None** + :returns: bytes for the message received
:raises: @@ -400,10 +399,21 @@ class RelaySocket(BaseSocket): * :class:`stem.SocketClosed` if the socket closes before we receive a complete message """
- # TODO: Is MAX_READ_BUFFER_LEN defined in the spec? Not sure where it came - # from. + def wrapped_recv(s, sf): + if timeout is None: + return s.recv() + else: + s.setblocking(0) + s.settimeout(timeout) + + try: + return s.recv() + except ssl.SSLWantReadError: + return None + finally: + s.setblocking(1)
- return self._recv(lambda s, sf: s.recv(MAX_READ_BUFFER_LEN)) + return self._recv(wrapped_recv)
def is_localhost(self): return self.address == '127.0.0.1'