[tor-commits] [arm/master] Changing resource tracker to subclass Daemon

atagar at torproject.org atagar at torproject.org
Mon Oct 21 21:10:15 UTC 2013


commit 7b22e543fdd24f2979a912f847aa56c54978d70b
Author: Damian Johnson <atagar at torproject.org>
Date:   Fri Oct 18 14:11:53 2013 -0700

    Changing resource tracker to subclass Daemon
    
    Extending our new Daemon class so we can drop the custom pausing/stopping
    functionality.
---
 arm/util/sysTools.py |  212 +++++++++++++++++++++-----------------------------
 1 file changed, 89 insertions(+), 123 deletions(-)

diff --git a/arm/util/sysTools.py b/arm/util/sysTools.py
index cd5814d..e8a9e49 100644
--- a/arm/util/sysTools.py
+++ b/arm/util/sysTools.py
@@ -6,6 +6,8 @@ import os
 import time
 import threading
 
+import arm.util.tracker
+
 from stem.util import conf, log, proc, str_tools, system
 
 RESOURCE_TRACKERS = {}  # mapping of pids to their resource tracker instances
@@ -60,7 +62,7 @@ def getResourceTracker(pid, noSpawn = False):
   tracker.start()
   return tracker
 
-class ResourceTracker(threading.Thread):
+class ResourceTracker(arm.util.tracker.Daemon):
   """
   Periodically fetches the resource usage (cpu and memory usage) for a given
   process.
@@ -77,8 +79,7 @@ class ResourceTracker(threading.Thread):
                     disabled if zero
     """
 
-    threading.Thread.__init__(self)
-    self.setDaemon(True)
+    arm.util.tracker.Daemon.__init__(self, resolveRate)
 
     self.processPid = processPid
     self.resolveRate = resolveRate
@@ -95,9 +96,7 @@ class ResourceTracker(threading.Thread):
     self._lastCpuTotal = 0
 
     self.lastLookup = -1
-    self._halt = False      # terminates thread if true
     self._valLock = threading.RLock()
-    self._cond = threading.Condition()  # used for pausing the thread
 
     # number of successful calls we've made
     self._runCount = 0
@@ -133,124 +132,91 @@ class ResourceTracker(threading.Thread):
 
     return self._failureCount != 0
 
-  def run(self):
-    while not self._halt:
-      timeSinceReset = time.time() - self.lastLookup
-
-      if self.resolveRate == 0:
-        self._cond.acquire()
-        if not self._halt: self._cond.wait(0.2)
-        self._cond.release()
-
-        continue
-      elif timeSinceReset < self.resolveRate:
-        sleepTime = max(0.2, self.resolveRate - timeSinceReset)
-
-        self._cond.acquire()
-        if not self._halt: self._cond.wait(sleepTime)
-        self._cond.release()
-
-        continue # done waiting, try again
-
+  def task(self):
+    timeSinceReset = time.time() - self.lastLookup
+
+    newValues = {}
+    try:
+      if self._useProc:
+        utime, stime, startTime = proc.get_stats(self.processPid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME)
+        totalCpuTime = float(utime) + float(stime)
+        cpuDelta = totalCpuTime - self._lastCpuTotal
+        newValues["cpuSampling"] = cpuDelta / timeSinceReset
+        newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime))
+        newValues["_lastCpuTotal"] = totalCpuTime
+
+        memUsage = int(proc.get_memory_usage(self.processPid)[0])
+        totalMemory = proc.get_physical_memory()
+        newValues["memUsage"] = memUsage
+        newValues["memUsagePercentage"] = float(memUsage) / totalMemory
+      else:
+        # the ps call formats results as:
+        #
+        #     TIME     ELAPSED   RSS %MEM
+        # 3-08:06:32 21-00:00:12 121844 23.5
+        #
+        # or if Tor has only recently been started:
+        #
+        #     TIME      ELAPSED    RSS %MEM
+        #  0:04.40        37:57  18772  0.9
+
+        psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self.processPid)
+
+        isSuccessful = False
+        if psCall and len(psCall) >= 2:
+          stats = psCall[1].strip().split()
+
+          if len(stats) == 4:
+            try:
+              totalCpuTime = str_tools.parse_short_time_label(stats[0])
+              uptime = str_tools.parse_short_time_label(stats[1])
+              cpuDelta = totalCpuTime - self._lastCpuTotal
+              newValues["cpuSampling"] = cpuDelta / timeSinceReset
+              newValues["cpuAvg"] = totalCpuTime / uptime
+              newValues["_lastCpuTotal"] = totalCpuTime
+
+              newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb
+              newValues["memUsagePercentage"] = float(stats[3]) / 100.0
+              isSuccessful = True
+            except ValueError, exc: pass
+
+        if not isSuccessful:
+          raise IOError("unrecognized output from ps: %s" % psCall)
+    except IOError, exc:
       newValues = {}
