[or-cvs] r9498: All your streams are now belong to us. Streams are now attached to circuits. (torflow/trunk)

mikeperry at seul.org mikeperry at seul.org
Tue Feb 6 11:38:45 UTC 2007


Author: mikeperry
Date: 2007-02-06 06:38:32 -0500 (Tue, 06 Feb 2007)
New Revision: 9498

Modified:
   torflow/trunk/TorCtl.py
   torflow/trunk/metatroller.py
Log:
All your streams are now belong to us. Streams are now attached to 
circuits. Multiple concurrent streams tested (briefly).

Also, added NodeSelector interface to TorCtl.



Modified: torflow/trunk/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl.py	2007-02-06 05:47:35 UTC (rev 9497)
+++ torflow/trunk/TorCtl.py	2007-02-06 11:38:32 UTC (rev 9498)
@@ -54,6 +54,21 @@
     "Filled in during NS events"
     pass
 
+class NodeSelector:
+    "Interface for node selection policies"
+    def __init__(self, target_ip, target_port):
+        self.to_ip = target_ip
+        self.to_port = target_port
+
+    def entry_chooser(self, path):
+        raise NotImplemented
+
+    def middle_chooser(self, path):
+        raise NotImplemented
+
+    def exit_chooser(self, path):
+        raise NotImplemented
+
 class ExitPolicyLine:
     def __init__(self, match, ip_mask, port_low, port_high):
         self.match = match
@@ -506,17 +521,17 @@
             raise ProtocolError("Bad extended line %r",msg)
         return int(m.group(1))
 
-    def build_circuit(self, pathlen, entry_chooser, middle_chooser, exit_chooser):
+    def build_circuit(self, pathlen, nodesel):
         circ = Circuit()
         if pathlen == 1:
-            circ.exit = exit_chooser(circ.path)
+            circ.exit = nodesel.exit_chooser(circ.path)
             circ.path = [circ.exit.idhex]
             circ.cid = self.extend_circuit(0, circ.path)
         else:
-            circ.path.append(entry_chooser(circ.path).idhex)
+            circ.path.append(nodesel.entry_chooser(circ.path).idhex)
             for i in xrange(1, pathlen-1):
-                circ.path.append(middle_chooser(circ.path).idhex)
-            circ.exit = exit_chooser(circ.path)
+                circ.path.append(nodesel.middle_chooser(circ.path).idhex)
+            circ.exit = nodesel.exit_chooser(circ.path)
             circ.path.append(circ.exit.idhex)
             circ.cid = self.extend_circuit(0, circ.path)
         circ.created_at = datetime.datetime.now()

Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py	2007-02-06 05:47:35 UTC (rev 9497)
+++ torflow/trunk/metatroller.py	2007-02-06 11:38:32 UTC (rev 9498)
@@ -14,6 +14,7 @@
 import traceback
 import re
 import random
+import time
 from TorUtil import *
 
 routers = {} # indexed by idhex
@@ -30,9 +31,8 @@
 
 exit_port_idx = {} # Used in ordered exits mode
 
-# XXX: Option to ignore guard flag
-
-# XXX: Move these to config file
+# TODO: Move these to config file
+# TODO: Option to ignore guard flag
 control_host = "127.0.0.1"
 control_port = 9061
 max_detach = 3
@@ -53,37 +53,49 @@
 class MetaCircuit(TorCtl.Circuit):
     def __init__(self, circuit): # Promotion
         self.__dict__ = circuit.__dict__
+        self.built = False
         self.detached_cnt = 0
         self.used_cnt = 0
-        self.created_at = 0
+        self.created_at = time.time()
+        self.pending_streams = [] # Which stream IDs are pending us
+
     
 class Stream:
-    def __init__(self):
+    def __init__(self, sid, host, port):
+        self.sid = sid
         self.detached_from = [] # circ id #'s
-        self.detached_cnt = 0
+        self.pending_circ = None
+        self.circ = None
+        self.host = host
+        self.port = port
 
-# XXX: Technically we should obey fast and valid????
-def choose_entry_uniform(path):
-    r = random.choice(sorted_g)
-    while r.idhex in path:
+# TODO: Obviously we need other node selector implementations
+
+class UniformSelector(TorCtl.NodeSelector):
+    "Uniform node selection"
+    # FIXME: Technically we should obey fast and valid
+    def entry_chooser(self, path):
         r = random.choice(sorted_g)
-    return r
+        while r.idhex in path:
+            r = random.choice(sorted_g)
+        return r
 
-def choose_middle_uniform(path):
-    r = random.choice(sorted_r)
-    while r.idhex in path:
+    def middle_chooser(self, path):
         r = random.choice(sorted_r)
-    return r
+        while r.idhex in path:
+            r = random.choice(sorted_r)
+        return r
 
-def choose_exit_uniform(path, target_ip, target_port):
-    allowed = []
-    for r in sorted_r:
-        if r.will_exit_to(target_ip, target_port):
-            allowed.append(r)
-    r = random.choice(allowed)
-    while r.idhex in path:
+    def exit_chooser(self, path):
+        allowed = []
+        for r in sorted_r:
+            if r.will_exit_to(self.to_ip, self.to_port):
+                allowed.append(r)
         r = random.choice(allowed)
-    return r
+        while r.idhex in path:
+            r = random.choice(allowed)
+        return r
+
  
 def read_routers(c, nslist):
     bad_key = 0
