[or-cvs] r19411: {torctl} Add stream tracking code and attempt to reduce stats calculations (torctl/trunk/python/TorCtl)

mikeperry at seul.org
Sun May 3 11:31:38 UTC 2009


Author: mikeperry
Date: 2009-05-03 07:31:38 -0400 (Sun, 03 May 2009)
New Revision: 19411

Modified:
   torctl/trunk/python/TorCtl/SQLSupport.py
Log:

Add stream tracking code and attempt to reduce stats
calculations to single SQL update statements where possible.
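
The "single SQL update statements" part works by letting the database fill
every RouterStats row at once from correlated scalar subqueries instead of
looping over routers in Python. A standalone sketch of that pattern (plain
sqlite3 with made-up table and column names, not the TorCtl schema or the
SQLAlchemy code in this diff):

  import sqlite3

  conn = sqlite3.connect(":memory:")
  conn.executescript("""
    CREATE TABLE router_stats (router_id TEXT PRIMARY KEY,
                               circ_try_to INT, circ_fail_to INT);
    CREATE TABLE extension (id INTEGER PRIMARY KEY,
                            to_router TEXT, failed INT);
    INSERT INTO router_stats VALUES ('A', 0, 0), ('B', 0, 0);
    INSERT INTO extension (to_router, failed)
           VALUES ('A', 0), ('A', 1), ('B', 0);
  """)

  # One UPDATE fills every row: each correlated subquery is re-evaluated
  # per router_stats row, so no per-router loop is needed in Python.
  conn.execute("""
    UPDATE router_stats SET
      circ_try_to  = (SELECT COUNT(*) FROM extension e
                      WHERE e.to_router = router_stats.router_id),
      circ_fail_to = (SELECT COUNT(*) FROM extension e
                      WHERE e.to_router = router_stats.router_id
                        AND e.failed = 1)
  """)
  print(conn.execute("SELECT * FROM router_stats ORDER BY router_id").fetchall())
  # -> [('A', 2, 1), ('B', 1, 0)]

This is the same shape as the _compute_stats_query() and _compute_ranks()
changes below, which build the subqueries with select([...]).as_scalar()
and hand them to RouterStats.table.update().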



Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-03 09:58:03 UTC (rev 19410)
+++ torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-03 11:31:38 UTC (rev 19411)
@@ -20,10 +20,16 @@
 from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
 
 from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
-from sqlalchemy import create_engine
+from sqlalchemy import create_engine, and_, or_, not_, select, func
 from sqlalchemy.schema import ThreadLocalMetaData,MetaData
 from elixir import *
 
+# Nodes with a ratio below this value are excluded from consideration
+# when computing the filtered ratios of other nodes
+MIN_RATIO=0.5
+
+NO_FPE=2**-20  # presumably a tiny non-zero value to guard divisions against zero/FP exceptions
+
 #################### Model #######################
 
 # In elixir, the session (DB connection) is a property of the model..
@@ -65,6 +71,7 @@
   router = Field(PickleType(mutable=False)) 
   circuits = ManyToMany('Circuit')
   streams = ManyToMany('Stream')
+  detached_streams = ManyToMany('Stream')
   bw_history = OneToMany('BwHistory')
   stats = OneToOne('RouterStats', inverse="router")
 
@@ -98,7 +105,8 @@
   using_options(order_by='-launch_time', session=tc_session, metadata=tc_metadata)
   using_mapper_options(save_on_init=False)
   routers = ManyToMany('Router')
-  streams = OneToMany('Stream')
+  streams = OneToMany('Stream', inverse='circuit')
+  detached_streams = ManyToMany('Stream', inverse='detached_circuits')
   extensions = OneToMany('Extension', inverse='circ')
   circ_id = Field(Integer, index=True)
   launch_time = Field(Float)
@@ -148,22 +156,29 @@
   using_options(session=tc_session, metadata=tc_metadata)
   using_options(order_by='-start_time')
   using_mapper_options(save_on_init=False)
+  tgt_host = Field(Text)
+  tgt_port = Field(Integer)
   circuit = ManyToOne('Circuit')
+  detached_circuits = ManyToMany('Circuit')
+  ignored = Field(Boolean) # Directory streams
   strm_id = Field(Integer, index=True)
   start_time = Field(Float)
-  tot_bytes = Field(Integer)
+  tot_read_bytes = Field(Integer)
+  tot_write_bytes = Field(Integer)
+  close_reason = Field(Text) # Shared by Failed and Closed. Unused here.
 
 class FailedStream(Stream):
   using_options(session=tc_session, metadata=tc_metadata)
   using_mapper_options(save_on_init=False)
-  reason = Field(Text)
+  fail_reason = Field(Text)
   fail_time = Field(Float)
 
 class ClosedStream(Stream):
   using_options(session=tc_session, metadata=tc_metadata)
   using_mapper_options(save_on_init=False)
   end_time = Field(Float)
-  bandwidth = Field(Float)
+  read_bandwidth = Field(Float)
+  write_bandwidth = Field(Float)
 
 class RouterStats(Entity):
   using_options(session=tc_session, metadata=tc_metadata)
@@ -200,64 +215,43 @@
   avg_first_ext = Field(Float)
   ext_ratio = Field(Float)
   
-  avg_sbw = Field(Float)
+  sbw = Field(Float)
   sbw_ratio = Field(Float)
-
-  # FIXME: Figure out how to efficiently compute these..
-  filt_to_ratio = Field(Float)
-  filt_from_ratio = Field(Float)
-  filt_bi_ratio = Field(Float)
+  filt_sbw = Field(Float)
   filt_sbw_ratio = Field(Float)
 
-  def _compute_stats_relation(r):
-    rs = r.stats
-    rs.circ_fail_to = 0
-    rs.circ_try_to = 0
-    rs.circ_fail_from = 0
-    rs.circ_try_from = 0
-    tot_extend_time = 0
-    tot_extends = 0
-    for c in r.circuits: 
-      for e in c.extensions: 
-        if e.to_node == r:
-          rs.circ_try_to += 1
-          if isinstance(e, FailedExtension):
-            rs.circ_fail_to += 1
-          elif e.hop == 0:
-            tot_extend_time += e.delta
-            tot_extends += 1
-        elif e.from_node == r:
-          rs.circ_try_from += 1
-          if isinstance(e, FailedExtension):
-            rs.circ_fail_from += 1
-          
-      if isinstance(c, FailedCircuit):
-        pass # TODO: Also count timeouts against earlier nodes?
-      elif isinstance(c, DestroyedCircuit):
-        pass # TODO: Count these somehow..
+  def _compute_stats_relation(stats_clause):
+    for rs in RouterStats.query.\
+                   filter(stats_clause).\
+                   options(eagerload_all('router.circuits.extensions')).\
+                   all():
+      rs.circ_fail_to = 0
+      rs.circ_try_to = 0
+      rs.circ_fail_from = 0
+      rs.circ_try_from = 0
+      tot_extend_time = 0
+      tot_extends = 0
+      for c in rs.router.circuits: 
+        for e in c.extensions: 
+          if e.to_node == rs.router:
+            rs.circ_try_to += 1
+            if isinstance(e, FailedExtension):
+              rs.circ_fail_to += 1
+            elif e.hop == 0:
+              tot_extend_time += e.delta
+              tot_extends += 1
+          elif e.from_node == rs.router:
+            rs.circ_try_from += 1
+            if isinstance(e, FailedExtension):
+              rs.circ_fail_from += 1
+            
+        if isinstance(c, FailedCircuit):
+          pass # TODO: Also count timeouts against earlier nodes?
+        elif isinstance(c, DestroyedCircuit):
+          pass # TODO: Count these somehow..
 
-    if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
-    else: rs.avg_first_ext = 0
-  _compute_stats_relation = Callable(_compute_stats_relation)
-
-  def _compute_stats_query(r):
-    rs = r.stats
-    to_r = Extension.query.filter_by(to_node=r)
-    rs.circ_try_to = to_r.count()
-    rs.circ_try_from = Extension.query.filter_by(from_node=r).count()
-    rs.circ_fail_to = FailedExtension.query.filter_by(to_node=r).count()
-    rs.circ_fail_from = FailedExtension.query.filter_by(from_node=r).count()
-    rs.avg_first_ext = to_r.filter_by(hop=0,row_type='extension').avg(Extension.delta)
-  _compute_stats_query = Callable(_compute_stats_query)
-
-
-  def _compute_stats():
-    for r in Router.query.\
-                   options(eagerload_all('circuits.extensions')).\
-                   all():
-      RouterStats._compute_stats_relation(r)
-      #RouterStats._compute_stats_query(r) # Remove options if this is used
-      rs = r.stats
+      if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
+      else: rs.avg_first_ext = 0
       if rs.circ_try_from > 0:
         rs.circ_from_rate = (1.0*rs.circ_fail_from/rs.circ_try_from)
       if rs.circ_try_to > 0:
@@ -265,105 +259,257 @@
       if rs.circ_try_to+rs.circ_try_from > 0:
         rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
 
-      #for s in r.streams:
-      #  if isinstance(c, ClosedStream):
-      #  elif isinstance(c, FailedStream):
       tc_session.update(rs)
+  _compute_stats_relation = Callable(_compute_stats_relation)
+
+  def _compute_stats_query(stats_clause):
+    # http://www.sqlalchemy.org/docs/04/sqlexpression.html#sql_update
+    to_s = select([func.count(Extension.id)], 
+        and_(stats_clause, Extension.table.c.to_node_idhex
+             == RouterStats.table.c.router_idhex)).as_scalar()
+    from_s = select([func.count(Extension.id)], 
+        and_(stats_clause, Extension.table.c.from_node_idhex
+             == RouterStats.table.c.router_idhex)).as_scalar()
+    f_to_s = select([func.count(FailedExtension.id)], 
+        and_(stats_clause, FailedExtension.table.c.to_node_idhex
+             == RouterStats.table.c.router_idhex)).as_scalar()
+    f_from_s = select([func.count(FailedExtension.id)], 
+        and_(stats_clause, FailedExtension.table.c.from_node_idhex
+             == RouterStats.table.c.router_idhex)).as_scalar()
+    avg_ext = select([func.avg(Extension.delta)], 
+        and_(stats_clause,
+             Extension.table.c.to_node_idhex==RouterStats.table.c.router_idhex,
+             Extension.table.c.hop==0, 
+             Extension.table.c.row_type=='extension')).as_scalar()
+
+    RouterStats.table.update(stats_clause, values=
+      {RouterStats.table.c.circ_try_to:to_s,
+       RouterStats.table.c.circ_try_from:from_s,
+       RouterStats.table.c.circ_fail_to:f_to_s,
+       RouterStats.table.c.circ_fail_from:f_from_s,
+       RouterStats.table.c.avg_first_ext:avg_ext}).execute()
+
+    RouterStats.table.update(values=
+      {RouterStats.table.c.circ_from_rate :
+         RouterStats.table.c.circ_fail_from/RouterStats.table.c.circ_try_from,
+       RouterStats.table.c.circ_to_rate :
+         RouterStats.table.c.circ_fail_to/RouterStats.table.c.circ_try_to,
+       RouterStats.table.c.circ_bi_rate :
+         (RouterStats.table.c.circ_fail_to+RouterStats.table.c.circ_fail_from)
+                          /
+         (RouterStats.table.c.circ_try_to+RouterStats.table.c.circ_try_from)}).execute()
+
+    tc_session.clear()
+
+    # TODO: Give the streams relation table a sane name and reduce this too
+    for rs in RouterStats.query.options(eagerload('router'), 
+                        eagerload('router.streams')).all():
+      tot_bw = 0.0
+      s_cnt = 0
+      for s in rs.router.streams:
+        if isinstance(s, ClosedStream):
+          tot_bw += s.read_bandwidth
+          s_cnt += 1
+      if s_cnt > 0: rs.sbw = tot_bw/s_cnt
+      tc_session.update(rs)
+  _compute_stats_query = Callable(_compute_stats_query)
+
+  def _compute_stats(stats_clause):
+    RouterStats._compute_stats_query(stats_clause)
+    #RouterStats._compute_stats_relation(stats_clause)
   _compute_stats = Callable(_compute_stats)
 
   def _compute_ranks():
-    min_avg_rank = 0x7fffffff
-    max_avg_rank = 0
-    # TODO: Can we optimize this further into one query/update?
-    for r in Router.query.all(): 
-      if r.stats: tc_session.delete(r.stats)
-      rs = RouterStats()
-      rs.router = r
-      r.stats = rs
-      rank_q = BwHistory.query.filter_by(router=r)
-      rs.min_rank = rank_q.min(BwHistory.rank)
-      rs.avg_rank = rank_q.avg(BwHistory.rank)
-      rs.max_rank = rank_q.max(BwHistory.rank)
-      rs.avg_bw = rank_q.avg(BwHistory.bw)
-    min_avg_rank = RouterStats.query.filer('1=1').min(RouterStats.avg_rank)
-    max_avg_rank = RouterStats.query.filer('1=1').max(RouterStats.avg_rank)
-    for rs in RouterStats.query.all():
-      rs.percentile = (100.0*rs.avg_rank)/(max_avg_rank - min_avg_rank)
+    min_r = select([func.min(BwHistory.rank)], 
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    avg_r = select([func.avg(BwHistory.rank)], 
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    max_r = select([func.max(BwHistory.rank)], 
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
+    avg_bw = select([func.avg(BwHistory.bw)], 
+        BwHistory.table.c.router_idhex
+            == RouterStats.table.c.router_idhex).as_scalar()
 
-    tc_session.update(rs)
-    tc_session.update(r)
+    RouterStats.table.update(values=
+       {RouterStats.table.c.min_rank:min_r,
+        RouterStats.table.c.avg_rank:avg_r,
+        RouterStats.table.c.max_rank:max_r,
+        RouterStats.table.c.avg_bw:avg_bw}).execute()
+
+    min_avg_rank = select([func.min(RouterStats.avg_rank)]).as_scalar()
+    max_avg_rank = select([func.max(RouterStats.avg_rank)]).as_scalar()
+
+    RouterStats.table.update(values=
+       {RouterStats.table.c.percentile:
+            (100.0*RouterStats.table.c.avg_rank)/max_avg_rank}).execute()
+    
+    tc_session.clear()
   _compute_ranks = Callable(_compute_ranks)
 
-  def _compute_ratios(filter):
-    sliceq = RouterStats.query.filter(filter)
-    avg_circ_from_rate = sliceq.avg(RouterStats.circ_from_rate)
-    avg_circ_to_rate = sliceq.avg(RouterStats.circ_to_rate)
-    avg_circ_bi_rate = sliceq.avg(RouterStats.circ_bi_rate)
-    avg_ext = sliceq.avg(RouterStats.avg_first_ext)
-    for rs in sliceq.all():
-      rs.circ_from_ratio = rs.circ_from_rate/avg_circ_from_rate
-      rs.circ_to_ratio = rs.circ_to_rate/avg_circ_to_rate
-      rs.circ_bi_ratio = rs.circ_bi_rate/avg_circ_bi_rate
-      rs.ext_ratio = rs.avg_first_ext/avg_ext
-      rs.filt_from_ratio = rs.circ_from_ratio
-      rs.filt_to_ratio = rs.circ_to_ratio
-      rs.filt_bi_ratio = rs.circ_bi_ratio
+  def _compute_ratios(stats_clause):
+    avg_from_rate = select([func.avg(RouterStats.circ_from_rate)],
+                           stats_clause).as_scalar()
+    avg_to_rate = select([func.avg(RouterStats.circ_to_rate)],
+                           stats_clause).as_scalar()
+    avg_bi_rate = select([func.avg(RouterStats.circ_bi_rate)],
+                           stats_clause).as_scalar()
+    avg_ext = select([func.avg(RouterStats.avg_first_ext)],
+                           stats_clause).as_scalar()
+    avg_sbw = select([func.avg(RouterStats.sbw)],
+                           stats_clause).as_scalar()
+
+    RouterStats.table.update(stats_clause, values=
+       {RouterStats.table.c.circ_from_ratio:
+         (1-RouterStats.table.c.circ_from_rate)/(1-avg_from_rate),
+        RouterStats.table.c.circ_to_ratio:
+         (1-RouterStats.table.c.circ_to_rate)/(1-avg_to_rate),
+        RouterStats.table.c.circ_bi_ratio:
+         (1-RouterStats.table.c.circ_bi_rate)/(1-avg_bi_rate),
+        RouterStats.table.c.ext_ratio:
+         (RouterStats.table.c.avg_first_ext)/(avg_ext),
+        RouterStats.table.c.sbw_ratio:
+         (RouterStats.table.c.sbw)/(avg_sbw)}).execute()
+    tc_session.clear()
   _compute_ratios = Callable(_compute_ratios)
 
-  def _compute_filtered_ratios(filter, min_ratio):
-    # XXX: Actually, we should start off simple and only
-    # do filtering for stream ratios
+  def _compute_filtered_query(min_ratio): # broken.. don't use.
     badrouters = RouterStats.query.filter(
-       RouterStats.circ_from_rate < min_ratio).column(RouterStats.router).all()
+       RouterStats.sbw_ratio < min_ratio).column(RouterStats.router).all()
+  
+    for r in Router.query.all():
+      rs = r.stats
+      # XXX: This is totally wrong:
+      strmq = Router.query.filter_by(idhex=r.idhex).add_column(Router.streams).filter_by(row_type='closedstream')
+      for br in badrouters:
+        if br != r:
+          strmq = strmq.filter(not_(ClosedStream.circuit.routers.contains(r)))
+      rs.filt_sbw = strmq.avg(ClosedStream.read_bandwidth)
+    avg_sbw = RouterStats.query.filter('1=1').avg(RouterStats.filt_sbw)
+    for rs in RouterStats.query.all():
+      rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+  _compute_filtered_query = Callable(_compute_filtered_query)
 
-    extnq = Circuit.query
-    for r in badrouters:
-      extnq.filter(not_(Circuit.routers.contains(r)))
-    extnq = sliceq.column(Circuit.extensions)
-    
-    #sliceq = RouterStats.query.filter(filter)
-    #for r in badrouters:
-    #  sliceq = sliceq.filter(not_(Circuits.routers.contains(r)))
+  def _compute_filtered_relational(min_ratio, stats_clause, filter_clause):
+    badrouters = RouterStats.query.filter(stats_clause).filter(filter_clause).\
+                   filter(RouterStats.sbw_ratio < min_ratio).all()
 
-    avg_rate = sliceq.avg(RouterStats.circ_from_rate)
+    # TODO: Turn this into a single query....
+    for rs in RouterStats.query.filter(stats_clause).\
+          options(eagerload_all('router.streams.circuit.routers')).\
+             all():
+      tot_sbw = 0.0
+      sbw_cnt = 0
+      for s in rs.router.streams:
+        if isinstance(s, ClosedStream):
+          skip = False
+          for br in badrouters:
+            if br != rs:
+              if br.router in s.circuit.routers:
+                skip = True
+          if not skip:
+            tot_sbw += s.read_bandwidth   
+            sbw_cnt += 1
+      if sbw_cnt > 0: rs.filt_sbw = tot_sbw/sbw_cnt
+      tc_session.update(rs)
+    avg_sbw = RouterStats.query.filter(stats_clause).avg(RouterStats.filt_sbw)
+    for rs in RouterStats.query.filter(stats_clause).all():
+      rs.filt_sbw_ratio = rs.filt_sbw/avg_sbw
+      tc_session.update(rs)
+  _compute_filtered_relational = Callable(_compute_filtered_relational)
 
-    for rs in sliceq.all():
-      rs.filt_from_ratio = rs.circ_from_rate/avg_rate
+  def _compute_filtered_ratios(min_ratio, stats_clause, filter_clause):
+    RouterStats._compute_filtered_relational(min_ratio, stats_clause, 
+                                             filter_clause)
+    #RouterStats._compute_filtered_query(filter,min_ratio)
   _compute_filtered_ratios = Callable(_compute_filtered_ratios)
 
   def reset():
-    for r in Router.query.all():
-      r.stats = None
-      tc_session.update(r)
     RouterStats.table.drop()
     RouterStats.table.create()
+    for r in Router.query.all(): # Is this needed?
+      rs = RouterStats()
+      rs.router = r
+      r.stats = rs
+      tc_session.update(r)
     tc_session.commit()
   reset = Callable(reset)
 
-  def compute(router_filter, stats_filter):
-    RouterStats._compute_ranks()
-    RouterStats._compute_stats()
-    RouterStats._compute_ratios()
-    RouterStats._compute_filtered_ratios()
+  def compute(pct_low=0, pct_high=100, stat_clause=None, filter_clause=None):
+    pct_clause = and_(RouterStats.percentile >= pct_low, 
+                         RouterStats.percentile < pct_high)
+    if stat_clause:
+      stat_clause = and_(pct_clause, stat_clause)
+    else:
+      stat_clause = pct_clause
+     
+    RouterStats.reset()
+    RouterStats._compute_ranks() # No filters. Ranks are independent
+    RouterStats._compute_stats(stat_clause)
+    RouterStats._compute_ratios(stat_clause)
+    RouterStats._compute_filtered_ratios(MIN_RATIO, stat_clause, filter_clause)
     tc_session.commit()
   compute = Callable(compute)  
 
-##################### End Model ####################
+  def write_stats(f, pct_low=0, pct_high=100, order_by=None, recompute=False, stat_clause=None, filter_clause=None):
+    ratio_key = """SQLSupport Statistics:
+    SR=Stream avg ratio     AR=Advertised bw ratio    BRR=Adv. bw avg ratio
+    CSR=Circ suspect ratio  CFR=Circ Fail Ratio       SSR=Stream suspect ratio
+    SFR=Stream fail ratio   CC=Circuit Count          SC=Stream Count
+    P=Percentile Rank       U=Uptime (h)\n"""
+ 
+    if not order_by:
+      order_by=RouterStats.avg_first_ext
 
-class CircuitStatsBroker:
-  pass
+    if recompute:
+      RouterStats.compute(pct_low, pct_high, stat_clause, filter_clause)
 
-class StreamStatsBroker:
-  pass
+    pct_clause = and_(RouterStats.percentile >= pct_low, 
+                         RouterStats.percentile < pct_high)
 
-class RatioBroker:
-  pass
+    circ_from_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_from_rate)
+    circ_to_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_to_rate)
+    circ_bi_rate = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.circ_bi_rate)
 
+    avg_first_ext = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.avg_first_ext)
+    sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.sbw)
+    filt_sbw = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.filt_sbw)
+    percentile = RouterStats.query.filter(pct_clause).filter(stat_clause).avg(RouterStats.percentile) 
+
+    f.write("Average Statistics:\n")
+    f.write(" CFR="+str(round(circ_from_rate,2))+"\n")
+    f.write(" CTR="+str(round(circ_to_rate,2))+"\n")
+    f.write(" CBR="+str(round(circ_bi_rate,2))+"\n")
+    f.write(" CFE="+str(round(avg_first_ext,2))+"\n")
+    f.write(" SBW="+str(round(sbw,2))+"\n")
+    f.write(" FBW="+str(round(filt_sbw,2))+"\n")
+    f.write(" PR="+str(round(percentile,2))+"\n\n")
+
+    for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
+           order_by(order_by).all():
+      f.write(s.router.idhex+"="+s.router.nickname+"\n")
+      f.write(" CFR="+str(round(s.circ_from_rate,2))+" ")
+      f.write(" CTR="+str(round(s.circ_to_rate,2))+" ")
+      f.write(" CBR="+str(round(s.circ_bi_rate,2))+" ")
+      f.write(" CFE="+str(round(s.avg_first_ext,2))+" ")
+      f.write(" SBW="+str(round(s.sbw,2))+" ")
+      f.write(" FBW="+str(round(s.filt_sbw,2))+" ")
+      f.write(" PR="+str(round(s.percentile,1))+"\n")
+  write_stats = Callable(write_stats)  
+    
+
+##################### End Model ####################
+
 #################### Model Support ################
-def reset_all_stats():
+def reset_all():
   # Need to keep routers around.. 
   for r in Router.query.all():
-    r.bw_history = [] # XXX: Is this sufficient/correct?
+    r.bw_history = [] # XXX: Is this sufficient/correct/necessary?
     r.circuits = []
     r.streams = []
     r.stats = None
@@ -391,7 +537,7 @@
     self.last_desc_at = time.time()
     self.consensus = None
 
-  # XXX: What about non-running routers and uptime information?
+  # TODO: What about non-running routers and uptime information?
   def _update_rank_history(self, idlist):
     for idhex in idlist:
       if idhex not in self.consensus.routers: continue
@@ -431,6 +577,8 @@
     TorCtl.DualEventListener.set_parent(self, parent_handler)
 
   def heartbeat_event(self, e):
+    # This sketchiness is to ensure we have an accurate history
+    # of each router's rank+bandwidth for the entire duration of the run..
     if e.state == EVENT_STATE.PRELISTEN:
       if not self.consensus: 
         global OP
@@ -522,7 +670,7 @@
 
       e.to_node = Router.query.filter_by(idhex=r_ext[1:]).one()
       if not self.track_parent:
-        # XXX: Eager load here?
+        # FIXME: Eager load here?
         circ.routers.append(e.to_node)
         e.to_node.circuits.append(circ)
         tc_session.update(e.to_node)
@@ -614,10 +762,110 @@
 
 class StreamListener(CircuitListener):
   def stream_bw_event(self, s):
-    pass
+    strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+    if strm:
+      strm.tot_read_bytes += s.bytes_read
+      strm.tot_write_bytes += s.bytes_written
+      tc_session.update(strm)
+      tc_session.commit()
+ 
   def stream_status_event(self, s):
-    pass
+    if s.reason: lreason = s.reason
+    else: lreason = "NONE"
+    if s.remote_reason: rreason = s.remote_reason
+    else: rreason = "NONE"
+    reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
 
+    if s.status in ("NEW", "NEWRESOLVE"):
+      strm = Stream(strm_id=s.strm_id, tgt_host=s.target_host, 
+                    tgt_port=s.target_port, init_status=s.status,
+                    tot_read_bytes=0, tot_write_bytes=0)
+      tc_session.save(strm)
+      tc_session.commit()
+      return
+
+    strm = Stream.query.filter_by(strm_id = s.strm_id).first()
+    if not strm: 
+      plog("NOTICE", "Ignoring prior stream "+str(s.strm_id))
+      return # Ignore prior streams
+
+    if s.status == "SENTCONNECT":
+      # New circuit
+      strm.circuit = Circuit.query.filter_by(circ_id=s.circ_id).first()
+      if not strm.circuit:
+        plog("NOTICE", "Ignoring prior stream "+str(strm.strm_id)+" with old circuit "+str(s.circ_id))
+        tc_session.delete(strm)
+        tc_session.commit()
+        return
+    else:
+      circ = None
+      if s.circ_id:
+        circ = Circuit.query.filter_by(circ_id=s.circ_id).first()
+      elif self.track_parent:
+        circ = self.parent_handler.streams[s.strm_id].circ
+        if not circ: circ = self.parent_handler.streams[s.strm_id].pending_circ
+        if circ:
+          circ = Circuit.query.filter_by(circ_id=circ.circ_id).first()
+
+      if not circ:
+        plog("WARN", "No circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+
+      if not strm.circuit:
+        plog("WARN", "No stream circuit for "+str(s.strm_id)+" circ: "+str(s.circ_id))
+        strm.circuit = circ
+
+      # XXX: Verify circ id matches stream.circ
+    
+    if s.status == "SUCCEEDED":
+      strm.start_time = s.arrived_at
+      for r in strm.circuit.routers: 
+        r.streams.append(strm)
+        tc_session.update(r)
+      tc_session.update(strm)
+      tc_session.commit()
+    elif s.status == "DETACHED":
+      strm.detached_circuits.append(circ)
+      strm.circuit.detached_streams.append(strm)
+      for r in strm.circuit.routers: 
+        r.detached_streams.append(strm)
+        tc_session.update(r)
+      tc_session.update(circ)
+      tc_session.update(strm)
+      tc_session.commit()
+    elif s.status == "FAILED":
+      strm.expunge()
+      # Convert to failed stream
+      Stream.table.update(Stream.id ==
+                strm.id).execute(row_type='failedstream')
+      strm = FailedStream.query.filter_by(id=strm.id).one()
+      strm.fail_time = s.arrived_at
+      strm.fail_reason = reason
+      tc_session.update(strm)
+      tc_session.commit()
+    elif s.status == "CLOSED":
+      if isinstance(strm, FailedStream):
+        strm.close_reason = reason
+      else:
+        strm.expunge()
+        if not (lreason == "DONE" or (lreason == "END" and rreason == "DONE")):
+          # Convert to failed stream
+          Stream.table.update(Stream.id ==
+                    strm.id).execute(row_type='failedstream')
+          strm = FailedStream.query.filter_by(id=strm.id).one()
+          strm.fail_time = s.arrived_at
+        else:
+          # Convert to closed stream
+          Stream.table.update(Stream.id ==
+                    strm.id).execute(row_type='closedstream')
+          strm = ClosedStream.query.filter_by(id=strm.id).one()
+          strm.read_bandwidth = strm.tot_read_bytes/(s.arrived_at-strm.start_time)
+          strm.write_bandwidth = strm.tot_write_bytes/(s.arrived_at-strm.start_time)
+          strm.end_time = s.arrived_at
+        strm.close_reason = reason
+      tc_session.update(strm)
+      tc_session.commit()
+
 def run_example(host, port):
   """ Example of basic TorCtl usage. See PathSupport for more advanced
       usage.
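
Given the new RouterStats.compute()/write_stats() entry points added above, a
consumer script would presumably drive them along these lines (a usage sketch
against the signatures in this diff; the import path and the example clause
are illustrative assumptions, not code from the repository):

  import sys
  from TorCtl import SQLSupport

  # Once listeners are attached and events have been collected for a while
  # (see run_example above), restrict stats to routers that have stream data
  # and dump them.  The clause is only an example; any SQLAlchemy clause over
  # RouterStats columns should work.
  clause = SQLSupport.RouterStats.sbw > 0
  SQLSupport.RouterStats.write_stats(sys.stdout, pct_low=0, pct_high=100,
                                     order_by=SQLSupport.RouterStats.sbw,
                                     recompute=True,
                                     stat_clause=clause,
                                     filter_clause=clause)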


