commit bd5c8aaaa9ddc170f0e45007d6c9f0144befbbd1 Author: Damian Johnson atagar@torproject.org Date: Wed Nov 27 15:49:32 2019 -0800
Move arrived_at from the Event class to ControlMessage
Our Event's arrived_at attribute has a couple of wrinkles...
* This timestamp reflects when the event was **parsed** rather than **received**, so it becomes inaccurate if our event loop gets bogged down.
* There's nothing event specific about this attribute. It should apply to all controller messages.
As such, I'm moving this up to the parent class. I first spotted the bug via the following script...
import time
from stem.control import EventType, Controller
def slow_handler(event):
  print("processing a BW event that's %0.1f seconds old" % (time.time() - event.arrived_at))
  time.sleep(5)
with Controller.from_port() as controller:
  controller.authenticate()
  controller.add_event_listener(slow_handler, EventType.BW)
  time.sleep(10)
Previously this produced...
% python demo.py processing a BW event that's 0.0 seconds old processing a BW event that's 0.0 seconds old processing a BW event that's 0.0 seconds old processing a BW event that's 0.0 seconds old
... and now we get...
% python demo.py processing a BW event that's 0.4 seconds old processing a BW event that's 4.4 seconds old processing a BW event that's 8.4 seconds old --- docs/change_log.rst | 1 + stem/control.py | 6 +++--- stem/response/__init__.py | 12 ++++++++++-- stem/response/events.py | 13 +++---------- stem/socket.py | 6 +++--- 5 files changed, 20 insertions(+), 18 deletions(-)
diff --git a/docs/change_log.rst b/docs/change_log.rst index c5b5051e..6a39280d 100644 --- a/docs/change_log.rst +++ b/docs/change_log.rst @@ -52,6 +52,7 @@ The following are only available within Stem's `git repository * Controller events could fail to be delivered in a timely fashion (:trac:`27173`) * Adjusted :func:`~stem.control.Controller.get_microdescriptors` fallback to also use '.new' cache files (:trac:`28508`) * ExitPolicies could raise TypeError when read concurrently (:trac:`29899`) + * Moved the *arrived_at* attribute from :class:`~stem.response.event.Event` to :class:`~stem.response.__init__.ControlMessage` * **STALE_DESC** :data:`~stem.Flag` (:spec:`d14164d`) * **DORMANT** and **ACTIVE** :data:`~stem.Signal` (:spec:`4421149`) * **QUERY_RATE_LIMITED** :data:`~stem.HSDescReason` (:spec:`bd80679`) diff --git a/stem/control.py b/stem/control.py index 86f4e787..24640fd5 100644 --- a/stem/control.py +++ b/stem/control.py @@ -3543,7 +3543,7 @@ class Controller(BaseController):
for circ in response.splitlines(): circ_message = stem.socket.recv_message(io.BytesIO(stem.util.str_tools._to_bytes('650 CIRC %s\r\n' % circ))) - stem.response.convert('EVENT', circ_message, arrived_at = 0) + stem.response.convert('EVENT', circ_message) circuits.append(circ_message)
return circuits @@ -3738,7 +3738,7 @@ class Controller(BaseController):
for stream in response.splitlines(): message = stem.socket.recv_message(io.BytesIO(stem.util.str_tools._to_bytes('650 STREAM %s\r\n' % stream))) - stem.response.convert('EVENT', message, arrived_at = 0) + stem.response.convert('EVENT', message) streams.append(message)
return streams @@ -4009,7 +4009,7 @@ class Controller(BaseController):
def _handle_event(self, event_message): try: - stem.response.convert('EVENT', event_message, arrived_at = time.time()) + stem.response.convert('EVENT', event_message) event_type = event_message.type except stem.ProtocolError as exc: log.error('Tor sent a malformed event (%s): %s' % (exc, event_message)) diff --git a/stem/response/__init__.py b/stem/response/__init__.py index 7d2c5c5c..d90061b8 100644 --- a/stem/response/__init__.py +++ b/stem/response/__init__.py @@ -31,6 +31,7 @@ Parses replies from the control socket. import codecs import io import re +import time import threading
import stem.socket @@ -129,8 +130,13 @@ class ControlMessage(object): individual message components stripped of protocol formatting. Messages are never empty.
+ :var int arrived_at: unix timestamp for when the message arrived + .. versionchanged:: 1.7.0 Implemented equality and hashing. + + .. versionchanged:: 1.8.0 + Moved **arrived_at** from the Event class up to this base ControlMessage. """
@staticmethod @@ -158,17 +164,19 @@ class ControlMessage(object):
content = re.sub('([\r]?)\n', '\r\n', content)
- msg = stem.socket.recv_message(io.BytesIO(stem.util.str_tools._to_bytes(content))) + msg = stem.socket.recv_message(io.BytesIO(stem.util.str_tools._to_bytes(content)), arrived_at = kwargs.pop('arrived_at', None))
if msg_type is not None: convert(msg_type, msg, **kwargs)
return msg
- def __init__(self, parsed_content, raw_content): + def __init__(self, parsed_content, raw_content, arrived_at = None): if not parsed_content: raise ValueError("ControlMessages can't be empty")
+ self.arrived_at = arrived_at if arrived_at else int(time.time()) + self._parsed_content = parsed_content self._raw_content = raw_content self._str = None diff --git a/stem/response/events.py b/stem/response/events.py index 27a3e405..8b819315 100644 --- a/stem/response/events.py +++ b/stem/response/events.py @@ -3,7 +3,6 @@
import io import re -import time
import stem import stem.control @@ -36,7 +35,6 @@ class Event(stem.response.ControlMessage): https://gitweb.torproject.org/torspec.git/tree/control-spec.txt`_.
:var str type: event type - :var int arrived_at: unix timestamp for when the message arrived :var list positional_args: positional arguments of the event :var dict keyword_args: key/value arguments of the event """ @@ -48,24 +46,19 @@ class Event(stem.response.ControlMessage): _SKIP_PARSING = False # skip parsing contents into our positional_args and keyword_args _VERSION_ADDED = stem.version.Version('0.1.1.1-alpha') # minimum version with control-spec V1 event support
- def _parse_message(self, arrived_at = None): - if arrived_at is None: - arrived_at = int(time.time()) - + def _parse_message(self): if not str(self).strip(): raise stem.ProtocolError('Received a blank tor event. Events must at the very least have a type.')
self.type = str(self).split()[0] - self.arrived_at = arrived_at + self.positional_args = [] + self.keyword_args = {}
# if we're a recognized event type then translate ourselves into that subclass
if self.type in EVENT_TYPE_TO_CLASS: self.__class__ = EVENT_TYPE_TO_CLASS[self.type]
- self.positional_args = [] - self.keyword_args = {} - if not self._SKIP_PARSING: self._parse_standard_attr()
diff --git a/stem/socket.py b/stem/socket.py index 80d29b83..8303fd06 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -653,7 +653,7 @@ def _write_to_socket(socket_file, message): raise stem.SocketClosed('file has been closed')
-def recv_message(control_file): +def recv_message(control_file, arrived_at = None): """ Pulls from a control socket until we either have a complete message or encounter a problem. @@ -720,7 +720,7 @@ def recv_message(control_file): if first_line: if divider == ' ': _log_trace(line) - return stem.response.ControlMessage([(status_code, divider, content)], line) + return stem.response.ControlMessage([(status_code, divider, content)], line, arrived_at = arrived_at) else: parsed_content, raw_content, first_line = [], bytearray(), False
@@ -733,7 +733,7 @@ def recv_message(control_file): # end of the message, return the message parsed_content.append((status_code, divider, content)) _log_trace(bytes(raw_content)) - return stem.response.ControlMessage(parsed_content, bytes(raw_content)) + return stem.response.ControlMessage(parsed_content, bytes(raw_content), arrived_at = arrived_at) elif divider == '+': # data entry, all of the following lines belong to the content until we # get a line with just a period