-      try:
-        if self._useProc:
-          utime, stime, startTime = proc.get_stats(self.processPid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME)
-          totalCpuTime = float(utime) + float(stime)
-          cpuDelta = totalCpuTime - self._lastCpuTotal
-          newValues["cpuSampling"] = cpuDelta / timeSinceReset
-          newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime))
-          newValues["_lastCpuTotal"] = totalCpuTime
-
-          memUsage = int(proc.get_memory_usage(self.processPid)[0])
-          totalMemory = proc.get_physical_memory()
-          newValues["memUsage"] = memUsage
-          newValues["memUsagePercentage"] = float(memUsage) / totalMemory
-        else:
-          # the ps call formats results as:
-          #
-          #     TIME     ELAPSED   RSS %MEM
-          # 3-08:06:32 21-00:00:12 121844 23.5
-          #
-          # or if Tor has only recently been started:
-          #
-          #     TIME      ELAPSED    RSS %MEM
-          #  0:04.40        37:57  18772  0.9
-
-          psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self.processPid)
-
-          isSuccessful = False
-          if psCall and len(psCall) >= 2:
-            stats = psCall[1].strip().split()
-
-            if len(stats) == 4:
-              try:
-                totalCpuTime = str_tools.parse_short_time_label(stats[0])
-                uptime = str_tools.parse_short_time_label(stats[1])
-                cpuDelta = totalCpuTime - self._lastCpuTotal
-                newValues["cpuSampling"] = cpuDelta / timeSinceReset
-                newValues["cpuAvg"] = totalCpuTime / uptime
-                newValues["_lastCpuTotal"] = totalCpuTime
-
-                newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb
-                newValues["memUsagePercentage"] = float(stats[3]) / 100.0
-                isSuccessful = True
-              except ValueError, exc: pass
-
-          if not isSuccessful:
-            raise IOError("unrecognized output from ps: %s" % psCall)
-      except IOError, exc:
-        newValues = {}
-        self._failureCount += 1
-
-        if self._useProc:
-          if self._failureCount >= 3:
-            # We've failed three times resolving via proc. Warn, and fall back
-            # to ps resolutions.
-            log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
-
-            self._useProc = False
-            self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded
-          else:
-            # wait a bit and try again
-            log.debug("Unable to query process resource usage from proc (%s)" % exc)
-            self._cond.acquire()
-            if not self._halt: self._cond.wait(0.5)
-            self._cond.release()
-        else:
-          # exponential backoff on making failed ps calls
-          sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount
-          log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc))
-          self._cond.acquire()
-          if not self._halt: self._cond.wait(sleepTime)
-          self._cond.release()
-
-      # sets the new values
-      if newValues:
-        # If this is the first run then the cpuSampling stat is meaningless
-        # (there isn't a previous tick to sample from so it's zero at this
-        # point). Setting it to the average, which is a fairer estimate.
-        if self.lastLookup == -1:
-          newValues["cpuSampling"] = newValues["cpuAvg"]
-
-        self._valLock.acquire()
-        self.cpuSampling = newValues["cpuSampling"]
-        self.cpuAvg = newValues["cpuAvg"]
-        self.memUsage = newValues["memUsage"]
-        self.memUsagePercentage = newValues["memUsagePercentage"]
-        self._lastCpuTotal = newValues["_lastCpuTotal"]
-        self.lastLookup = time.time()
-        self._runCount += 1
-        self._failureCount = 0
-        self._valLock.release()
-
-  def stop(self):
-    """
-    Halts further resolutions and terminates the thread.
-    """
+      self._failureCount += 1
 
-    self._cond.acquire()
-    self._halt = True
-    self._cond.notifyAll()
-    self._cond.release()
+      if self._useProc:
+        if self._failureCount >= 3:
+          # We've failed three times resolving via proc. Warn, and fall back
+          # to ps resolutions.
+          log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
 
+          self._useProc = False
+          self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded
+        else:
+          # wait a bit and try again
+          log.debug("Unable to query process resource usage from proc (%s)" % exc)
+      else:
+        # exponential backoff on making failed ps calls
+        sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount
+        log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc))
+
+    # sets the new values
+    if newValues:
+      # If this is the first run then the cpuSampling stat is meaningless
+      # (there isn't a previous tick to sample from so it's zero at this
+      # point). Setting it to the average, which is a fairer estimate.
+      if self.lastLookup == -1:
+        newValues["cpuSampling"] = newValues["cpuAvg"]
+
+      self._valLock.acquire()
+      self.cpuSampling = newValues["cpuSampling"]
+      self.cpuAvg = newValues["cpuAvg"]
+      self.memUsage = newValues["memUsage"]
+      self.memUsagePercentage = newValues["memUsagePercentage"]
+      self._lastCpuTotal = newValues["_lastCpuTotal"]
+      self.lastLookup = time.time()
+      self._runCount += 1
+      self._failureCount = 0
+      self._valLock.release()





More information about the tor-commits mailing list