commit 32a54a71f8391b5300e6cae5a3f558f190160995 Author: Ravi Chandra Padmala neenaoffline@gmail.com Date: Tue Jul 3 10:35:52 2012 +0530
Implement Controller.attach_stream --- stem/control.py | 38 ++++++++++++++++++++++++++++++++++---- test/integ/control/controller.py | 34 ++++++++++++++++++++++++++++++++-- test/util.py | 23 +++++++++++++++++------ 3 files changed, 83 insertions(+), 12 deletions(-)
diff --git a/stem/control.py b/stem/control.py index fe45010..4cb55e0 100644 --- a/stem/control.py +++ b/stem/control.py @@ -37,6 +37,7 @@ providing its own for interacting at a higher level. |- repurpose_circuit - change a circuit's purpose |- map_address - maps one address to another such that connections to the original are replaced with the other |- get_circuits - provides a list of active circuits + |- attach_stream - attach a stream to a circuit |- get_version - convenience method to get tor version |- get_server_descriptor - querying the server descriptor for a relay |- get_server_descriptors - provides all presently available server descriptors @@ -129,6 +130,7 @@ import stem.util.enum import stem.version
from stem.util import log +import stem.util.tor_tools
# state changes a control socket can have
@@ -1538,16 +1540,16 @@ class Controller(BaseController): raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code)
return new_circuit - + def get_circuits(self): """ Provides the list of circuits Tor is currently handling. - + :returns: **list** of :class:`stem.events.CircuitEvent` objects
:raises: :class:`stem.ControllerError` if the call fails """ - + circuits = [] response = self.get_info("circuit-status")
@@ -1555,9 +1557,37 @@ class Controller(BaseController): circ_message = stem.socket.recv_message(StringIO.StringIO("650 CIRC " + circ + "\r\n")) stem.response.convert("EVENT", circ_message, arrived_at = 0) circuits.append(circ_message) - + return circuits
+ def attach_stream(self, stream, circuit, hop = None): + """ + Attaches a stream to a circuit. + + Note: Tor attaches streams to circuits automatically unless the + __LeaveStreamsUnattached configuration variable is set to "1" + + :param int stream: id of the stream that must be attached + :param int circuit: id of the circuit to which it must be attached + :param int hop: hop in the circuit that must be used as an exit node + + :raises: + * :class:`stem.InvalidRequest` if the stream or circuit id were unrecognized + * :class:`stem.OperationFailed` if the stream couldn't be attached for any other reason + """ + + hop_str = " HOP=" + str(hop) if hop else "" + response = self.msg("ATTACHSTREAM %i %i%s" % (stream, circuit, hop_str)) + stem.response.convert("SINGLELINE", response) + + if not response.is_ok(): + if response.code == '552': + raise stem.InvalidRequest(response.code, response.message) + elif response.code == '551': + raise stem.OperationFailed(response.code, response.message) + else: + raise stem.ProtocolError("ATTACHSTREAM returned unexpected response code: %s" % response.code) + def close_circuit(self, circuit_id, flag = ''): """ Closes the specified circuit. diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py index 9b1f1c6..bced64a 100644 --- a/test/integ/control/controller.py +++ b/test/integ/control/controller.py @@ -13,6 +13,8 @@ import threading import time import unittest
+from Queue import Queue + import stem.connection import stem.control import stem.descriptor.reader @@ -721,7 +723,36 @@ class TestController(unittest.TestCase):
count += 1 if count > 10: break - + + def test_attachstream(self): + + if test.runner.require_control(self): return + elif test.runner.require_online(self): return + + runner = test.runner.get_runner() + + with runner.get_tor_controller() as controller: + controller.set_conf("__LeaveStreamsUnattached", "1") + socksport = controller.get_conf("SocksPort") + circuits_3hop = [c.id for c in controller.get_circuits() if len(c.path) == 3] + self.assertTrue(len(circuits_3hop) > 0) + circuit_id = circuits_3hop[0] + + def handle_streamcreated(stream): + if stream.status == "NEW": + controller.attach_stream(int(stream.id), int(circuit_id)) + + controller.add_event_listener(handle_streamcreated, stem.control.EventType.STREAM) + ip = test.util.external_ip('127.0.0.1', socksport) + exit_circuit = [c for c in controller.get_circuits() if c.id == circuit_id] + self.assertTrue(exit_circuit) + exit_ip = controller.get_network_status(exit_circuit[0].path[2][0]).address + + self.assertEquals(exit_ip, ip) + + controller.remove_event_listener(handle_streamcreated) + controller.reset_conf("__LeaveStreamsUnattached") + def test_get_circuits(self): """ Fetches circuits via the get_circuits() method. @@ -735,5 +766,4 @@ class TestController(unittest.TestCase): new_circ = controller.new_circuit() circuits = controller.get_circuits() self.assertTrue(new_circ in [int(circ.id) for circ in circuits]) -
diff --git a/test/util.py b/test/util.py index 74b494d..b89e48f 100644 --- a/test/util.py +++ b/test/util.py @@ -18,26 +18,37 @@ Accept-Encoding: identity
"""
-def external_ip(sock): +def external_ip(host, port): """ Returns the externally visible IP address when using a SOCKS4a proxy. Negotiates the socks connection, connects to ipconfig.me and requests http://ifconfig.me/ip to find out the externally visible IP.
- :param socket sock: socket connected to a SOCKS4a proxy server + Supports only SOCKS4a proxies. + + :param str host: hostname/IP of the proxy server + :param int port: port on which the proxy server is listening
:returns: externally visible IP address, or None if it isn't able to + + :raises: :class:`stem.socket.SocketError`: unable to connect a socket to the socks server """
try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, int(port))) + except Exception, exc: + raise SocketError("Failed to connect to the socks server: " + str(exc)) + + try: negotiate_socks(sock, "ifconfig.me", 80) - s.sendall(req) - response = s.recv(1000) + sock.sendall(ip_request) + response = sock.recv(1000)
# everything after the blank line is the 'data' in a HTTP response # The response data for our request for request should be an IP address + '\n' return response[response.find("\r\n\r\n"):].strip() - except: + except Exception, exc: return None
def negotiate_socks(sock, host, port): @@ -46,7 +57,7 @@ def negotiate_socks(sock, host, port): failure.
:param socket sock: socket connected to socks4a server - :param str host: host to connect to + :param str host: hostname/IP to connect to :param int port: port to connect to
:raises: :class:`stem.ProtocolError` if the socks server doesn't grant our request
tor-commits@lists.torproject.org