Clean up some duplicate code to use inheritance properly.
Several bugs were not fixed in both CircuitHandler and
StreamHandler after being fixed in PathBuilder..

Also add refcounting to router descriptors so we can wait
till all current established circuits are destroyed before
removing the descriptor from our lists.

Modified: torctl/trunk/python/TorCtl/PathSupport.py
--- torctl/trunk/python/TorCtl/PathSupport.py	2009-01-14 17:00:16 UTC (rev 18107)
+++ torctl/trunk/python/TorCtl/PathSupport.py	2009-01-14 23:56:01 UTC (rev 18108)
@@ -217,11 +217,6 @@
   def r_is_ok(self, r):
     "Returns true if r is in the percentile boundaries (by rank)"
-    # Hrmm.. technically we shouldn't count non-running routers in this..
-    # but that is tricky to do efficiently.
-    # XXX: Is there any reason why sorted_r should have non-running
-    # routers in the first place?
     if r.list_rank < len(self.sorted_r)*self.pct_skip/100: return False
     elif r.list_rank > len(self.sorted_r)*self.pct_fast/100: return False
@@ -248,7 +243,6 @@
 class ConserveExitsRestriction(NodeRestriction):
   "Restriction to reject exits from selection"
-  # XXX: Make this adaptive by ip/port
   def r_is_ok(self, r): return not "Exit" in r.flags
 class FlagsRestriction(NodeRestriction):
@@ -724,6 +718,8 @@
         entry = self.entry_gen.generate()
         mid = self.entry_gen.generate()
         ext = self.entry_gen.generate()
+    for r in path:
+      r.refcount += 1
     return path
 class SelectionManager:
@@ -1033,9 +1029,16 @@
     for ns in nslist:
       if not "Running" in ns.flags:
         if ns.idhex in self.routers:
-          plog("DEBUG", "Expiring non-running router "+ns.idhex)
-          self.sorted_r.remove(self.routers[ns.idhex])
-          del self.routers[ns.idhex]
+          self.routers[ns.idhex].down = True
+          self.routers[ns.idhex].flags = ns.flags
+          if self.routers[ns.idhex].refcount == 0:
+            self.routers[ns.idhex].deleted = True
+            plog("INFO", "Expiring non-running router "+ns.idhex)
+            self.sorted_r.remove(self.routers[ns.idhex])
+            del self.routers[ns.idhex]
+          else:
+            plog("INFO", "Postponing expiring non-running router "+ns.idhex)
+            self.routers[ns.idhex].deleted = True
     nslist = filter(lambda ns: "Running" in ns.flags, nslist)
@@ -1078,6 +1081,10 @@
     except TorCtl.ErrorReply, e: 
       plog("ERROR", "Failed closing circuit " + str(id) + ": " + str(e))
+  def circuit_list(self):
+    "Return an iterator or a list of circuits prioritized for stream selection"
+    return self.circuits.itervalues()
   def attach_stream_any(self, stream, badcircs):
     "Attach a stream to a valid circuit, avoiding any in 'badcircs'"
     # Newnym, and warn if not built plus pending
@@ -1096,7 +1103,7 @@
         # FIXME: Consider actually closing circ if no streams.
         self.circuits[key].dirty = True
-    for circ in self.circuits.itervalues():
+    for circ in self.circuit_list():
       if circ.built and not circ.requested_closed and not circ.dirty \
           and circ.circ_id not in badcircs:
         if circ.exit.will_exit_to(stream.host, stream.port):
@@ -1146,6 +1153,16 @@
     elif c.status == "FAILED" or c.status == "CLOSED":
       # XXX: Can still get a STREAM FAILED for this circ after this
       circ = self.circuits[c.circ_id]
+      for r in circ.path:
+        r.refcount -= 1
+        if r.deleted and r.refcount == 0:
+          plog("INFO", "Removing expired descriptor for "+r.idhex)
+          self.sorted_r.remove(self.routers[r.idhex])
+          del self.routers[r.idhex]
+          for i in xrange(len(self.sorted_r)): self.sorted_r[i].list_rank = i
+          self.selmgr.path_selector.entry_gen.rebuild()
+          self.selmgr.path_selector.mid_gen.rebuild()
+          self.selmgr.path_selector.exit_gen.rebuild()
       del self.circuits[c.circ_id]
       for stream in circ.pending_streams:
         plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
@@ -1273,6 +1290,7 @@
   def ns_event(self, n):
     # FIXME: Hrmm.. this is poor encapsulation..
