[tor-commits] [stem/master] Copy "stem/socket.py" to "stem/async_socket.py"

atagar at torproject.org atagar at torproject.org
Thu Jul 16 01:28:57 UTC 2020


commit bba3029f8d710b26cc0299ce086d31b28104c897
Author: Illia Volochii <illia.volochii at gmail.com>
Date:   Sat Mar 14 21:57:20 2020 +0200

    Copy "stem/socket.py" to "stem/async_socket.py"
---
 stem/async_socket.py | 768 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 768 insertions(+)

diff --git a/stem/async_socket.py b/stem/async_socket.py
new file mode 100644
index 00000000..2ef42dd5
--- /dev/null
+++ b/stem/async_socket.py
@@ -0,0 +1,768 @@
+# Copyright 2011-2020, Damian Johnson and The Tor Project
+# See LICENSE for licensing information
+
+"""
+Supports communication with sockets speaking Tor protocols. This
+allows us to send messages as basic strings, and receive responses as
+:class:`~stem.response.ControlMessage` instances.
+
+**This module only consists of low level components, and is not intended for
+users.** See our `tutorials <../tutorials.html>`_ and `Control Module
+<control.html>`_ if you're new to Stem and looking to get started.
+
+With that aside, these can still be used for raw socket communication with
+Tor...
+
+::
+
+  import stem
+  import stem.connection
+  import stem.socket
+
+  if __name__ == '__main__':
+    try:
+      control_socket = stem.socket.ControlPort(port = 9051)
+      stem.connection.authenticate(control_socket)
+    except stem.SocketError as exc:
+      print 'Unable to connect to tor on port 9051: %s' % exc
+      sys.exit(1)
+    except stem.connection.AuthenticationFailure as exc:
+      print 'Unable to authenticate: %s' % exc
+      sys.exit(1)
+
+    print "Issuing 'GETINFO version' query...\\n"
+    control_socket.send('GETINFO version')
+    print control_socket.recv()
+
+::
+
+  % python example.py
+  Issuing 'GETINFO version' query...
+
+  version=0.2.4.10-alpha-dev (git-8be6058d8f31e578)
+  OK
+
+**Module Overview:**
+
+::
+
+  BaseSocket - Thread safe socket.
+    |- RelaySocket - Socket for a relay's ORPort.
+    |  |- send - sends a message to the socket
+    |  +- recv - receives a response from the socket
+    |
+    |- ControlSocket - Socket wrapper that speaks the tor control protocol.
+    |  |- ControlPort - Control connection via a port.
+    |  |- ControlSocketFile - Control connection via a local file socket.
+    |  |
+    |  |- send - sends a message to the socket
+    |  +- recv - receives a ControlMessage from the socket
+    |
+    |- is_alive - reports if the socket is known to be closed
+    |- is_localhost - returns if the socket is for the local system or not
+    |- connection_time - timestamp when socket last connected or disconnected
+    |- connect - connects a new socket
+    |- close - shuts down the socket
+    +- __enter__ / __exit__ - manages socket connection
+
+  send_message - Writes a message to a control socket.
+  recv_message - Reads a ControlMessage from a control socket.
+  send_formatting - Performs the formatting expected from sent messages.
+"""
+
+from __future__ import absolute_import
+
+import re
+import socket
+import ssl
+import threading
+import time
+
+import stem.response
+import stem.util.str_tools
+
+from stem.util import log
+
+MESSAGE_PREFIX = re.compile(b'^[a-zA-Z0-9]{3}[-+ ]')
+ERROR_MSG = 'Error while receiving a control message (%s): %s'
+
+# lines to limit our trace logging to, you can disable this by setting it to None
+
+TRUNCATE_LOGS = 10
+
+
+class BaseSocket(object):
+  """
+  Thread safe socket, providing common socket functionality.
+  """
+
+  def __init__(self):
+    self._socket, self._socket_file = None, None
+    self._is_alive = False
+    self._connection_time = 0.0  # time when we last connected or disconnected
+
+    # Tracks sending and receiving separately. This should be safe, and doing
+    # so prevents deadlock where we block writes because we're waiting to read
+    # a message that isn't coming.
+
+    self._send_lock = threading.RLock()
+    self._recv_lock = threading.RLock()
+
+  def is_alive(self):
+    """
+    Checks if the socket is known to be closed. We won't be aware if it is
+    until we either use it or have explicitily shut it down.
+
+    In practice a socket derived from a port knows about its disconnection
+    after failing to receive data, whereas socket file derived connections
+    know after either sending or receiving data.
+
+    This means that to have reliable detection for when we're disconnected
+    you need to continually pull from the socket (which is part of what the
+    :class:`~stem.control.BaseController` does).
+
+    :returns: **bool** that's **True** if our socket is connected and **False**
+      otherwise
+    """
+
+    return self._is_alive
+
+  def is_localhost(self):
+    """
+    Returns if the connection is for the local system or not.
+
+    :returns: **bool** that's **True** if the connection is for the local host
+      and **False** otherwise
+    """
+
+    return False
+
+  def connection_time(self):
+    """
+    Provides the unix timestamp for when our socket was either connected or
+    disconnected. That is to say, the time we connected if we're currently
+    connected and the time we disconnected if we're not connected.
+
+    .. versionadded:: 1.3.0
+
+    :returns: **float** for when we last connected or disconnected, zero if
+      we've never connected
+    """
+
+    return self._connection_time
+
+  def connect(self):
+    """
+    Connects to a new socket, closing our previous one if we're already
+    attached.
+
+    :raises: :class:`stem.SocketError` if unable to make a socket
+    """
+
+    with self._send_lock:
+      # Closes the socket if we're currently attached to one. Once we're no
+      # longer alive it'll be safe to acquire the recv lock because recv()
+      # calls no longer block (raising SocketClosed instead).
+
+      if self.is_alive():
+        self.close()
+
+      with self._recv_lock:
+        self._socket = self._make_socket()
+        self._socket_file = self._socket.makefile(mode = 'rwb')
+        self._is_alive = True
+        self._connection_time = time.time()
+
+        # It's possible for this to have a transient failure...
+        # SocketError: [Errno 4] Interrupted system call
+        #
+        # It's safe to retry, so give it another try if it fails.
+
+        try:
+          self._connect()
+        except stem.SocketError:
+          self._connect()  # single retry
+
+  def close(self):
+    """
+    Shuts down the socket. If it's already closed then this is a no-op.
+    """
+
+    with self._send_lock:
+      # Function is idempotent with one exception: we notify _close() if this
+      # is causing our is_alive() state to change.
+
+      is_change = self.is_alive()
+
+      if self._socket:
+        # if we haven't yet established a connection then this raises an error
+        # socket.error: [Errno 107] Transport endpoint is not connected
+
+        try:
+          self._socket.shutdown(socket.SHUT_RDWR)
+        except socket.error:
+          pass
+
+        self._socket.close()
+
+      if self._socket_file:
+        try:
+          self._socket_file.close()
+        except BrokenPipeError:
+          pass
+
+      self._socket = None
+      self._socket_file = None
+      self._is_alive = False
+      self._connection_time = time.time()
+
+      if is_change:
+        self._close()
+
+  def _send(self, message, handler):
+    """
+    Send message in a thread safe manner. Handler is expected to be of the form...
+
+    ::
+
+      my_handler(socket, socket_file, message)
+    """
+
+    with self._send_lock:
+      try:
+        if not self.is_alive():
+          raise stem.SocketClosed()
+
+        handler(self._socket, self._socket_file, message)
+      except stem.SocketClosed:
+        # if send_message raises a SocketClosed then we should properly shut
+        # everything down
+
+        if self.is_alive():
+          self.close()
+
+        raise
+
+  def _recv(self, handler):
+    """
+    Receives a message in a thread safe manner. Handler is expected to be of the form...
+
+    ::
+
+      my_handler(socket, socket_file)
+    """
+
+    with self._recv_lock:
+      try:
+        # makes a temporary reference to the _socket_file because connect()
+        # and close() may set or unset it
+
+        my_socket, my_socket_file = self._socket, self._socket_file
+
+        if not my_socket or not my_socket_file:
+          raise stem.SocketClosed()
+
+        return handler(my_socket, my_socket_file)
+      except stem.SocketClosed:
+        # If recv_message raises a SocketClosed then we should properly shut
+        # everything down. However, there's a couple cases where this will
+        # cause deadlock...
+        #
+        # * This SocketClosed was *caused by* a close() call, which is joining
+        #   on our thread.
+        #
+        # * A send() call that's currently in flight is about to call close(),
+        #   also attempting to join on us.
+        #
+        # To resolve this we make a non-blocking call to acquire the send lock.
+        # If we get it then great, we can close safely. If not then one of the
+        # above are in progress and we leave the close to them.
+
+        if self.is_alive():
+          if self._send_lock.acquire(False):
+            self.close()
+            self._send_lock.release()
+
+        raise
+
+  def _get_send_lock(self):
+    """
+    The send lock is useful to classes that interact with us at a deep level
+    because it's used to lock :func:`stem.socket.ControlSocket.connect` /
+    :func:`stem.socket.BaseSocket.close`, and by extension our
+    :func:`stem.socket.BaseSocket.is_alive` state changes.
+
+    :returns: **threading.RLock** that governs sending messages to our socket
+      and state changes
+    """
+
+    return self._send_lock
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, exit_type, value, traceback):
+    self.close()
+
+  def _connect(self):
+    """
+    Connection callback that can be overwritten by subclasses and wrappers.
+    """
+
+    pass
+
+  def _close(self):
+    """
+    Disconnection callback that can be overwritten by subclasses and wrappers.
+    """
+
+    pass
+
+  def _make_socket(self):
+    """
+    Constructs and connects new socket. This is implemented by subclasses.
+
+    :returns: **socket.socket** for our configuration
+
+    :raises:
+      * :class:`stem.SocketError` if unable to make a socket
+      * **NotImplementedError** if not implemented by a subclass
+    """
+
+    raise NotImplementedError('Unsupported Operation: this should be implemented by the BaseSocket subclass')
+
+
+class RelaySocket(BaseSocket):
+  """
+  `Link-level connection
+  <https://gitweb.torproject.org/torspec.git/tree/tor-spec.txt>`_ to a Tor
+  relay.
+
+  .. versionadded:: 1.7.0
+
+  :var str address: address our socket connects to
+  :var int port: ORPort our socket connects to
+  """
+
+  def __init__(self, address = '127.0.0.1', port = 9050, connect = True):
+    """
+    RelaySocket constructor.
+
+    :param str address: ip address of the relay
+    :param int port: orport of the relay
+    :param bool connect: connects to the socket if True, leaves it unconnected otherwise
+
+    :raises: :class:`stem.SocketError` if connect is **True** and we're
+      unable to establish a connection
+    """
+
+    super(RelaySocket, self).__init__()
+    self.address = address
+    self.port = port
+
+    if connect:
+      self.connect()
+
+  def send(self, message):
+    """
+    Sends a message to the relay's ORPort.
+
+    :param str message: message to be formatted and sent to the socket
+
+    :raises:
+      * :class:`stem.SocketError` if a problem arises in using the socket
+      * :class:`stem.SocketClosed` if the socket is known to be shut down
+    """
+
+    self._send(message, lambda s, sf, msg: _write_to_socket(sf, msg))
+
+  def recv(self, timeout = None):
+    """
+    Receives a message from the relay.
+
+    :param float timeout: maxiumum number of seconds to await a response, this
+      blocks indefinitely if **None**
+
+    :returns: bytes for the message received
+
+    :raises:
+      * :class:`stem.ProtocolError` the content from the socket is malformed
+      * :class:`stem.SocketClosed` if the socket closes before we receive a complete message
+    """
+
+    def wrapped_recv(s, sf):
+      if timeout is None:
+        return s.recv()
+      else:
+        s.setblocking(0)
+        s.settimeout(timeout)
+
+        try:
+          return s.recv()
+        except (socket.timeout, ssl.SSLError, ssl.SSLWantReadError):
+          return None
+        finally:
+          s.setblocking(1)
+
+    return self._recv(wrapped_recv)
+
+  def is_localhost(self):
+    return self.address == '127.0.0.1'
+
+  def _make_socket(self):
+    try:
+      relay_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+      relay_socket.connect((self.address, self.port))
+      return ssl.wrap_socket(relay_socket)
+    except socket.error as exc:
+      raise stem.SocketError(exc)
+
+
+class ControlSocket(BaseSocket):
+  """
+  Wrapper for a socket connection that speaks the Tor control protocol. To the
+  better part this transparently handles the formatting for sending and
+  receiving complete messages.
+
+  Callers should not instantiate this class directly, but rather use subclasses
+  which are expected to implement the **_make_socket()** method.
+  """
+
+  def __init__(self):
+    super(ControlSocket, self).__init__()
+
+  def send(self, message):
+    """
+    Formats and sends a message to the control socket. For more information see
+    the :func:`~stem.socket.send_message` function.
+
+    :param str message: message to be formatted and sent to the socket
+
+    :raises:
+      * :class:`stem.SocketError` if a problem arises in using the socket
+      * :class:`stem.SocketClosed` if the socket is known to be shut down
+    """
+
+    self._send(message, lambda s, sf, msg: send_message(sf, msg))
+
+  def recv(self):
+    """
+    Receives a message from the control socket, blocking until we've received
+    one. For more information see the :func:`~stem.socket.recv_message` function.
+
+    :returns: :class:`~stem.response.ControlMessage` for the message received
+
+    :raises:
+      * :class:`stem.ProtocolError` the content from the socket is malformed
+      * :class:`stem.SocketClosed` if the socket closes before we receive a complete message
+    """
+
+    return self._recv(lambda s, sf: recv_message(sf))
+
+
+class ControlPort(ControlSocket):
+  """
+  Control connection to tor. For more information see tor's ControlPort torrc
+  option.
+
+  :var str address: address our socket connects to
+  :var int port: ControlPort our socket connects to
+  """
+
+  def __init__(self, address = '127.0.0.1', port = 9051, connect = True):
+    """
+    ControlPort constructor.
+
+    :param str address: ip address of the controller
+    :param int port: port number of the controller
+    :param bool connect: connects to the socket if True, leaves it unconnected otherwise
+
+    :raises: :class:`stem.SocketError` if connect is **True** and we're
+      unable to establish a connection
+    """
+
+    super(ControlPort, self).__init__()
+    self.address = address
+    self.port = port
+
+    if connect:
+      self.connect()
+
+  def is_localhost(self):
+    return self.address == '127.0.0.1'
+
+  def _make_socket(self):
+    try:
+      control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+      control_socket.connect((self.address, self.port))
+      return control_socket
+    except socket.error as exc:
+      raise stem.SocketError(exc)
+
+
+class ControlSocketFile(ControlSocket):
+  """
+  Control connection to tor. For more information see tor's ControlSocket torrc
+  option.
+
+  :var str path: filesystem path of the socket we connect to
+  """
+
+  def __init__(self, path = '/var/run/tor/control', connect = True):
+    """
+    ControlSocketFile constructor.
+
+    :param str socket_path: path where the control socket is located
+    :param bool connect: connects to the socket if True, leaves it unconnected otherwise
+
+    :raises: :class:`stem.SocketError` if connect is **True** and we're
+      unable to establish a connection
+    """
+
+    super(ControlSocketFile, self).__init__()
+    self.path = path
+
+    if connect:
+      self.connect()
+
+  def is_localhost(self):
+    return True
+
+  def _make_socket(self):
+    try:
+      control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+      control_socket.connect(self.path)
+      return control_socket
+    except socket.error as exc:
+      raise stem.SocketError(exc)
+
+
+def send_message(control_file, message, raw = False):
+  """
+  Sends a message to the control socket, adding the expected formatting for
+  single verses multi-line messages. Neither message type should contain an
+  ending newline (if so it'll be treated as a multi-line message with a blank
+  line at the end). If the message doesn't contain a newline then it's sent
+  as...
+
+  ::
+
+    <message>\\r\\n
+
+  and if it does contain newlines then it's split on ``\\n`` and sent as...
+
+  ::
+
+    +<line 1>\\r\\n
+    <line 2>\\r\\n
+    <line 3>\\r\\n
+    .\\r\\n
+
+  :param file control_file: file derived from the control socket (see the
+    socket's makefile() method for more information)
+  :param str message: message to be sent on the control socket
+  :param bool raw: leaves the message formatting untouched, passing it to the
+    socket as-is
+
+  :raises:
+    * :class:`stem.SocketError` if a problem arises in using the socket
+    * :class:`stem.SocketClosed` if the socket is known to be shut down
+  """
+
+  if not raw:
+    message = send_formatting(message)
+
+  _write_to_socket(control_file, message)
+
+  if log.is_tracing():
+    log_message = message.replace('\r\n', '\n').rstrip()
+    msg_div = '\n' if '\n' in log_message else ' '
+    log.trace('Sent to tor:%s%s' % (msg_div, log_message))
+
+
+def _write_to_socket(socket_file, message):
+  try:
+    socket_file.write(stem.util.str_tools._to_bytes(message))
+    socket_file.flush()
+  except socket.error as exc:
+    log.info('Failed to send: %s' % exc)
+
+    # When sending there doesn't seem to be a reliable method for
+    # distinguishing between failures from a disconnect verses other things.
+    # Just accounting for known disconnection responses.
+
+    if str(exc) == '[Errno 32] Broken pipe':
+      raise stem.SocketClosed(exc)
+    else:
+      raise stem.SocketError(exc)
+  except AttributeError:
+    # if the control_file has been closed then flush will receive:
+    # AttributeError: 'NoneType' object has no attribute 'sendall'
+
+    log.info('Failed to send: file has been closed')
+    raise stem.SocketClosed('file has been closed')
+
+
+def recv_message(control_file, arrived_at = None):
+  """
+  Pulls from a control socket until we either have a complete message or
+  encounter a problem.
+
+  :param file control_file: file derived from the control socket (see the
+    socket's makefile() method for more information)
+
+  :returns: :class:`~stem.response.ControlMessage` read from the socket
+
+  :raises:
+    * :class:`stem.ProtocolError` the content from the socket is malformed
+    * :class:`stem.SocketClosed` if the socket closes before we receive
+      a complete message
+  """
+
+  parsed_content, raw_content, first_line = None, None, True
+
+  while True:
+    try:
+      line = control_file.readline()
+    except AttributeError:
+      # if the control_file has been closed then we will receive:
+      # AttributeError: 'NoneType' object has no attribute 'recv'
+
+      log.info(ERROR_MSG % ('SocketClosed', 'socket file has been closed'))
+      raise stem.SocketClosed('socket file has been closed')
+    except (OSError, ValueError) as exc:
+      # when disconnected this errors with...
+      #
+      #   * ValueError: I/O operation on closed file
+      #   * OSError: [Errno 107] Transport endpoint is not connected
+      #   * OSError: [Errno 9] Bad file descriptor
+
+      log.info(ERROR_MSG % ('SocketClosed', 'received exception "%s"' % exc))
+      raise stem.SocketClosed(exc)
+
+    # Parses the tor control lines. These are of the form...
+    # <status code><divider><content>\r\n
+
+    if not line:
+      # if the socket is disconnected then the readline() method will provide
+      # empty content
+
+      log.info(ERROR_MSG % ('SocketClosed', 'empty socket content'))
+      raise stem.SocketClosed('Received empty socket content.')
+    elif not MESSAGE_PREFIX.match(line):
+      log.info(ERROR_MSG % ('ProtocolError', 'malformed status code/divider, "%s"' % log.escape(line)))
+      raise stem.ProtocolError('Badly formatted reply line: beginning is malformed')
+    elif not line.endswith(b'\r\n'):
+      log.info(ERROR_MSG % ('ProtocolError', 'no CRLF linebreak, "%s"' % log.escape(line)))
+      raise stem.ProtocolError('All lines should end with CRLF')
+
+    status_code, divider, content = line[:3], line[3:4], line[4:-2]  # strip CRLF off content
+
+    status_code = stem.util.str_tools._to_unicode(status_code)
+    divider = stem.util.str_tools._to_unicode(divider)
+
+    # Most controller responses are single lines, in which case we don't need
+    # so much overhead.
+
+    if first_line:
+      if divider == ' ':
+        _log_trace(line)
+        return stem.response.ControlMessage([(status_code, divider, content)], line, arrived_at = arrived_at)
+      else:
+        parsed_content, raw_content, first_line = [], bytearray(), False
+
+    raw_content += line
+
+    if divider == '-':
+      # mid-reply line, keep pulling for more content
+      parsed_content.append((status_code, divider, content))
+    elif divider == ' ':
+      # end of the message, return the message
+      parsed_content.append((status_code, divider, content))
+      _log_trace(bytes(raw_content))
+      return stem.response.ControlMessage(parsed_content, bytes(raw_content), arrived_at = arrived_at)
+    elif divider == '+':
+      # data entry, all of the following lines belong to the content until we
+      # get a line with just a period
+
+      content_block = bytearray(content)
+
+      while True:
+        try:
+          line = control_file.readline()
+          raw_content += line
+        except socket.error as exc:
+          log.info(ERROR_MSG % ('SocketClosed', 'received an exception while mid-way through a data reply (exception: "%s", read content: "%s")' % (exc, log.escape(bytes(raw_content)))))
+          raise stem.SocketClosed(exc)
+
+        if not line.endswith(b'\r\n'):
+          log.info(ERROR_MSG % ('ProtocolError', 'CRLF linebreaks missing from a data reply, "%s"' % log.escape(bytes(raw_content))))
+          raise stem.ProtocolError('All lines should end with CRLF')
+        elif line == b'.\r\n':
+          break  # data block termination
+
+        line = line[:-2]  # strips off the CRLF
+
+        # lines starting with a period are escaped by a second period (as per
+        # section 2.4 of the control-spec)
+
+        if line.startswith(b'..'):
+          line = line[1:]
+
+        content_block += b'\n' + line
+
+      # joins the content using a newline rather than CRLF separator (more
+      # conventional for multi-line string content outside the windows world)
+
+      parsed_content.append((status_code, divider, bytes(content_block)))
+    else:
+      # this should never be reached due to the prefix regex, but might as well
+      # be safe...
+
+      log.warn(ERROR_MSG % ('ProtocolError', "\"%s\" isn't a recognized divider type" % divider))
+      raise stem.ProtocolError("Unrecognized divider type '%s': %s" % (divider, stem.util.str_tools._to_unicode(line)))
+
+
+def send_formatting(message):
+  """
+  Performs the formatting expected from sent control messages. For more
+  information see the :func:`~stem.socket.send_message` function.
+
+  :param str message: message to be formatted
+
+  :returns: **str** of the message wrapped by the formatting expected from
+    controllers
+  """
+
+  # From control-spec section 2.2...
+  #   Command = Keyword OptArguments CRLF / "+" Keyword OptArguments CRLF CmdData
+  #   Keyword = 1*ALPHA
+  #   OptArguments = [ SP *(SP / VCHAR) ]
+  #
+  # A command is either a single line containing a Keyword and arguments, or a
+  # multiline command whose initial keyword begins with +, and whose data
+  # section ends with a single "." on a line of its own.
+
+  # if we already have \r\n entries then standardize on \n to start with
+  message = message.replace('\r\n', '\n')
+
+  if '\n' in message:
+    return '+%s\r\n.\r\n' % message.replace('\n', '\r\n')
+  else:
+    return message + '\r\n'
+
+
+def _log_trace(response):
+  if not log.is_tracing():
+    return
+
+  log_message = stem.util.str_tools._to_unicode(response.replace(b'\r\n', b'\n').rstrip())
+  log_message_lines = log_message.split('\n')
+
+  if TRUNCATE_LOGS and len(log_message_lines) > TRUNCATE_LOGS:
+    log_message = '\n'.join(log_message_lines[:TRUNCATE_LOGS] + ['... %i more lines...' % (len(log_message_lines) - TRUNCATE_LOGS)])
+
+  if len(log_message_lines) > 2:
+    log.trace('Received from tor:\n%s' % log_message)
+  else:
+    log.trace('Received from tor: %s' % log_message.replace('\n', '\\n'))





More information about the tor-commits mailing list