commit 7b22e543fdd24f2979a912f847aa56c54978d70b Author: Damian Johnson atagar@torproject.org Date: Fri Oct 18 14:11:53 2013 -0700
Changing resource tracker to subclass Daemon
Extending our new Daemon class so we can drop the custom pausing/stopping functionality. --- arm/util/sysTools.py | 212 +++++++++++++++++++++----------------------------- 1 file changed, 89 insertions(+), 123 deletions(-)
diff --git a/arm/util/sysTools.py b/arm/util/sysTools.py index cd5814d..e8a9e49 100644 --- a/arm/util/sysTools.py +++ b/arm/util/sysTools.py @@ -6,6 +6,8 @@ import os import time import threading
+import arm.util.tracker + from stem.util import conf, log, proc, str_tools, system
RESOURCE_TRACKERS = {} # mapping of pids to their resource tracker instances @@ -60,7 +62,7 @@ def getResourceTracker(pid, noSpawn = False): tracker.start() return tracker
-class ResourceTracker(threading.Thread): +class ResourceTracker(arm.util.tracker.Daemon): """ Periodically fetches the resource usage (cpu and memory usage) for a given process. @@ -77,8 +79,7 @@ class ResourceTracker(threading.Thread): disabled if zero """
- threading.Thread.__init__(self) - self.setDaemon(True) + arm.util.tracker.Daemon.__init__(self, resolveRate)
self.processPid = processPid self.resolveRate = resolveRate @@ -95,9 +96,7 @@ class ResourceTracker(threading.Thread): self._lastCpuTotal = 0
self.lastLookup = -1 - self._halt = False # terminates thread if true self._valLock = threading.RLock() - self._cond = threading.Condition() # used for pausing the thread
# number of successful calls we've made self._runCount = 0 @@ -133,124 +132,91 @@ class ResourceTracker(threading.Thread):
return self._failureCount != 0
- def run(self): - while not self._halt: - timeSinceReset = time.time() - self.lastLookup - - if self.resolveRate == 0: - self._cond.acquire() - if not self._halt: self._cond.wait(0.2) - self._cond.release() - - continue - elif timeSinceReset < self.resolveRate: - sleepTime = max(0.2, self.resolveRate - timeSinceReset) - - self._cond.acquire() - if not self._halt: self._cond.wait(sleepTime) - self._cond.release() - - continue # done waiting, try again - + def task(self): + timeSinceReset = time.time() - self.lastLookup + + newValues = {} + try: + if self._useProc: + utime, stime, startTime = proc.get_stats(self.processPid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME) + totalCpuTime = float(utime) + float(stime) + cpuDelta = totalCpuTime - self._lastCpuTotal + newValues["cpuSampling"] = cpuDelta / timeSinceReset + newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime)) + newValues["_lastCpuTotal"] = totalCpuTime + + memUsage = int(proc.get_memory_usage(self.processPid)[0]) + totalMemory = proc.get_physical_memory() + newValues["memUsage"] = memUsage + newValues["memUsagePercentage"] = float(memUsage) / totalMemory + else: + # the ps call formats results as: + # + # TIME ELAPSED RSS %MEM + # 3-08:06:32 21-00:00:12 121844 23.5 + # + # or if Tor has only recently been started: + # + # TIME ELAPSED RSS %MEM + # 0:04.40 37:57 18772 0.9 + + psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self.processPid) + + isSuccessful = False + if psCall and len(psCall) >= 2: + stats = psCall[1].strip().split() + + if len(stats) == 4: + try: + totalCpuTime = str_tools.parse_short_time_label(stats[0]) + uptime = str_tools.parse_short_time_label(stats[1]) + cpuDelta = totalCpuTime - self._lastCpuTotal + newValues["cpuSampling"] = cpuDelta / timeSinceReset + newValues["cpuAvg"] = totalCpuTime / uptime + newValues["_lastCpuTotal"] = totalCpuTime + + newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb + 
newValues["memUsagePercentage"] = float(stats[3]) / 100.0 + isSuccessful = True + except ValueError, exc: pass + + if not isSuccessful: + raise IOError("unrecognized output from ps: %s" % psCall) + except IOError, exc: newValues = {} - try: - if self._useProc: - utime, stime, startTime = proc.get_stats(self.processPid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME) - totalCpuTime = float(utime) + float(stime) - cpuDelta = totalCpuTime - self._lastCpuTotal - newValues["cpuSampling"] = cpuDelta / timeSinceReset - newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime)) - newValues["_lastCpuTotal"] = totalCpuTime - - memUsage = int(proc.get_memory_usage(self.processPid)[0]) - totalMemory = proc.get_physical_memory() - newValues["memUsage"] = memUsage - newValues["memUsagePercentage"] = float(memUsage) / totalMemory - else: - # the ps call formats results as: - # - # TIME ELAPSED RSS %MEM - # 3-08:06:32 21-00:00:12 121844 23.5 - # - # or if Tor has only recently been started: - # - # TIME ELAPSED RSS %MEM - # 0:04.40 37:57 18772 0.9 - - psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self.processPid) - - isSuccessful = False - if psCall and len(psCall) >= 2: - stats = psCall[1].strip().split() - - if len(stats) == 4: - try: - totalCpuTime = str_tools.parse_short_time_label(stats[0]) - uptime = str_tools.parse_short_time_label(stats[1]) - cpuDelta = totalCpuTime - self._lastCpuTotal - newValues["cpuSampling"] = cpuDelta / timeSinceReset - newValues["cpuAvg"] = totalCpuTime / uptime - newValues["_lastCpuTotal"] = totalCpuTime - - newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb - newValues["memUsagePercentage"] = float(stats[3]) / 100.0 - isSuccessful = True - except ValueError, exc: pass - - if not isSuccessful: - raise IOError("unrecognized output from ps: %s" % psCall) - except IOError, exc: - newValues = {} - self._failureCount += 1 - - if self._useProc: - if self._failureCount >= 3: - # We've failed three 
times resolving via proc. Warn, and fall back - # to ps resolutions. - log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc) - - self._useProc = False - self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded - else: - # wait a bit and try again - log.debug("Unable to query process resource usage from proc (%s)" % exc) - self._cond.acquire() - if not self._halt: self._cond.wait(0.5) - self._cond.release() - else: - # exponential backoff on making failed ps calls - sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount - log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc)) - self._cond.acquire() - if not self._halt: self._cond.wait(sleepTime) - self._cond.release() - - # sets the new values - if newValues: - # If this is the first run then the cpuSampling stat is meaningless - # (there isn't a previous tick to sample from so it's zero at this - # point). Setting it to the average, which is a fairer estimate. - if self.lastLookup == -1: - newValues["cpuSampling"] = newValues["cpuAvg"] - - self._valLock.acquire() - self.cpuSampling = newValues["cpuSampling"] - self.cpuAvg = newValues["cpuAvg"] - self.memUsage = newValues["memUsage"] - self.memUsagePercentage = newValues["memUsagePercentage"] - self._lastCpuTotal = newValues["_lastCpuTotal"] - self.lastLookup = time.time() - self._runCount += 1 - self._failureCount = 0 - self._valLock.release() - - def stop(self): - """ - Halts further resolutions and terminates the thread. - """ + self._failureCount += 1
- self._cond.acquire() - self._halt = True - self._cond.notifyAll() - self._cond.release() + if self._useProc: + if self._failureCount >= 3: + # We've failed three times resolving via proc. Warn, and fall back + # to ps resolutions. + log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
+ self._useProc = False + self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded + else: + # wait a bit and try again + log.debug("Unable to query process resource usage from proc (%s)" % exc) + else: + # exponential backoff on making failed ps calls + sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount + log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc)) + + # sets the new values + if newValues: + # If this is the first run then the cpuSampling stat is meaningless + # (there isn't a previous tick to sample from so it's zero at this + # point). Setting it to the average, which is a fairer estimate. + if self.lastLookup == -1: + newValues["cpuSampling"] = newValues["cpuAvg"] + + self._valLock.acquire() + self.cpuSampling = newValues["cpuSampling"] + self.cpuAvg = newValues["cpuAvg"] + self.memUsage = newValues["memUsage"] + self.memUsagePercentage = newValues["memUsagePercentage"] + self._lastCpuTotal = newValues["_lastCpuTotal"] + self.lastLookup = time.time() + self._runCount += 1 + self._failureCount = 0 + self._valLock.release()