commit 4be6695a38c96f5fcba58d08e7f86ea1f6647c2e Author: Damian Johnson atagar@torproject.org Date: Sat Jan 26 17:40:45 2019 -0800
ORPort directory requests cropped
Great catch from starlight: descriptors downloaded via an ORPort get cropped when they exceed a certain size...
https://trac.torproject.org/projects/tor/ticket/28961
There were a couple of issues here...
* Our socket handling was pretty screwed up. A socket's recv() only provides the data available at that moment, so if we haven't yet received a full cell we need to read more.
* Descriptors can be composed of multiple RELAY cells (the descriptor was cropped because we stopped reading after the first). Directory requests now keep reading cells until we receive the END signal. --- stem/client/__init__.py | 125 ++++++++++++++++++++++++++++++++--------- stem/client/cell.py | 7 ++- stem/descriptor/remote.py | 4 +- test/task.py | 1 - test/unit/descriptor/remote.py | 2 +- 5 files changed, 105 insertions(+), 34 deletions(-)
diff --git a/stem/client/__init__.py b/stem/client/__init__.py index 1da008fb..76b5addc 100644 --- a/stem/client/__init__.py +++ b/stem/client/__init__.py @@ -33,7 +33,20 @@ import stem.client.cell import stem.socket import stem.util.connection
-from stem.client.datatype import ZERO, LinkProtocol, Address, KDF, split +from stem.client.cell import ( + CELL_TYPE_SIZE, + FIXED_PAYLOAD_LEN, + Cell, +) + +from stem.client.datatype import ( + ZERO, + Address, + KDF, + LinkProtocol, + RelayCommand, + split, +)
__all__ = [ 'cell', @@ -51,8 +64,17 @@ class Relay(object): """
def __init__(self, orport, link_protocol): + # TODO: Python 3.x adds a getbuffer() method which + # lets us get the size... + # + # https://stackoverflow.com/questions/26827055/python-how-to-get-iobytes-alloc... + # + # When we drop python 2.x support we should replace + # self._orport_buffer with an io.BytesIO. + self.link_protocol = LinkProtocol(link_protocol) self._orport = orport + self._orport_buffer = b'' # unread bytes self._orport_lock = threading.RLock() self._circuits = {}
@@ -130,6 +152,47 @@ class Relay(object):
return Relay(conn, link_protocol)
+ def _recv(self, raw = False): + """ + Reads the next cell from our ORPort. If none is present this blocks + until one is available. + + :param bool raw: provides bytes rather than parsing as a cell if **True** + + :returns: next :class:`~stem.client.cell.Cell` + """ + + with self._orport_lock: + # cells begin with [circ_id][cell_type][...] + + circ_id_size = self.link_protocol.circ_id_size.size + + while len(self._orport_buffer) < (circ_id_size + CELL_TYPE_SIZE.size): + self._orport_buffer += self._orport.recv() # read until we know the cell type + + cell_type = Cell.by_value(CELL_TYPE_SIZE.pop(self._orport_buffer[circ_id_size:])[0]) + + if cell_type.IS_FIXED_SIZE: + cell_size = circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN + else: + # variable length, our next field is the payload size + + while len(self._orport_buffer) < (circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN.size): + self._orport_buffer += self._orport.recv() # read until we know the cell size + + payload_len = FIXED_PAYLOAD_LEN.pop(self._orport_buffer[circ_id_size + CELL_TYPE_SIZE.size:])[0] + cell_size = circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN.size + payload_len + + while len(self._orport_buffer) < cell_size: + self._orport_buffer += self._orport.recv() # read until we have the full cell + + if raw: + content, self._orport_buffer = split(self._orport_buffer, cell_size) + return content + else: + cell, self._orport_buffer = Cell.pop(self._orport_buffer, self.link_protocol) + return cell + def _msg(self, cell): """ Sends a cell on the ORPort and provides the response we receive in reply. @@ -263,41 +326,28 @@ class Circuit(object): self.forward_key = Cipher(algorithms.AES(kdf.forward_key), ctr, default_backend()).encryptor() self.backward_key = Cipher(algorithms.AES(kdf.backward_key), ctr, default_backend()).decryptor()
- def send(self, command, data = '', stream_id = 0): + def directory(self, request, stream_id = 0): """ - Sends a message over the circuit. + Request descriptors from the relay.
- :param stem.client.datatype.RelayCommand command: command to be issued - :param bytes data: message payload + :param str request: directory request to make :param int stream_id: specific stream this concerns
- :returns: **list** of :class:`~stem.client.cell.RelayCell` responses + :returns: **str** with the requested descriptor data """
with self.relay._orport_lock: - # Encrypt and send the cell. Our digest/key only updates if the cell is - # successfully sent. + self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id) + self._send(RelayCommand.DATA, request, stream_id = stream_id)
- cell = stem.client.cell.RelayCell(self.id, command, data, stream_id = stream_id) - payload, forward_key, forward_digest = cell.encrypt(self.relay.link_protocol, self.forward_key, self.forward_digest) - self.relay._orport.send(payload) - - self.forward_digest = forward_digest - self.forward_key = forward_key - - # Decrypt relay cells received in response. Again, our digest/key only - # updates when handled successfully. - - reply = self.relay._orport.recv() - reply_cells = [] + response = []
- while reply: - reply_cmd = stem.client.datatype.Size.CHAR.pop(reply[self.relay.link_protocol.circ_id_size.size:])[0] + while True: + # Decrypt relay cells received in response. Our digest/key only + # updates when handled successfully.
- if reply_cmd != stem.client.cell.RelayCell.VALUE: - raise stem.ProtocolError('Circuit response should be a series of RELAY cells, but received an unexpected %s (%i)' % (stem.client.cell.Cell.by_value(reply_cmd), reply_cmd)) + encrypted_cell = self.relay._recv(raw = True)
- encrypted_cell, reply = split(reply, self.relay.link_protocol.fixed_cell_length) decrypted_cell, backward_key, backward_digest = stem.client.cell.RelayCell.decrypt(self.relay.link_protocol, encrypted_cell, self.backward_key, self.backward_digest)
if self.id != decrypted_cell.circ_id: @@ -306,9 +356,30 @@ class Circuit(object): self.backward_digest = backward_digest self.backward_key = backward_key
- reply_cells.append(decrypted_cell) + if decrypted_cell.command == RelayCommand.END: + return b''.join([cell.data for cell in response]) + else: + response.append(decrypted_cell) + + def _send(self, command, data = '', stream_id = 0): + """ + Sends a message over the circuit. + + :param stem.client.datatype.RelayCommand command: command to be issued + :param bytes data: message payload + :param int stream_id: specific stream this concerns + """ + + with self.relay._orport_lock: + # Encrypt and send the cell. Our digest/key only updates if the cell is + # successfully sent. + + cell = stem.client.cell.RelayCell(self.id, command, data, stream_id = stream_id) + payload, forward_key, forward_digest = cell.encrypt(self.relay.link_protocol, self.forward_key, self.forward_digest) + self.relay._orport.send(payload)
- return reply_cells + self.forward_digest = forward_digest + self.forward_key = forward_key
def close(self): with self.relay._orport_lock: diff --git a/stem/client/cell.py b/stem/client/cell.py index 7aae7fbb..94862b38 100644 --- a/stem/client/cell.py +++ b/stem/client/cell.py @@ -51,6 +51,9 @@ from stem.util import datetime_to_unix, str_tools
FIXED_PAYLOAD_LEN = 509 # PAYLOAD_LEN, per tor-spec section 0.2 AUTH_CHALLENGE_SIZE = 32 + +CELL_TYPE_SIZE = Size.CHAR +PAYLOAD_LEN_SIZE = Size.SHORT RELAY_DIGEST_SIZE = Size.LONG
STREAM_ID_REQUIRED = ( @@ -169,13 +172,13 @@ class Cell(object): link_protocol = LinkProtocol(link_protocol)
circ_id, content = link_protocol.circ_id_size.pop(content) - command, content = Size.CHAR.pop(content) + command, content = CELL_TYPE_SIZE.pop(content) cls = Cell.by_value(command)
if cls.IS_FIXED_SIZE: payload_len = FIXED_PAYLOAD_LEN else: - payload_len, content = Size.SHORT.pop(content) + payload_len, content = PAYLOAD_LEN_SIZE.pop(content)
if len(content) < payload_len: raise ValueError('%s cell should have a payload of %i bytes, but only had %i' % (cls.NAME, payload_len, len(content))) diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py index 9eb639ad..6ff1e794 100644 --- a/stem/descriptor/remote.py +++ b/stem/descriptor/remote.py @@ -107,7 +107,6 @@ import stem.directory import stem.prereq import stem.util.enum
-from stem.client.datatype import RelayCommand from stem.util import log, str_tools
try: @@ -966,8 +965,7 @@ def _download_from_orport(endpoint, compression, resource): 'User-Agent: %s' % stem.USER_AGENT, )) + '\r\n\r\n'
- circ.send(RelayCommand.BEGIN_DIR, stream_id = 1) - response = b''.join([cell.data for cell in circ.send(RelayCommand.DATA, request, stream_id = 1)]) + response = circ.directory(request, stream_id = 1) first_line, data = response.split(b'\r\n', 1) header_data, body_data = data.split(b'\r\n\r\n', 1)
diff --git a/test/task.py b/test/task.py index 14626356..c527a0c7 100644 --- a/test/task.py +++ b/test/task.py @@ -86,7 +86,6 @@ def _check_tor_version(tor_path): version = test.tor_version(tor_path) version_str = str(version).split()[0]
- if version.git_commit: return '%s (commit %s)' % (version_str, version.git_commit[:8]) else: diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py index d2688775..858ad702 100644 --- a/test/unit/descriptor/remote.py +++ b/test/unit/descriptor/remote.py @@ -95,7 +95,7 @@ def _orport_mock(data, encoding = 'identity', response_code_header = None): connect_mock = MagicMock() relay_mock = connect_mock().__enter__() circ_mock = relay_mock.create_circuit().__enter__() - circ_mock.send.return_value = cells + circ_mock.directory.return_value = data return connect_mock
tor-commits@lists.torproject.org