commit f9f3ca5746ba288bd67b7e1e2eab8bb338525c4a Author: Damian Johnson atagar@torproject.org Date: Mon Oct 28 20:39:40 2013 -0700
Rewriting the ResourceTracker class
Breaking the ResourceTracker's large, gross _task() into helper methods. This in turn greatly simplifies the whole class. --- arm/util/tracker.py | 181 ++++++++++++++++++++++++++++----------------------- 1 file changed, 98 insertions(+), 83 deletions(-)
diff --git a/arm/util/tracker.py b/arm/util/tracker.py index f8eb108..ea795d0 100644 --- a/arm/util/tracker.py +++ b/arm/util/tracker.py @@ -61,6 +61,7 @@ Resources = collections.namedtuple('Resources', [ 'timestamp', ])
+ def get_connection_tracker(): """ Singleton for tracking the connections established by tor. @@ -332,12 +333,8 @@ class ResourceTracker(Daemon): super(ResourceTracker, self).__init__(CONFIG['queries.resources.rate'])
self._resources = None - - # resolves usage via proc results if true, ps otherwise - self._use_proc = proc.is_available() - - # sequential times we've failed with this method of resolution - self._failure_count = 0 + self._use_proc = proc.is_available() # determines if we use proc or ps for lookups + self._failure_count = 0 # number of times in a row we've failed to get results
def get_resource_usage(self): """ @@ -350,97 +347,115 @@ class ResourceTracker(Daemon): return result if result else Resources(0.0, 0.0, 0.0, 0, 0.0, 0.0)
def _task(self, process_pid, process_name): - last_cpu_total = self._resources.cpu_total if self._resources else 0 - last_lookup = self._resources.timestamp if self._resources else -1 - - time_since_reset = time.time() - last_lookup - new_values = {} - try: if self._use_proc: - utime, stime, start_time = proc.get_stats(process_pid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME) - total_cpu_time = float(utime) + float(stime) - cpu_delta = total_cpu_time - last_cpu_total - new_values["cpuSampling"] = cpu_delta / time_since_reset - new_values["cpuAvg"] = total_cpu_time / (time.time() - float(start_time)) - new_values["_lastCpuTotal"] = total_cpu_time - - mem_usage = int(proc.get_memory_usage(process_pid)[0]) - total_memory = proc.get_physical_memory() - new_values["memUsage"] = mem_usage - new_values["memUsagePercentage"] = float(mem_usage) / total_memory + self._resources = self._proc_results(process_pid) else: - # the ps call formats results as: - # - # TIME ELAPSED RSS %MEM - # 3-08:06:32 21-00:00:12 121844 23.5 - # - # or if Tor has only recently been started: - # - # TIME ELAPSED RSS %MEM - # 0:04.40 37:57 18772 0.9 - - ps_call = system.call("ps -p {pid} -o cputime,etime,rss,%mem".format(pid = process_pid)) - - is_successful = False - if ps_call and len(ps_call) >= 2: - stats = ps_call[1].strip().split() - - if len(stats) == 4: - try: - total_cpu_time = str_tools.parse_short_time_label(stats[0]) - uptime = str_tools.parse_short_time_label(stats[1]) - cpu_delta = total_cpu_time - last_cpu_total - new_values["cpuSampling"] = cpu_delta / time_since_reset - new_values["cpuAvg"] = total_cpu_time / uptime - new_values["_lastCpuTotal"] = total_cpu_time - - new_values["memUsage"] = int(stats[2]) * 1024 # ps size is in kb - new_values["memUsagePercentage"] = float(stats[3]) / 100.0 - is_successful = True - except ValueError, exc: pass - - if not is_successful: - raise IOError("unrecognized output from ps: %s" % ps_call) - except IOError, exc: - new_values = {} + self._resources = self._ps_results(process_pid) + + self._failure_count = 0 + return True + except IOError as exc: self._failure_count += 1
if self._use_proc: if self._failure_count >= 3: # We've failed three times resolving via proc. Warn, and fall back # to ps resolutions. - log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
self._use_proc = False self._failure_count = 0 + log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc) else: - # wait a bit and try again log.debug("Unable to query process resource usage from proc (%s)" % exc) else: - # exponential backoff on making failed ps calls - sleep_time = 0.01 * (2 ** self._failure_count) + self._failure_count - log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleep_time, exc)) - - # sets the new values - if new_values: - # If this is the first run then the cpuSampling stat is meaningless - # (there isn't a previous tick to sample from so it's zero at this - # point). Setting it to the average, which is a fairer estimate. - if last_lookup == -1: - new_values["cpuSampling"] = new_values["cpuAvg"] - - with self.val_lock: - self._resources = Resources( - cpu_sample = new_values["cpuSampling"], - cpu_average = new_values["cpuAvg"], - cpu_total = new_values["_lastCpuTotal"], - memory_bytes = new_values["memUsage"], - memory_precent = new_values["memUsagePercentage"], - timestamp = time.time(), - ) + if self._failure_count >= 3: + # Give up on further attempts. + + log.info("Failed three attempts to get process resource usage from ps, giving up on getting resource usage information (%s)" % exc) + self.stop() + else: + log.debug("Unable to query process resource usage from ps (%s)" % exc)
- self._failure_count = 0 - return True - else: return False + + def _proc_results(self, process_pid): + """ + Resolves the process resource usage via proc. + + :returns: **Resource** instance for its present resource usage + + :throws: **IOError** if unable to retrieve information from proc + """ + + utime, stime, start_time = proc.get_stats( + process_pid, + proc.Stat.CPU_UTIME, + proc.Stat.CPU_STIME, + proc.Stat.START_TIME, + ) + + total_cpu_time = float(utime) + float(stime) + mem_usage = proc.get_memory_usage(process_pid)[0] + total_memory = proc.get_physical_memory() + + if self._resources: + cpu_sample = (total_cpu_time - self._resources.cpu_total) / self._resources.cpu_total + else: + cpu_sample = 0.0 # we need a prior datapoint to give a sampling + + return Resources( + cpu_sample = cpu_sample, + cpu_average = total_cpu_time / (time.time() - float(start_time)), + cpu_total = total_cpu_time, + memory_bytes = mem_usage, + memory_precent = float(mem_usage) / total_memory, + timestamp = time.time(), + ) + + def _ps_results(self, process_pid): + """ + Resolves the process resource usage via ps. + + :returns: **Resource** instance for its present resource usage + + :throws: **IOError** if unable to retrieve information from proc + """ + + # ps results are of the form... + # + # TIME ELAPSED RSS %MEM + # 3-08:06:32 21-00:00:12 121844 23.5 + # + # ... or if Tor has only recently been started... + # + # TIME ELAPSED RSS %MEM + # 0:04.40 37:57 18772 0.9 + + ps_call = system.call("ps -p {pid} -o cputime,etime,rss,%mem".format(pid = process_pid)) + + if ps_call and len(ps_call) >= 2: + stats = ps_call[1].strip().split() + + if len(stats) == 4: + try: + total_cpu_time = str_tools.parse_short_time_label(stats[0]) + uptime = str_tools.parse_short_time_label(stats[1]) + + if self._resources: + cpu_sample = (total_cpu_time - self._resources.cpu_total) / self._resources.cpu_total + else: + cpu_sample = 0.0 # we need a prior datapoint to give a sampling + + return Resources( + cpu_sample = cpu_sample, + cpu_average = total_cpu_time / uptime, + cpu_total = total_cpu_time, + memory_bytes = int(stats[2]) * 1024, # ps size is in kb + memory_precent = float(stats[3]) / 100.0, + timestamp = time.time(), + ) + except ValueError: + pass + + raise IOError("unrecognized output from ps: %s" % ps_call)
tor-commits@lists.torproject.org