[or-cvs] r20281: {torctl} Initial scansupport component commit. (torctl/trunk/python/TorCtl)

mikeperry at seul.org mikeperry at seul.org
Thu Aug 13 19:43:12 UTC 2009


Author: mikeperry
Date: 2009-08-13 15:43:11 -0400 (Thu, 13 Aug 2009)
New Revision: 20281

Added:
   torctl/trunk/python/TorCtl/ScanSupport.py
Modified:
   torctl/trunk/python/TorCtl/__init__.py
Log:

Initial scansupport component commit.



Added: torctl/trunk/python/TorCtl/ScanSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/ScanSupport.py	                        (rev 0)
+++ torctl/trunk/python/TorCtl/ScanSupport.py	2009-08-13 19:43:11 UTC (rev 20281)
@@ -0,0 +1,243 @@
+import PathSupport
+import SQLSupport
+import threading
+import copy
+import time
+import shutil
+
+from TorUtil import plog
+
+# Note: be careful writing functions for this class. Remember that
+# the PathBuilder has its own thread that it recieves events on
+# independent from your thread that calls into here.
+class ScanHandler(PathSupport.PathBuilder):
+  def set_pct_rstr(self, percent_skip, percent_fast):
+    def notlambda(sm):
+      sm.percent_fast=percent_fast
+      sm.percent_skip=percent_skip
+    self.schedule_selmgr(notlambda)
+
+  def reset_stats(self):
+    def notlambda(this):
+      this.reset()
+    self.schedule_low_prio(notlambda)
+
+  def commit(self):
+    plog("INFO", "Scanner committing jobs...")
+    cond = threading.Condition()
+    def notlambda2(this):
+      cond.acquire()
+      this.run_all_jobs = False
+      plog("INFO", "Commit done.")
+      cond.notify()
+      cond.release()
+
+    def notlambda1(this):
+      plog("INFO", "Committing jobs...")
+      this.run_all_jobs = True
+      self.schedule_low_prio(notlambda2)
+
+    cond.acquire()
+    self.schedule_immediate(notlambda1)
+
+    cond.wait()
+    cond.release()
+    plog("INFO", "Scanner commit done.")
+
+  def close_circuits(self):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      this.close_all_circuits()
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def close_streams(self, reason):
+    cond = threading.Condition()
+    plog("NOTICE", "Wedged Tor stream. Closing all streams")
+    def notlambda(this):
+      cond.acquire()
+      this.close_all_streams(reason)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def new_exit(self):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      this.new_nym = True
+      lines = this.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
+      for _,msg,more in lines:
+        plog("DEBUG", msg)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def idhex_to_r(self, idhex):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      if idhex in self.routers:
+        cond._result = self.routers[idhex]
+      else:
+        cond._result = None
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._result
+
+  def name_to_idhex(self, nick):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      if nick in self.name_to_key:
+        cond._result = self.name_to_key[nick]
+      else:
+        cond._result = None
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._result
+
+  def rank_to_percent(self, rank):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      cond._pct = (100.0*rank)/len(this.sorted_r) # lol moar haxx
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._pct
+
+  def percent_to_rank(self, pct):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      cond._rank = int(round((pct*len(this.sorted_r))/100.0,0)) # lol moar haxx
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    return cond._rank
+
+  def get_exit_node(self):
+    return copy.copy(self.last_exit) # GIL FTW
+
+  def set_exit_node(self, arg):
+    cond = threading.Condition()
+    exit_name = arg
+    plog("DEBUG", "Got Setexit: "+exit_name)
+    def notlambda(sm):
+      plog("DEBUG", "Job for setexit: "+exit_name)
+      cond.acquire()
+      sm.set_exit(exit_name)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_selmgr(notlambda)
+    cond.wait()
+    cond.release()
+
+class SQLScanHandler(ScanHandler):
+  def attach_sql_listener(self, db_uri):
+    plog("DEBUG", "Got sqlite: "+db_uri)
+    SQLSupport.setup_db(db_uri, echo=False, drop=True)
+    self.sql_consensus_listener = SQLSupport.ConsensusTrackerListener()
+    self.add_event_listener(self.sql_consensus_listener)
+    self.add_event_listener(SQLSupport.StreamListener())
+
+  def write_sql_stats(self, rfilename=None, stats_filter=None):
+    if not rfilename:
+      rfilename="./data/stats/sql-"+time.strftime("20%y-%m-%d-%H:%M:%S")
+    cond = threading.Condition()
+    def notlambda(h):
+      cond.acquire()
+      SQLSupport.RouterStats.write_stats(file(rfilename, "w"),
+                            0, 100, order_by=SQLSupport.RouterStats.sbw,
+                            recompute=True, disp_clause=stats_filter)
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def write_strm_bws(self, rfilename=None, slice_num=0, stats_filter=None):
+    if not rfilename:
+      rfilename="./data/stats/bws-"+time.strftime("20%y-%m-%d-%H:%M:%S")
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      f=file(rfilename, "w")
+      f.write("slicenum="+str(slice_num)+"\n")
+      SQLSupport.RouterStats.write_bws(f, 0, 100,
+                            order_by=SQLSupport.RouterStats.sbw,
+                            recompute=False, disp_clause=stats_filter)
+      f.close()
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def save_sql_file(self, sql_file, new_file):
+    cond = threading.Condition()
+    def notlambda(this):
+      cond.acquire()
+      SQLSupport.tc_session.close()
+      try:
+        shutil.copy(sql_file, new_file)
+      except Exception,e:
+        plog("WARN", "Error moving sql file: "+str(e))
+      SQLSupport.reset_all()
+      cond.notify()
+      cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
+  def wait_for_consensus(self):
+    cond = threading.Condition()
+    def notlambda(this):
+      if this.sql_consensus_listener.last_desc_at \
+                 != SQLSupport.ConsensusTrackerListener.CONSENSUS_DONE:
+        this.sql_consensus_listener.wait_for_signal = False
+        plog("INFO", "Waiting on consensus result: "+str(this.run_all_jobs))
+        this.schedule_low_prio(notlambda)
+      else:
+        cond.acquire()
+        this.sql_consensus_listener.wait_for_signal = True
+        cond.notify()
+        cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+    plog("INFO", "Consensus OK")
+
+
+

Modified: torctl/trunk/python/TorCtl/__init__.py
===================================================================
--- torctl/trunk/python/TorCtl/__init__.py	2009-08-13 19:40:01 UTC (rev 20280)
+++ torctl/trunk/python/TorCtl/__init__.py	2009-08-13 19:43:11 UTC (rev 20281)
@@ -25,4 +25,4 @@
 """
 
 __all__ = ["TorUtil", "GeoIPSupport", "PathSupport", "TorCtl", "StatsSupport",
-           "SQLSupport"]
+           "SQLSupport", "ScanSupport"]



More information about the tor-commits mailing list