+    # Maybe also pass in sorted_r?
@@ -1318,7 +1336,8 @@
          str(i) + " circuits")
     # Schedule (num_circs-n) circuit-buildups
     while (n < self.num_circuits):      
-      self.build_circuit("", 80)
+      # TODO: Should mimic Tor's learning here
+      self.build_circuit("", 80) 
       plog("DEBUG", "Scheduled circuit No. " + str(n+1))
       n += 1
@@ -1362,30 +1381,15 @@
     elif c.status == "FAILED" or c.status == "CLOSED":
-      # XXX: Can still get a STREAM FAILED for this circ after this
-      circ = self.circuits[c.circ_id]
-      # Actual removal of the circ
-      del self.circuits[c.circ_id]
-      # Give away pending streams
-      for stream in circ.pending_streams:
-	plog("DEBUG", "Finding new circ for " + str(stream.strm_id))
-        self.attach_stream_any(stream, stream.detached_from)
+      PathBuilder.circ_status_event(self, c)
       # Check if there are enough circs
     # BUILT
     elif c.status == "BUILT":
-      circ = self.circuits[c.circ_id]
-      circ.built = True
-      for stream in circ.pending_streams:
-        try:
-          self.c.attach_stream(stream.strm_id, c.circ_id)
-        except TorCtl.ErrorReply, e:
-          # No need to retry here. We should get the failed
-          # event for either the circ or stream next
-          plog("WARN", "Error attaching stream: " + str(e.args))
+      PathBuilder.circ_status_event(self, c)
       # Compute duration by summing up extend_times
+      circ = self.circuits[c.circ_id]
       duration = reduce(lambda x, y: x+y, circ.extend_times, 0.0)
       plog("INFO", "Circuit " + str(c.circ_id) + " needed " + 
          str(duration) + " seconds to be built")
