commit 1fc0535bf3d423f361755c03ba5ff51c4c91593b Author: Illia Volochii illia.volochii@gmail.com Date: Sat Apr 25 19:08:07 2020 +0300
Create a synchronous version of `recv_message` --- stem/socket.py | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+)
diff --git a/stem/socket.py b/stem/socket.py index a82c4892..e8ae492b 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -690,6 +690,126 @@ async def recv_message(reader: asyncio.StreamReader, arrived_at: Optional[float] raise stem.ProtocolError("Unrecognized divider type '%s': %s" % (divider, stem.util.str_tools._to_unicode(line)))
def recv_message_from_bytes_io(reader: asyncio.StreamReader, arrived_at: Optional[float] = None) -> 'stem.response.ControlMessage':
  """
  Pulls from a control socket until we either have a complete message or
  encounter a problem. This is the synchronous counterpart of
  :func:`~stem.socket.recv_message`: lines are read with a plain, blocking
  **reader.readline()** call rather than being awaited.

  NOTE(review): the **reader** annotation still names asyncio.StreamReader,
  but since readline() is not awaited here a synchronous binary reader is
  what's actually required (the function name suggests io.BytesIO). Confirm
  and update the annotation alongside the module's imports.

  :param reader: source of the control content, its **readline()** must
    return bytes
  :param float arrived_at: forwarded unchanged as the **arrived_at** of the
    returned message

  :returns: :class:`~stem.response.ControlMessage` read from the socket

  :raises:
    * :class:`stem.ProtocolError` the content from the socket is malformed
    * :class:`stem.SocketClosed` if the socket closes before we receive
      a complete message
  """

  parsed_content, raw_content, first_line = None, None, True

  while True:
    try:
      line = reader.readline()
    except AttributeError:
      # if the underlying socket file has been closed then we will receive:
      # AttributeError: 'NoneType' object has no attribute 'recv'

      log.info(ERROR_MSG % ('SocketClosed', 'socket file has been closed'))
      raise stem.SocketClosed('socket file has been closed')
    except (OSError, ValueError) as exc:
      # when disconnected this errors with...
      #
      #   * ValueError: I/O operation on closed file
      #   * OSError: [Errno 107] Transport endpoint is not connected
      #   * OSError: [Errno 9] Bad file descriptor

      log.info(ERROR_MSG % ('SocketClosed', 'received exception "%s"' % exc))
      raise stem.SocketClosed(exc)

    # Parses the tor control lines. These are of the form...
    # <status code><divider><content>\r\n

    if not line:
      # if the socket is disconnected then the readline() method will provide
      # empty content

      log.info(ERROR_MSG % ('SocketClosed', 'empty socket content'))
      raise stem.SocketClosed('Received empty socket content.')
    elif not MESSAGE_PREFIX.match(line):
      log.info(ERROR_MSG % ('ProtocolError', 'malformed status code/divider, "%s"' % log.escape(line)))
      raise stem.ProtocolError('Badly formatted reply line: beginning is malformed')
    elif not line.endswith(b'\r\n'):
      log.info(ERROR_MSG % ('ProtocolError', 'no CRLF linebreak, "%s"' % log.escape(line)))
      raise stem.ProtocolError('All lines should end with CRLF')

    status_code, divider, content = line[:3], line[3:4], line[4:-2]  # strip CRLF off content

    status_code = stem.util.str_tools._to_unicode(status_code)
    divider = stem.util.str_tools._to_unicode(divider)

    # Most controller responses are single lines, in which case we don't need
    # so much overhead.

    if first_line:
      if divider == ' ':
        _log_trace(line)
        return stem.response.ControlMessage([(status_code, divider, content)], line, arrived_at = arrived_at)
      else:
        parsed_content, raw_content, first_line = [], bytearray(), False

    raw_content += line

    if divider == '-':
      # mid-reply line, keep pulling for more content
      parsed_content.append((status_code, divider, content))
    elif divider == ' ':
      # end of the message, return the message
      parsed_content.append((status_code, divider, content))
      _log_trace(bytes(raw_content))
      return stem.response.ControlMessage(parsed_content, bytes(raw_content), arrived_at = arrived_at)
    elif divider == '+':
      # data entry, all of the following lines belong to the content until we
      # get a line with just a period

      content_block = bytearray(content)

      while True:
        # Mirrors the exceptions handled for the first read above, so a reader
        # that gets closed mid-way through a data block surfaces as a
        # SocketClosed rather than a raw ValueError.

        try:
          line = reader.readline()
        except (OSError, ValueError) as exc:
          log.info(ERROR_MSG % ('SocketClosed', 'received an exception while mid-way through a data reply (exception: "%s", read content: "%s")' % (exc, log.escape(bytes(raw_content)))))
          raise stem.SocketClosed(exc)

        raw_content += line

        # an empty read (stream ended inside the data block) also lacks a
        # CRLF, so it is reported through this branch as well

        if not line.endswith(b'\r\n'):
          log.info(ERROR_MSG % ('ProtocolError', 'CRLF linebreaks missing from a data reply, "%s"' % log.escape(bytes(raw_content))))
          raise stem.ProtocolError('All lines should end with CRLF')
        elif line == b'.\r\n':
          break  # data block termination

        line = line[:-2]  # strips off the CRLF

        # lines starting with a period are escaped by a second period (as per
        # section 2.4 of the control-spec)

        if line.startswith(b'..'):
          line = line[1:]

        content_block += b'\n' + line

      # joins the content using a newline rather than CRLF separator (more
      # conventional for multi-line string content outside the windows world)

      parsed_content.append((status_code, divider, bytes(content_block)))
    else:
      # this should never be reached due to the prefix regex, but might as well
      # be safe...

      log.warn(ERROR_MSG % ('ProtocolError', "\"%s\" isn't a recognized divider type" % divider))
      raise stem.ProtocolError("Unrecognized divider type '%s': %s" % (divider, stem.util.str_tools._to_unicode(line)))