[stem/master] Support for STREAM events

commit d735e1e267877d9e9e7fa0686c2cd49c63930f0b Author: Damian Johnson <atagar@torproject.org> Date: Sun Nov 18 11:48:00 2012 -0800 Support for STREAM events Implementaton and tests for STREAM events. I got the test data by... * starting TBB * used netstat to get the control port (shouldn't have needed to do this - https://trac.torproject.org/7512) * connecting to it with telnet * AUTHENTICATE * SETEVENTS STREAM * visited google's front page in firefox Full test data: AUTHENTICATE 250 OK SETEVENTS STREAM 250 OK 650 STREAM 18 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47849 PURPOSE=USER 650 STREAM 18 SENTCONNECT 26 encrypted.google.com:443 650 STREAM 19 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47850 PURPOSE=USER 650 STREAM 19 SENTCONNECT 26 encrypted.google.com:443 650 STREAM 18 REMAP 26 74.125.227.129:443 SOURCE=EXIT 650 STREAM 18 SUCCEEDED 26 74.125.227.129:443 650 STREAM 19 REMAP 26 74.125.227.129:443 SOURCE=EXIT 650 STREAM 19 SUCCEEDED 26 74.125.227.129:443 650 STREAM 20 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47851 PURPOSE=USER 650 STREAM 20 REMAP 0 74.125.227.129:443 SOURCE=CACHE 650 STREAM 20 SENTCONNECT 26 74.125.227.129:443 650 STREAM 21 NEW 0 encrypted.google.com:443 SOURCE_ADDR=127.0.0.1:47852 PURPOSE=USER 650 STREAM 21 REMAP 0 74.125.227.129:443 SOURCE=CACHE 650 STREAM 21 SENTCONNECT 26 74.125.227.129:443 650 STREAM 20 REMAP 26 74.125.227.129:443 SOURCE=EXIT 650 STREAM 20 SUCCEEDED 26 74.125.227.129:443 650 STREAM 21 REMAP 26 74.125.227.129:443 SOURCE=EXIT 650 STREAM 21 SUCCEEDED 26 74.125.227.129:443 650 STREAM 22 NEW 0 www.google.com:443 SOURCE_ADDR=127.0.0.1:47853 PURPOSE=USER 650 STREAM 22 SENTCONNECT 26 www.google.com:443 650 STREAM 23 NEW 0 www.google.com:443 SOURCE_ADDR=127.0.0.1:47854 PURPOSE=USER 650 STREAM 23 SENTCONNECT 26 www.google.com:443 650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET 650 STREAM 20 CLOSED 26 74.125.227.129:443 REASON=CONNRESET 650 STREAM 22 REMAP 26 74.125.227.147:443 SOURCE=EXIT 650 STREAM 22 SUCCEEDED 26 74.125.227.147:443 650 STREAM 23 REMAP 26 74.125.227.147:443 SOURCE=EXIT 650 STREAM 23 SUCCEEDED 26 74.125.227.147:443 650 STREAM 24 NEW 0 ocsp.thawte.com:80 SOURCE_ADDR=127.0.0.1:47855 PURPOSE=USER 650 STREAM 24 SENTCONNECT 26 ocsp.thawte.com:80 650 STREAM 25 NEW 0 ocsp.thawte.com:80 SOURCE_ADDR=127.0.0.1:47856 PURPOSE=USER 650 STREAM 25 SENTCONNECT 26 ocsp.thawte.com:80 650 STREAM 24 REMAP 26 199.7.52.72:80 SOURCE=EXIT 650 STREAM 24 SUCCEEDED 26 199.7.52.72:80 650 STREAM 25 REMAP 26 199.7.52.72:80 SOURCE=EXIT 650 STREAM 25 SUCCEEDED 26 199.7.52.72:80 650 STREAM 26 NEW 0 ssl.gstatic.com:443 SOURCE_ADDR=127.0.0.1:47857 PURPOSE=USER 650 STREAM 26 SENTCONNECT 26 ssl.gstatic.com:443 650 STREAM 27 NEW 0 ssl.gstatic.com:443 SOURCE_ADDR=127.0.0.1:47858 PURPOSE=USER 650 STREAM 27 SENTCONNECT 26 ssl.gstatic.com:443 650 STREAM 23 CLOSED 26 74.125.227.147:443 REASON=CONNRESET 650 STREAM 26 REMAP 26 74.125.227.143:443 SOURCE=EXIT 650 STREAM 26 SUCCEEDED 26 74.125.227.143:443 650 STREAM 27 REMAP 26 74.125.227.143:443 SOURCE=EXIT 650 STREAM 27 SUCCEEDED 26 74.125.227.143:443 650 STREAM 25 CLOSED 26 199.7.52.72:80 REASON=DONE 650 STREAM 27 CLOSED 26 74.125.227.143:443 REASON=CONNRESET 650 STREAM 26 CLOSED 26 74.125.227.143:443 REASON=DONE 650 STREAM 24 CLOSED 26 199.7.52.72:80 REASON=DONE 650 STREAM 22 CLOSED 26 74.125.227.147:443 REASON=DONE 650 STREAM 19 CLOSED 26 74.125.227.129:443 REASON=DONE 650 STREAM 18 CLOSED 26 74.125.227.129:443 REASON=DONE Connection closed by foreign host. --- stem/control.py | 115 ++++++++++++++++++++++++++++++++++++ stem/response/events.py | 83 ++++++++++++++++++++++++++- test/unit/response/events.py | 131 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 327 insertions(+), 2 deletions(-) diff --git a/stem/control.py b/stem/control.py index 083082e..d0e8462 100644 --- a/stem/control.py +++ b/stem/control.py @@ -182,6 +182,77 @@ providing its own for interacting at a higher level. **HSSR_CONNECTING** connecting to the introductory point **HSSR_JOINED** connected to the rendezvous-point ============================= =========== + +.. data:: StreamStatus (enum) + + State that a stream going through tor can have. Tor may provide states not in + this enum. + + ================= =========== + StreamStatus Description + ================= =========== + **NEW** request for a new connection + **NEWRESOLVE** request to resolve an address + **REMAP** address is being re-mapped to another + **SENTCONNECT** sent a connect cell along a circuit + **SENTRESOLVE** sent a resolve cell along a circuit + **SUCCEEDED** stream has been established + **FAILED** stream is detached, and won't be re-established + **DETACHED** stream is detached, but might be re-established + **CLOSED** stream has closed + ================= =========== + +.. data:: StreamClosureReason (enum) + + Reason that a stream is being closed or failed to be established. Tor may + provide purposes not in this enum. + + ===================== =========== + StreamClosureReason Description + ===================== =========== + **MISC** none of the following reasons + **RESOLVEFAILED** unable to resolve the hostname + **CONNECTREFUSED** remote host refused the connection + **EXITPOLICY** rejected by the exit due to its exit policy + **DESTROY** circuit is being shut down + **DONE** connection has been closed + **TIMEOUT** connection timed out + **NOROUTE** routing error while contacting the destinaiton + **HIBERNATING** relay is hibernating + **INTERNAL** internal error + **RESOURCELIMIT** relay has insufficient resources to service the request + **CONNRESET** connection has been reset + **TORPROTOCOL** violation in the tor protocol + **NOTDIRECTORY** directory information requested from a relay that isn't mirroring it + **END** endpoint has sent a RELAY_END cell + **PRIVATE_ADDR** endpoint was a private address (127.0.0.1, 10.0.0.1, etc) + ===================== =========== + +.. data:: StreamSource (enum) + + Cause of a stream being remapped to another address. + + ============= =========== + StreamSource Description + ============= =========== + **CACHE** tor is remapping because of a cached answer + **EXIT** exit relay requested the remap + ============= =========== + +.. data:: StreamPurpose (enum) + + Purpsoe of the stream. This is only provided with new streams and tor may + provide purposes not in this enum. + + ================= =========== + StreamPurpose Description + ================= =========== + **DIR_FETCH** unknown (https://trac.torproject.org/7508) + **UPLOAD_DESC** unknown (https://trac.torproject.org/7508) + **DNS_REQUEST** unknown (https://trac.torproject.org/7508) + **USER** unknown (https://trac.torproject.org/7508) + **DIRPORT_TEST** unknown (https://trac.torproject.org/7508) + ================= =========== """ from __future__ import with_statement @@ -289,6 +360,50 @@ HiddenServiceState = stem.util.enum.UppercaseEnum( "HSSR_JOINED", ) +StreamStatus = stem.util.enum.UppercaseEnum( + "NEW", + "NEWRESOLVE", + "REMAP", + "SENTCONNECT", + "SENTRESOLVE", + "SUCCEEDED", + "FAILED", + "DETACHED", + "CLOSED", +) + +StreamClosureReason = stem.util.enum.UppercaseEnum( + "MISC", + "RESOLVEFAILED", + "CONNECTREFUSED", + "EXITPOLICY", + "DESTROY", + "DONE", + "TIMEOUT", + "NOROUTE", + "HIBERNATING", + "INTERNAL", + "RESOURCELIMIT", + "CONNRESET", + "TORPROTOCOL", + "NOTDIRECTORY", + "END", + "PRIVATE_ADDR", +) + +StreamSource = stem.util.enum.UppercaseEnum( + "CACHE", + "EXIT", +) + +StreamPurpose = stem.util.enum.UppercaseEnum( + "DIR_FETCH", + "UPLOAD_DESC", + "DNS_REQUEST", + "USER", + "DIRPORT_TEST", +) + # Constant to indicate an undefined argument default. Usually we'd use None for # this, but users will commonly provide None as the argument so need something # else fairly unique... diff --git a/stem/response/events.py b/stem/response/events.py index 4c8d8bd..8a0e428 100644 --- a/stem/response/events.py +++ b/stem/response/events.py @@ -3,7 +3,7 @@ import re import stem.control import stem.response -from stem.util import log, str_tools, tor_tools +from stem.util import connection, log, str_tools, tor_tools # Matches keyword=value arguments. This can't be a simple "(.*)=(.*)" pattern # because some positional arguments, like circuit paths, can have an equal @@ -156,6 +156,86 @@ class CircuitEvent(Event): log_id = "event.circ.unknown_remote_reason.%s" % self.remote_reason log.log_once(log_id, log.INFO, unrecognized_msg % ('remote reason', self.remote_reason)) +class StreamEvent(Event): + """ + Event that indicates that a stream has changed. + + :var str id: stream identifier + :var stem.control.StreamStatus status: reported status for the stream + :var str circ_id: circuit that the stream is attached to + :var str target: destination of the stream + :var str target_address: destination address (ip or hostname) + :var int target_port: destination port + :var stem.control.StreamClosureReason reason: reason for the stream to be closed + :var stem.control.StreamClosureReason remote_reason: remote side's reason for the stream to be closed + :var stem.control.StreamSource source: origin of the REMAP request + :var str source_addr: requester of the connection + :var str source_address: requester address (ip or hostname) + :var int source_port: requester port + :var stem.control.StreamPurpose purpose: purpose for the stream + """ + + _POSITIONAL_ARGS = ("id", "status", "circ_id", "target") + _KEYWORD_ARGS = { + "REASON": "reason", + "REMOTE_REASON": "remote_reason", + "SOURCE": "source", + "SOURCE_ADDR": "source_addr", + "PURPOSE": "purpose", + } + + def _parse(self): + if self.target is None: + self.target_address = None + self.target_port = None + else: + if not ':' in self.target: + raise stem.ProtocolError("Target location must be of the form 'address:port': %s" % self) + + address, port = self.target.split(':') + + if not connection.is_valid_port(port): + raise stem.ProtocolError("Target location's port is invalid: %s" % self) + + self.target_address = address + self.target_port = int(port) + + if self.source_addr is None: + self.source_address = None + self.source_port = None + else: + if not ':' in self.source_addr: + raise stem.ProtocolError("Source location must be of the form 'address:port': %s" % self) + + address, port = self.source_addr.split(':') + + if not connection.is_valid_port(port): + raise stem.ProtocolError("Source location's port is invalid: %s" % self) + + self.source_address = address + self.source_port = int(port) + + # spec specifies a circ_id of zero if the stream is unattached + + if self.circ_id == "0": + self.circ_id = None + + # log if we have an unrecognized closure reason or purpose + + unrecognized_msg = "STREAM event had an unrecognised %%s (%%s). Maybe a new addition to the control protocol? Full Event: '%s'" % self + + if self.reason and (not self.reason in stem.control.StreamClosureReason): + log_id = "event.stream.reason.%s" % self.reason + log.log_once(log_id, log.INFO, unrecognized_msg % ('reason', self.reason)) + + if self.remote_reason and (not self.remote_reason in stem.control.StreamClosureReason): + log_id = "event.stream.remote_reason.%s" % self.remote_reason + log.log_once(log_id, log.INFO, unrecognized_msg % ('remote reason', self.remote_reason)) + + if self.purpose and (not self.purpose in stem.control.StreamPurpose): + log_id = "event.stream.purpose.%s" % self.purpose + log.log_once(log_id, log.INFO, unrecognized_msg % ('purpose', self.purpose)) + class BandwidthEvent(Event): """ Event emitted every second with the bytes sent and received by tor. @@ -196,6 +276,7 @@ class LogEvent(Event): EVENT_TYPE_TO_CLASS = { "CIRC": CircuitEvent, + "STREAM": StreamEvent, "BW": BandwidthEvent, "DEBUG": LogEvent, "INFO": LogEvent, diff --git a/test/unit/response/events.py b/test/unit/response/events.py index d0c0b4e..ac4a22f 100644 --- a/test/unit/response/events.py +++ b/test/unit/response/events.py @@ -11,7 +11,14 @@ import stem.response.events import test.mocking as mocking from stem import ProtocolError -from stem.control import CircStatus, CircBuildFlag, CircPurpose, CircClosureReason +from stem.control import CircStatus,\ + CircBuildFlag,\ + CircPurpose,\ + CircClosureReason,\ + StreamStatus,\ + StreamClosureReason,\ + StreamSource,\ + StreamPurpose # CIRC events from tor v0.2.3.16 @@ -42,6 +49,19 @@ $E57A476CD4DFBD99B4EE52A100A58610AD6E80B9,hamburgerphone" CIRC_BUILT_OLD = "650 CIRC 1 BUILT \ $E57A476CD4DFBD99B4EE52A100A58610AD6E80B9,hamburgerphone,PrivacyRepublic14" +# STREAM events from tor 0.2.3.16 for visiting the google front page + +STREAM_NEW = "650 STREAM 18 NEW 0 \ +encrypted.google.com:443 \ +SOURCE_ADDR=127.0.0.1:47849 \ +PURPOSE=USER" + +STREAM_SENTCONNECT = "650 STREAM 18 SENTCONNECT 26 encrypted.google.com:443" +STREAM_REMAP = "650 STREAM 18 REMAP 26 74.125.227.129:443 SOURCE=EXIT" +STREAM_SUCCEEDED = "650 STREAM 18 SUCCEEDED 26 74.125.227.129:443" +STREAM_CLOSED_RESET = "650 STREAM 21 CLOSED 26 74.125.227.129:443 REASON=CONNRESET" +STREAM_CLOSED_DONE = "650 STREAM 25 CLOSED 26 199.7.52.72:80 REASON=DONE" + def _get_event(content): controller_event = mocking.get_message(content) stem.response.convert("EVENT", controller_event, arrived_at = 25) @@ -169,6 +189,115 @@ class TestEvents(unittest.TestCase): self.assertEqual(None, event.reason) self.assertEqual(None, event.remote_reason) + def test_stream_event(self): + event = _get_event(STREAM_NEW) + + self.assertTrue(isinstance(event, stem.response.events.StreamEvent)) + self.assertEqual(STREAM_NEW.lstrip("650 "), str(event)) + self.assertEqual("18", event.id) + self.assertEqual(StreamStatus.NEW, event.status) + self.assertEqual(None, event.circ_id) + self.assertEqual("encrypted.google.com:443", event.target) + self.assertEqual("encrypted.google.com", event.target_address) + self.assertEqual(443, event.target_port) + self.assertEqual(None, event.reason) + self.assertEqual(None, event.remote_reason) + self.assertEqual(None, event.source) + self.assertEqual("127.0.0.1:47849", event.source_addr) + self.assertEqual("127.0.0.1", event.source_address) + self.assertEqual(47849, event.source_port) + self.assertEqual(StreamPurpose.USER, event.purpose) + + event = _get_event(STREAM_SENTCONNECT) + + self.assertTrue(isinstance(event, stem.response.events.StreamEvent)) + self.assertEqual(STREAM_SENTCONNECT.lstrip("650 "), str(event)) + self.assertEqual("18", event.id) + self.assertEqual(StreamStatus.SENTCONNECT, event.status) + self.assertEqual("26", event.circ_id) + self.assertEqual("encrypted.google.com:443", event.target) + self.assertEqual("encrypted.google.com", event.target_address) + self.assertEqual(443, event.target_port) + self.assertEqual(None, event.reason) + self.assertEqual(None, event.remote_reason) + self.assertEqual(None, event.source) + self.assertEqual(None, event.source_addr) + self.assertEqual(None, event.source_address) + self.assertEqual(None, event.source_port) + self.assertEqual(None, event.purpose) + + event = _get_event(STREAM_REMAP) + + self.assertTrue(isinstance(event, stem.response.events.StreamEvent)) + self.assertEqual(STREAM_REMAP.lstrip("650 "), str(event)) + self.assertEqual("18", event.id) + self.assertEqual(StreamStatus.REMAP, event.status) + self.assertEqual("26", event.circ_id) + self.assertEqual("74.125.227.129:443", event.target) + self.assertEqual("74.125.227.129", event.target_address) + self.assertEqual(443, event.target_port) + self.assertEqual(None, event.reason) + self.assertEqual(None, event.remote_reason) + self.assertEqual(StreamSource.EXIT, event.source) + self.assertEqual(None, event.source_addr) + self.assertEqual(None, event.source_address) + self.assertEqual(None, event.source_port) + self.assertEqual(None, event.purpose) + + event = _get_event(STREAM_SUCCEEDED) + + self.assertTrue(isinstance(event, stem.response.events.StreamEvent)) + self.assertEqual(STREAM_SUCCEEDED.lstrip("650 "), str(event)) + self.assertEqual("18", event.id) + self.assertEqual(StreamStatus.SUCCEEDED, event.status) + self.assertEqual("26", event.circ_id) + self.assertEqual("74.125.227.129:443", event.target) + self.assertEqual("74.125.227.129", event.target_address) + self.assertEqual(443, event.target_port) + self.assertEqual(None, event.reason) + self.assertEqual(None, event.remote_reason) + self.assertEqual(None, event.source) + self.assertEqual(None, event.source_addr) + self.assertEqual(None, event.source_address) + self.assertEqual(None, event.source_port) + self.assertEqual(None, event.purpose) + + event = _get_event(STREAM_CLOSED_RESET) + + self.assertTrue(isinstance(event, stem.response.events.StreamEvent)) + self.assertEqual(STREAM_CLOSED_RESET.lstrip("650 "), str(event)) + self.assertEqual("21", event.id) + self.assertEqual(StreamStatus.CLOSED, event.status) + self.assertEqual("26", event.circ_id) + self.assertEqual("74.125.227.129:443", event.target) + self.assertEqual("74.125.227.129", event.target_address) + self.assertEqual(443, event.target_port) + self.assertEqual(StreamClosureReason.CONNRESET, event.reason) + self.assertEqual(None, event.remote_reason) + self.assertEqual(None, event.source) + self.assertEqual(None, event.source_addr) + self.assertEqual(None, event.source_address) + self.assertEqual(None, event.source_port) + self.assertEqual(None, event.purpose) + + event = _get_event(STREAM_CLOSED_DONE) + + self.assertTrue(isinstance(event, stem.response.events.StreamEvent)) + self.assertEqual(STREAM_CLOSED_DONE.lstrip("650 "), str(event)) + self.assertEqual("25", event.id) + self.assertEqual(StreamStatus.CLOSED, event.status) + self.assertEqual("26", event.circ_id) + self.assertEqual("199.7.52.72:80", event.target) + self.assertEqual("199.7.52.72", event.target_address) + self.assertEqual(80, event.target_port) + self.assertEqual(StreamClosureReason.DONE, event.reason) + self.assertEqual(None, event.remote_reason) + self.assertEqual(None, event.source) + self.assertEqual(None, event.source_addr) + self.assertEqual(None, event.source_address) + self.assertEqual(None, event.source_port) + self.assertEqual(None, event.purpose) + def test_bw_event(self): event = _get_event("650 BW 15 25")
participants (1)
-
atagar@torproject.org