@@ -124,42 +136,84 @@
         TorCtl.EventHandler.__init__(self)
         self.c = c
 
+    def attach_stream_any(self, stream, badcircs):
+        for circ in circuits.itervalues():
+            if circ.built and circ.cid not in badcircs:
+                if circ.exit.will_exit_to(stream.host, stream.port):
+                    self.c.attach_stream(stream.sid, circ.cid)
+                    stream.pending_circ = None
+                    stream.circ = circ
+                    circ.used_cnt += 1
+                    break
+        else:
+            circ = MetaCircuit(self.c.build_circuit(3,
+                                 UniformSelector(stream.host, stream.port)))
+            stream.pending_circ = circ
+            circ.pending_streams.append(stream)
+            circuits[circ.cid] = circ
+
     def circ_status(self, eventtype, circID, status, path, reason, remote):
         output = [eventtype, str(circID), status]
         if path: output.append(",".join(path))
         if reason: output.append("REASON=" + reason)
         if remote: output.append("REMOTE_REASON=" + remote)
         plog("DEBUG", " ".join(output))
+        # Circuits we don't control get built by Tor
+        if circID not in circuits: return
+        if status == "FAILED" or status == "CLOSED":
+            circ = circuits[circID]
+            del circuits[circID]
+            for stream in circ.pending_streams:
+                self.attach_stream_any(stream, stream.detached_from)
+        elif status == "BUILT":
+            circuits[circID].built = True
+            for stream in circuits[circID].pending_streams:
+                self.c.attach_stream(stream.sid, circID)
+                circuits[circID].used_cnt += 1
 
     def stream_status(self, eventtype, streamID, status, circID, target_host, target_port, reason, remote):
         output = [eventtype, str(streamID), status, str(circID), target_host,
-str(target_port)]
+                  str(target_port)]
         if reason: output.append("REASON=" + reason)
         if remote: output.append("REMOTE_REASON=" + remote)
         plog("DEBUG", " ".join(output))
         if not re.match(r"\d+.\d+.\d+.\d+", target_host):
             target_host = "255.255.255.255" # ignore DNS for exit policy check
-        if status == "NEW":
-            attach_circ = 0
+        if status == "NEW" or status == "NEWRESOLVE":
             global circuits
-            for circ in circuits.itervalues():
-                if circ.exit.will_exit_to(target_host, target_port):
-                    attach_circ = circ
-                    break
+            streams[streamID] = Stream(streamID, target_host, target_port)
+
+            self.attach_stream_any(streams[streamID],
+                                   streams[streamID].detached_from)
+        elif status == "DETACHED":
+            global circuits
+            if streamID not in streams:
+                plog("WARN", "Detached stream "+str(streamID)+" not found")
+                streams[streamID] = Stream(streamID, target_host, target_port)
+            # FIXME Stats
+            if not circID:
+                plog("WARN", "Stream "+str(streamID)+" detached from no circuit!")
             else:
-                attach_circ = MetaCircuit(
-                                self.c.build_circuit(3, choose_entry_uniform,
-                                      choose_middle_uniform,
-                                      lambda path:
-                                          choose_exit_uniform(path,
-                                          target_host, target_port)))
-                circuits[attach_circ.cid] = attach_circ
-            # TODO: attach
-        elif status == "DETACHED":
-            pass
-        elif status == "FAILED":
-            pass
+                streams[streamID].detached_from.append(circID)
 
+            self.attach_stream_any(streams[streamID],
+                                   streams[streamID].detached_from)
+        elif status == "FAILED" or status == "CLOSED":
+            # FIXME stats
+            if streams[streamID].pending_circ:
+                streams[streamID].pending_circ.pending_streams.remove(streams[streamID])
+            del streams[streamID]
+        elif status == "REMAP":
+            if streamID not in streams:
+                plog("WARN", "Remap id "+str(streamID)+" not found")
+            else:
+                if not re.match(r"\d+.\d+.\d+.\d+", target_host):
+                   target_host = "255.255.255.255"
+                   plog("NOTICE", "Non-IP remap for "+str(streamID)+" to "
+                                   + target_host)
+                streams[streamID].host = target_host
+                streams[streamID].port = target_port
+
     def ns(self, eventtype, nslist):
         read_routers(self.c, nslist)
         plog("DEBUG", "Read " + str(len(nslist)) + " NS dox => " 
@@ -171,23 +225,23 @@
         plog("DEBUG", "Read " + str(len(identities)) + " desc => " 
              + str(len(sorted_r)) + " routers")
         
-def deconf():
-    pass
-
 def metaloop(c):
     """Loop that handles metatroller commands"""
     nslist = c.get_network_status()
     read_routers(c, nslist)
     plog("INFO", "Read "+str(len(sorted_r))+"/"+str(len(nslist))+" routers")
+    # XXX: Loop for commands on socket
 
 def main(argv):
-    atexit.register(deconf)
     s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
     s.connect((control_host,control_port))
     c = TorCtl.get_connection(s)
     c.set_event_handler(SnakeHandler(c))
     th = c.launch_thread()
     c.authenticate()
+    atexit.register(lambda:
+                       c.set_option("__LeaveStreamsUnattached", "0"))
+    c.set_option("__LeaveStreamsUnattached", "1")
     c.set_events([TorCtl.EVENT_TYPE.STREAM,
                   TorCtl.EVENT_TYPE.NS,
                   TorCtl.EVENT_TYPE.CIRC,



More information about the tor-commits mailing list