commit f9f3ca5746ba288bd67b7e1e2eab8bb338525c4a
Author: Damian Johnson <atagar(a)torproject.org>
Date: Mon Oct 28 20:39:40 2013 -0700
Rewriting the ResourceTracker class
Breaking the ResourceTracker's large, gross _task() into helper methods. This
in turn greatly simplifies the whole class.
---
arm/util/tracker.py | 181 ++++++++++++++++++++++++++++-----------------------
1 file changed, 98 insertions(+), 83 deletions(-)
diff --git a/arm/util/tracker.py b/arm/util/tracker.py
index f8eb108..ea795d0 100644
--- a/arm/util/tracker.py
+++ b/arm/util/tracker.py
@@ -61,6 +61,7 @@ Resources = collections.namedtuple('Resources', [
'timestamp',
])
+
def get_connection_tracker():
"""
Singleton for tracking the connections established by tor.
@@ -332,12 +333,8 @@ class ResourceTracker(Daemon):
super(ResourceTracker, self).__init__(CONFIG['queries.resources.rate'])
self._resources = None
-
- # resolves usage via proc results if true, ps otherwise
- self._use_proc = proc.is_available()
-
- # sequential times we've failed with this method of resolution
- self._failure_count = 0
+ self._use_proc = proc.is_available() # determines if we use proc or ps for lookups
+ self._failure_count = 0 # number of times in a row we've failed to get results
def get_resource_usage(self):
"""
@@ -350,97 +347,115 @@ class ResourceTracker(Daemon):
return result if result else Resources(0.0, 0.0, 0.0, 0, 0.0, 0.0)
def _task(self, process_pid, process_name):
- last_cpu_total = self._resources.cpu_total if self._resources else 0
- last_lookup = self._resources.timestamp if self._resources else -1
-
- time_since_reset = time.time() - last_lookup
- new_values = {}
-
try:
if self._use_proc:
- utime, stime, start_time = proc.get_stats(process_pid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME)
- total_cpu_time = float(utime) + float(stime)
- cpu_delta = total_cpu_time - last_cpu_total
- new_values["cpuSampling"] = cpu_delta / time_since_reset
- new_values["cpuAvg"] = total_cpu_time / (time.time() - float(start_time))
- new_values["_lastCpuTotal"] = total_cpu_time
-
- mem_usage = int(proc.get_memory_usage(process_pid)[0])
- total_memory = proc.get_physical_memory()
- new_values["memUsage"] = mem_usage
- new_values["memUsagePercentage"] = float(mem_usage) / total_memory
+ self._resources = self._proc_results(process_pid)
else:
- # the ps call formats results as:
- #
- # TIME ELAPSED RSS %MEM
- # 3-08:06:32 21-00:00:12 121844 23.5
- #
- # or if Tor has only recently been started:
- #
- # TIME ELAPSED RSS %MEM
- # 0:04.40 37:57 18772 0.9
-
- ps_call = system.call("ps -p {pid} -o cputime,etime,rss,%mem".format(pid = process_pid))
-
- is_successful = False
- if ps_call and len(ps_call) >= 2:
- stats = ps_call[1].strip().split()
-
- if len(stats) == 4:
- try:
- total_cpu_time = str_tools.parse_short_time_label(stats[0])
- uptime = str_tools.parse_short_time_label(stats[1])
- cpu_delta = total_cpu_time - last_cpu_total
- new_values["cpuSampling"] = cpu_delta / time_since_reset
- new_values["cpuAvg"] = total_cpu_time / uptime
- new_values["_lastCpuTotal"] = total_cpu_time
-
- new_values["memUsage"] = int(stats[2]) * 1024 # ps size is in kb
- new_values["memUsagePercentage"] = float(stats[3]) / 100.0
- is_successful = True
- except ValueError, exc: pass
-
- if not is_successful:
- raise IOError("unrecognized output from ps: %s" % ps_call)
- except IOError, exc:
- new_values = {}
+ self._resources = self._ps_results(process_pid)
+
+ self._failure_count = 0
+ return True
+ except IOError as exc:
self._failure_count += 1
if self._use_proc:
if self._failure_count >= 3:
# We've failed three times resolving via proc. Warn, and fall back
# to ps resolutions.
- log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
self._use_proc = False
self._failure_count = 0
+ log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
else:
- # wait a bit and try again
log.debug("Unable to query process resource usage from proc (%s)" % exc)
else:
- # exponential backoff on making failed ps calls
- sleep_time = 0.01 * (2 ** self._failure_count) + self._failure_count
- log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleep_time, exc))
-
- # sets the new values
- if new_values:
- # If this is the first run then the cpuSampling stat is meaningless
- # (there isn't a previous tick to sample from so it's zero at this
- # point). Setting it to the average, which is a fairer estimate.
- if last_lookup == -1:
- new_values["cpuSampling"] = new_values["cpuAvg"]
-
- with self.val_lock:
- self._resources = Resources(
- cpu_sample = new_values["cpuSampling"],
- cpu_average = new_values["cpuAvg"],
- cpu_total = new_values["_lastCpuTotal"],
- memory_bytes = new_values["memUsage"],
- memory_precent = new_values["memUsagePercentage"],
- timestamp = time.time(),
- )
+ if self._failure_count >= 3:
+ # Give up on further attempts.
+
+ log.info("Failed three attempts to get process resource usage from ps, giving up on getting resource usage information (%s)" % exc)
+ self.stop()
+ else:
+ log.debug("Unable to query process resource usage from ps (%s)" % exc)
- self._failure_count = 0
- return True
- else:
return False
+
+  def _proc_results(self, process_pid):
+    """
+    Resolves the process resource usage via proc.
+
+    :param process_pid: pid of the process to query
+    :returns: **Resources** instance for its present resource usage
+    :throws: **IOError** if unable to retrieve information from proc
+    """
+
+    utime, stime, start_time = proc.get_stats(
+      process_pid,
+      proc.Stat.CPU_UTIME,
+      proc.Stat.CPU_STIME,
+      proc.Stat.START_TIME,
+    )
+
+    total_cpu_time = float(utime) + float(stime)
+    mem_usage = int(proc.get_memory_usage(process_pid)[0])
+    elapsed = (time.time() - self._resources.timestamp) if self._resources else 0
+
+    if elapsed > 0:  # cpu time consumed per second of wall clock time since the last sample
+      cpu_sample = (total_cpu_time - self._resources.cpu_total) / elapsed
+    else:
+      cpu_sample = 0.0  # we need a prior datapoint to give a sampling
+
+    return Resources(
+      cpu_sample = cpu_sample,
+      cpu_average = total_cpu_time / (time.time() - float(start_time)),
+      cpu_total = total_cpu_time,
+      memory_bytes = mem_usage,
+      memory_precent = float(mem_usage) / proc.get_physical_memory(),
+      timestamp = time.time(),
+    )
+
+  def _ps_results(self, process_pid):
+    """
+    Resolves the process resource usage via ps.
+
+    :param process_pid: pid of the process to query
+    :returns: **Resources** instance for its present resource usage
+    :throws: **IOError** if unable to retrieve information from ps
+    """
+
+    # ps results are of the form...
+    #
+    #   TIME        ELAPSED      RSS     %MEM
+    #   3-08:06:32  21-00:00:12  121844  23.5
+    #
+    # ... or if Tor has only recently been started...
+    #
+    #   TIME        ELAPSED      RSS     %MEM
+    #   0:04.40     37:57        18772   0.9
+
+    ps_call = system.call("ps -p {pid} -o cputime,etime,rss,%mem".format(pid = process_pid))
+
+    if ps_call and len(ps_call) >= 2:
+      stats = ps_call[1].strip().split()
+
+      if len(stats) == 4:
+        try:
+          total_cpu_time = str_tools.parse_short_time_label(stats[0])
+          uptime = str_tools.parse_short_time_label(stats[1])
+          elapsed = (time.time() - self._resources.timestamp) if self._resources else 0
+          if elapsed > 0:  # cpu time consumed per second of wall clock time since the last sample
+            cpu_sample = (total_cpu_time - self._resources.cpu_total) / elapsed
+          else:
+            cpu_sample = 0.0  # we need a prior datapoint to give a sampling
+
+          return Resources(
+            cpu_sample = cpu_sample,
+            cpu_average = float(total_cpu_time) / uptime if uptime else 0.0,  # uptime can be zero right after startup
+            cpu_total = total_cpu_time,
+            memory_bytes = int(stats[2]) * 1024,  # ps size is in kb
+            memory_precent = float(stats[3]) / 100.0,
+            timestamp = time.time(),
+          )
+        except ValueError:
+          pass  # unparseable field, falls through to the IOError below
+
+    raise IOError("unrecognized output from ps: %s" % ps_call)