[tor-commits] [stem/master] Implement Controller.attach_stream

atagar at torproject.org atagar at torproject.org
Mon Dec 31 03:13:07 UTC 2012


commit 32a54a71f8391b5300e6cae5a3f558f190160995
Author: Ravi Chandra Padmala <neenaoffline at gmail.com>
Date:   Tue Jul 3 10:35:52 2012 +0530

    Implement Controller.attach_stream
---
 stem/control.py                  |   38 ++++++++++++++++++++++++++++++++++----
 test/integ/control/controller.py |   34 ++++++++++++++++++++++++++++++++--
 test/util.py                     |   23 +++++++++++++++++------
 3 files changed, 83 insertions(+), 12 deletions(-)

diff --git a/stem/control.py b/stem/control.py
index fe45010..4cb55e0 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -37,6 +37,7 @@ providing its own for interacting at a higher level.
     |- repurpose_circuit - change a circuit's purpose
     |- map_address - maps one address to another such that connections to the original are replaced with the other
     |- get_circuits - provides a list of active circuits
+    |- attach_stream - attach a stream to a circuit
     |- get_version - convenience method to get tor version
     |- get_server_descriptor - querying the server descriptor for a relay
     |- get_server_descriptors - provides all presently available server descriptors
@@ -129,6 +130,7 @@ import stem.util.enum
 import stem.version
 
 from stem.util import log
+import stem.util.tor_tools
 
 # state changes a control socket can have
 
@@ -1538,16 +1540,16 @@ class Controller(BaseController):
       raise stem.ProtocolError("EXTENDCIRCUIT returned unexpected response code: %s" % response.code)
     
     return new_circuit
-
+  
   def get_circuits(self):
     """
     Provides the list of circuits Tor is currently handling.
-
+    
     :returns: **list** of :class:`stem.events.CircuitEvent` objects
     
     :raises: :class:`stem.ControllerError` if the call fails
     """
-     
+    
     circuits = []
     response = self.get_info("circuit-status")
     
@@ -1555,9 +1557,37 @@ class Controller(BaseController):
       circ_message = stem.socket.recv_message(StringIO.StringIO("650 CIRC " + circ + "\r\n"))
       stem.response.convert("EVENT", circ_message, arrived_at = 0)
       circuits.append(circ_message)
-
+    
     return circuits
   
+  def attach_stream(self, stream, circuit, hop = None):
+    """
+    Attaches a stream to a circuit.
+    
+    Note: Tor attaches streams to circuits automatically unless the
+    __LeaveStreamsUnattached configuration variable is set to "1".
+    
+    :param int stream: id of the stream that must be attached
+    :param int circuit: id of the circuit to which it must be attached
+    :param int hop: hop in the circuit that must be used as an exit node
+    
+    :raises:
+      * :class:`stem.InvalidRequest` if the stream or circuit id were unrecognized
+      * :class:`stem.OperationFailed` if the stream couldn't be attached for any other reason
+    """
+    
+    hop_str = " HOP=" + str(hop) if hop else ""
+    response = self.msg("ATTACHSTREAM %i %i%s" % (stream, circuit, hop_str))
+    stem.response.convert("SINGLELINE", response)
+    
+    if not response.is_ok():
+      if response.code == '552':
+        raise stem.InvalidRequest(response.code, response.message)
+      elif response.code == '551':
+        raise stem.OperationFailed(response.code, response.message)
+      else:
+        raise stem.ProtocolError("ATTACHSTREAM returned unexpected response code: %s" % response.code)
+  
   def close_circuit(self, circuit_id, flag = ''):
     """
     Closes the specified circuit.
diff --git a/test/integ/control/controller.py b/test/integ/control/controller.py
index 9b1f1c6..bced64a 100644
--- a/test/integ/control/controller.py
+++ b/test/integ/control/controller.py
@@ -13,6 +13,8 @@ import threading
 import time
 import unittest
 
+from Queue import Queue
+
 import stem.connection
 import stem.control
 import stem.descriptor.reader
@@ -721,7 +723,36 @@ class TestController(unittest.TestCase):
         
         count += 1
         if count > 10: break
-
+  
+  def test_attachstream(self):
+    
+    if test.runner.require_control(self): return
+    elif test.runner.require_online(self): return
+    
+    runner = test.runner.get_runner()
+    
+    with runner.get_tor_controller() as controller:
+      controller.set_conf("__LeaveStreamsUnattached", "1")
+      socksport = controller.get_conf("SocksPort")
+      circuits_3hop = [c.id for c in controller.get_circuits() if len(c.path) == 3]
+      self.assertTrue(len(circuits_3hop) > 0)
+      circuit_id = circuits_3hop[0]
+      
+      def handle_streamcreated(stream):
+        if stream.status == "NEW":
+          controller.attach_stream(int(stream.id), int(circuit_id))
+      
+      controller.add_event_listener(handle_streamcreated, stem.control.EventType.STREAM)
+      ip = test.util.external_ip('127.0.0.1', socksport)
+      exit_circuit = [c for c in controller.get_circuits() if c.id == circuit_id]
+      self.assertTrue(exit_circuit)
+      exit_ip = controller.get_network_status(exit_circuit[0].path[2][0]).address
+      
+      self.assertEquals(exit_ip, ip)
+      
+      controller.remove_event_listener(handle_streamcreated)
+      controller.reset_conf("__LeaveStreamsUnattached")
+  
   def test_get_circuits(self):
     """
     Fetches circuits via the get_circuits() method.
@@ -735,5 +766,4 @@ class TestController(unittest.TestCase):
       new_circ = controller.new_circuit()
       circuits = controller.get_circuits()
       self.assertTrue(new_circ in [int(circ.id) for circ in circuits])
-  
 
diff --git a/test/util.py b/test/util.py
index 74b494d..b89e48f 100644
--- a/test/util.py
+++ b/test/util.py
@@ -18,26 +18,37 @@ Accept-Encoding: identity
 
 """
 
-def external_ip(sock):
+def external_ip(host, port):
   """
   Returns the externally visible IP address when using a SOCKS4a proxy.
   Negotiates the socks connection, connects to ipconfig.me and requests
   http://ifconfig.me/ip to find out the externally visible IP.
   
-  :param socket sock: socket connected to a SOCKS4a proxy server
+  Supports only SOCKS4a proxies.
+  
+  :param str host: hostname/IP of the proxy server
+  :param int port: port on which the proxy server is listening
   
   :returns: externally visible IP address, or None if it isn't able to
+  
+  :raises: :class:`stem.socket.SocketError` if unable to connect a socket to the socks server
   """
   
   try:
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.connect((host, int(port)))
+  except Exception, exc:
+    raise SocketError("Failed to connect to the socks server: " + str(exc))
+  
+  try:
     negotiate_socks(sock, "ifconfig.me", 80)
-    s.sendall(req)
-    response = s.recv(1000)
+    sock.sendall(ip_request)
+    response = sock.recv(1000)
     
     # everything after the blank line is the 'data' in a HTTP response
     # The response data for our request for request should be an IP address + '\n'
     return response[response.find("\r\n\r\n"):].strip()
-  except:
+  except Exception, exc:
     return None
 
 def negotiate_socks(sock, host, port):
@@ -46,7 +57,7 @@ def negotiate_socks(sock, host, port):
   failure.
   
   :param socket sock: socket connected to socks4a server
-  :param str host: host to connect to
+  :param str host: hostname/IP to connect to
   :param int port: port to connect to
   
   :raises: :class:`stem.ProtocolError` if the socks server doesn't grant our request





More information about the tor-commits mailing list