@@ -1405,7 +1409,6 @@
       in the pool. """
   def __init__(self, c, selmgr, num_circs, RouterClass):
     CircuitHandler.__init__(self, c, selmgr, num_circs, RouterClass)
-    self.sorted_circs = None    # optional sorted list
   def clear_dns_cache(self):
     """ Send signal CLEARDNSCACHE """
@@ -1417,167 +1420,6 @@
     """ Close a stream with given id and reason """
     self.c.close_stream(id, reason)
-  def create_and_attach(self, stream, unattached_streams):
-    """ Create a new circuit and attach (stream + unattached_streams) """
-    circ = self.build_circuit(stream.host, stream.port)
-    if circ:
-      for u in unattached_streams:
-        plog("DEBUG", "Attaching " + str(u.strm_id) + 
-           " pending build of circuit " + str(circ.circ_id))
-        u.pending_circ = circ      
-      circ.pending_streams.extend(unattached_streams)
-      self.circuits[circ.circ_id] = circ
-      self.last_exit = circ.exit
-  def attach_stream_any(self, stream, badcircs):
-    """ Attach a regular user stream """
-    unattached_streams = [stream]
-    if self.new_nym:
-      self.new_nym = False
-      plog("DEBUG", "Obeying new nym")
-      for key in self.circuits.keys():
-        if (not self.circuits[key].dirty
-            and len(self.circuits[key].pending_streams)):
-          plog("WARN", "New nym called, destroying circuit "+str(key)
-             +" with "+str(len(self.circuits[key].pending_streams))
-             +" pending streams")
-          unattached_streams.extend(self.circuits[key].pending_streams)
-          del self.circuits[key].pending_streams[:]
-        # FIXME: Consider actually closing circs if no streams
-        self.circuits[key].dirty = True
-    # Check if there is a sorted list of circs
-    if self.sorted_circs: list = self.sorted_circs
-    else: list = self.circuits.values()
-    for circ in list:
-      # Check each circuit
-      if circ.built and not circ.requested_closed and circ.circ_id not in badcircs \
-         and not circ.dirty:
-        if circ.exit.will_exit_to(stream.host, stream.port):
-          try:
-            self.c.attach_stream(stream.strm_id, circ.circ_id)
-            stream.pending_circ = circ # Only one possible here
-            circ.pending_streams.append(stream)
-            self.last_exit = circ.exit
-          except TorCtl.ErrorReply, e:
-            # No need to retry here. We should get the failed
-            # event for either the circ or stream next
-            plog("WARN", "Error attaching stream: " + str(e.args))
-            return
-          break
-	else:
-	  plog("DEBUG", "Circuit " + str(circ.circ_id) + " won't exit")
-    else:
-      self.create_and_attach(stream, unattached_streams)
-  def stream_status_event(self, s):
-    """ Catch user stream events """
-    # Construct debugging output
-    output = [s.event_name, str(s.strm_id), s.status, str(s.circ_id), s.target_host+':'+str(s.target_port)]
-    if s.reason: output.append("REASON=" + s.reason)
-    if s.remote_reason: output.append("REMOTE_REASON=" + s.remote_reason)
-    if s.purpose: output.append("PURPOSE=" + s.purpose)
-    plog("DEBUG", " ".join(output))
-    # If target_host is not an IP-address
-    if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
-      s.target_host = "" # ignore DNS for exit policy check
-    # Hack to ignore Tor-handled streams (Currently only directory streams)
-    if s.strm_id in self.streams and self.streams[s.strm_id].ignored:
-      plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
-      return
-    if s.status == "NEW" or s.status == "NEWRESOLVE":
-      if s.status == "NEWRESOLVE" and not s.target_port:
-        s.target_port = self.resolve_port      
-      # Set up the new stream
-      if s.circ_id == 0:
-        stream = Stream(s.strm_id, s.target_host, s.target_port, s.status)
-        self.streams[s.strm_id] = stream
-      if s.purpose and s.purpose.find("DIR_") == 0:
-        stream.ignored = True
-        plog("DEBUG", "Ignoring stream: " + str(s.strm_id))
-        return
-      elif s.circ_id == 0:
-        self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-    elif s.status == "DETACHED":
-      # Stream not found
-      if s.strm_id not in self.streams:
-        plog("WARN", "Detached stream " + str(s.strm_id) + " not found")
-        self.streams[s.strm_id] = Stream(s.strm_id, s.target_host, s.target_port, "NEW")
-      # Circuit not found
-      if not s.circ_id:
-        plog("WARN", "Stream " + str(s.strm_id) + " detached from no circuit!")
-      else:
-        self.streams[s.strm_id].detached_from.append(s.circ_id)      
-      # Detect timeouts on user streams
-      if s.reason == "TIMEOUT":
-	# TODO: Count timeouts on streams?
-	#self.streams[s.strm_id].timeout_counter += 1
-	plog("DEBUG", "User stream timed out on circuit " + str(s.circ_id))
-      # Stream was pending
-      if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
-        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
-      # Attach to another circ
-      self.streams[s.strm_id].pending_circ = None
-      self.attach_stream_any(self.streams[s.strm_id], self.streams[s.strm_id].detached_from)
-    if s.status == "SUCCEEDED":
-      if s.strm_id not in self.streams:
-        plog("NOTICE", "Succeeded stream " + str(s.strm_id) + " not found")
-        return
-      if s.circ_id and self.streams[s.strm_id].pending_circ.circ_id != s.circ_id:
-        # Hrmm.. this can happen on a new-nym.. Very rare, putting warn
-        # in because I'm still not sure this is correct
-        plog("WARN", "Mismatch of pending: "
-          + str(self.streams[s.strm_id].pending_circ.circ_id) + " vs "
-          + str(s.circ_id))
-	self.streams[s.strm_id].circ = self.circuits[s.circ_id]
-      else:
-        self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
-      self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
-      self.streams[s.strm_id].pending_circ = None
-      self.streams[s.strm_id].attached_at = s.arrived_at
-    elif s.status == "FAILED" or s.status == "CLOSED":
-      if s.strm_id not in self.streams:
-        plog("NOTICE", "Failed stream " + str(s.strm_id) + " not found")
-        return
-      # if not s.circ_id: 
-      # plog("WARN", "Stream " + str(s.strm_id) + " closed/failed from no circuit")
-      # We get failed and closed for each stream, let CLOSED do the cleanup
-      if s.status == "FAILED":
-        # Avoid busted circuits that will not resolve or carry traffic
-        self.streams[s.strm_id].failed = True
-	if s.circ_id in self.circuits: self.circuits[s.circ_id].dirty = True
-        elif self.streams[s.strm_id].attached_at != 0: 
-	  plog("WARN", "Failed stream on unknown circuit " + str(s.circ_id))
-	return
-      # CLOSED
-      if self.streams[s.strm_id].pending_circ:
-        self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
-      # Actual removal of the stream
-      del self.streams[s.strm_id]
-    # REMAP
-    elif s.status == "REMAP":
-      if s.strm_id not in self.streams:
-        plog("WARN", "Remap id "+str(s.strm_id)+" not found")
-      else:
-        if not re.match(r"\d+.\d+.\d+.\d+", s.target_host):
-          s.target_host = ""
-          plog("NOTICE", "Non-IP remap for "+str(s.strm_id) + 
-             " to " + s.target_host)		   
-        self.streams[s.strm_id].host = s.target_host
-        self.streams[s.strm_id].port = s.target_port
   def address_mapped_event(self, event):
     """ It is necessary to listen to ADDRMAP events to be able to 
         perform DNS lookups using Tor """
@@ -1610,7 +1452,7 @@
   loglevel = TorUtil.loglevel
   TorUtil.loglevel = "INFO"
-  #gen.rewind() - Just overhead if we create a fresh generator each time
+  gen.rewind()
   rtrs = gen.generate()
   for i in xrange(1, trials):
     r = rtrs.next()
@@ -1623,6 +1465,10 @@
   copy_rlist = copy.copy(r_list)
   copy_rlist.sort(lambda x, y: cmp(y.chosen, x.chosen))
   for r in copy_rlist:
+    if r.chosen and not gen.rstr_list.r_is_ok(r):
+      print "WARN: Restriction fail at "+r.idhex
+    if not r.chosen and gen.rstr_list.r_is_ok(r):
+      print "WARN: Generation fail at "+r.idhex
     if not gen.rstr_list.r_is_ok(r): continue
     flag = ""
     bw = int(weight_bw(gen, r))
@@ -1681,7 +1527,17 @@
       bw *= bwgen.guard_weight
     return bw
-  # XXX: Test Uniform and OrderedexitGenerators
+  def uniform_weighting(bwgen, r):
+    return 10240000
+  # XXX: Test OrderedexitGenerators
+  do_gen_unit(
+   UniformGenerator(sorted_rlist,
+                    NodeRestrictionList([PercentileRestriction(20,30,sorted_rlist),
+                    sorted_rlist, uniform_weighting, 1500)
   do_gen_unit(BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Exit"]),
                                   3, exit=True),
               sorted_rlist, flag_weighting, 500)
@@ -1693,8 +1549,7 @@
    BwWeightedGenerator(sorted_rlist, FlagsRestriction(["Valid"]), 3),
    sorted_rlist, flag_weighting, 500)
-  exit(0)
   for r in sorted_rlist:
     if r.will_exit_to("", 465):
@@ -1735,16 +1590,24 @@
   ug = UniformGenerator(sorted_rlist, exit_rstr)
+  ug.rewind()
   rlist = []
   for r in ug.generate():
     print "Checking: " + r.nickname
     for rs in rl:
       if not rs.r_is_ok(r):
         raise PathError()
-      if not "Exit" in r.flags:
-        print "No exit in flags of "+r.nickname
+    if not "Exit" in r.flags:
+      print "No exit in flags of "+r.idhex
+      for e in r.exitpolicy:
+        print " "+str(e)
+      print " 80: "+str(r.will_exit_to("", 80))
+      print " 443: "+str(r.will_exit_to("", 443))
+      print " 6667: "+str(r.will_exit_to("", 6667))
+    ug.mark_chosen(r)
   for r in sorted_rlist:
     if "Exit" in r.flags and not r in rlist:
-      print r.nickname+" is an exit not in rl!"
+      print r.idhex+" is an exit not in rl!"

Modified: torctl/trunk/python/TorCtl/TorCtl.py
--- torctl/trunk/python/TorCtl/TorCtl.py	2009-01-14 17:00:16 UTC (rev 18107)
+++ torctl/trunk/python/TorCtl/TorCtl.py	2009-01-14 23:56:01 UTC (rev 18108)
@@ -209,6 +209,17 @@
         return self.match
     return -1
+  def __str__(self):
+    retr = ""
+    if self.match:
+      retr += "accept "
+    else:
+      retr += "reject "
+    retr += socket.inet_ntoa(struct.pack(">I",self.ip)) + "/"
+    retr += socket.inet_ntoa(struct.pack(">I",self.netmask)) + ":"
+    retr += str(self.port_low)+"-"+str(self.port_high)
+    return retr
 class RouterVersion:
   """ Represents a Router's version. Overloads all comparison operators
       to check for newer, older, or equivalent versions. """
@@ -247,6 +258,8 @@
     self.os = os
     self.list_rank = 0 # position in a sorted list of routers.
     self.uptime = uptime
+    self.refcount = 0 # How many open circs are we currently in?
+    self.deleted = False # Has Tor already deleted this descriptor?
   def __str__(self):
     s = self.idhex, self.nickname

