commit 56cd312f9a2dac933f9db7e09d88f4dc8fed0613
Author: Damian Johnson <atagar@torproject.org>
Date:   Sun Oct 20 19:48:14 2013 -0700
Moving resource usage tracker to tracker module
Little renaming for PEP8 compliance but otherwise pretty much a bulk move of the resource usage tracker. More refactoring needed, but the sysTools module is now close to being droppable.
---
 arm/connections/connPanel.py  |   8 +-
 arm/controller.py             |   8 +-
 arm/graphing/connStats.py     |   2 +-
 arm/graphing/resourceStats.py |  11 ++-
 arm/headerPanel.py            |  10 +-
 arm/menu/actions.py           |   2 +-
 arm/starter.py                |   4 +-
 arm/util/sysTools.py          | 193 --------------------------------------
 arm/util/tracker.py           | 205 ++++++++++++++++++++++++++++++++++++++---
 9 files changed, 215 insertions(+), 228 deletions(-)
diff --git a/arm/connections/connPanel.py b/arm/connections/connPanel.py index 7db0e8e..4abc068 100644 --- a/arm/connections/connPanel.py +++ b/arm/connections/connPanel.py @@ -242,7 +242,7 @@ class ConnectionPanel(panel.Panel, threading.Thread): # provides a menu to pick the connection resolver title = "Resolver Util:" options = ["auto"] + list(connections.Resolver) - connResolver = arm.util.tracker.get_connection_resolver() + connResolver = arm.util.tracker.get_connection_tracker()
currentOverwrite = connResolver.get_custom_resolver() if currentOverwrite == None: oldSelection = 0 @@ -313,7 +313,7 @@ class ConnectionPanel(panel.Panel, threading.Thread): lastDraw += CONFIG["features.connection.refreshRate"] * drawTicks
def getHelp(self): - resolverUtil = arm.util.tracker.get_connection_resolver().get_custom_resolver() + resolverUtil = arm.util.tracker.get_connection_tracker().get_custom_resolver() if resolverUtil == None: resolverUtil = "auto"
options = [] @@ -427,9 +427,9 @@ class ConnectionPanel(panel.Panel, threading.Thread): self.appResolveSinceUpdate = False
# if we don't have an initialized resolver then this is a no-op - if not arm.util.tracker.get_connection_resolver().is_alive(): return + if not arm.util.tracker.get_connection_tracker().is_alive(): return
- connResolver = arm.util.tracker.get_connection_resolver() + connResolver = arm.util.tracker.get_connection_tracker() currentResolutionCount = connResolver.run_counter()
self.valsLock.acquire() diff --git a/arm/controller.py b/arm/controller.py index ad3ef75..69e52ce 100644 --- a/arm/controller.py +++ b/arm/controller.py @@ -101,7 +101,7 @@ def initController(stdscr, startTime):
if controller.get_conf("DisableDebuggerAttachment", None) == "1": log.notice("Tor is preventing system utilities like netstat and lsof from working. This means that arm can't provide you with connection information. You can change this by adding 'DisableDebuggerAttachment 0' to your torrc and restarting tor. For more information see...\nhttps://trac.torproject.org/3313") - arm.util.tracker.get_connection_resolver().set_paused(True) + arm.util.tracker.get_connection_tracker().set_paused(True) else: # Configures connection resoultions. This is paused/unpaused according to # if Tor's connected or not. @@ -117,14 +117,14 @@ def initController(stdscr, startTime): if tor_cmd is None: tor_cmd = "tor"
- resolver = arm.util.tracker.get_connection_resolver() + resolver = arm.util.tracker.get_connection_tracker() resolver.set_process(tor_pid, tor_cmd) log.info("Operating System: %s, Connection Resolvers: %s" % (os.uname()[0], ", ".join(resolver._resolvers))) resolver.start() else: # constructs singleton resolver and, if tor isn't connected, initizes # it to be paused - arm.util.tracker.get_connection_resolver().set_paused(not controller.is_alive()) + arm.util.tracker.get_connection_tracker().set_paused(not controller.is_alive())
# third page: config if CONFIG["features.panels.show.config"]: @@ -497,7 +497,7 @@ def connResetListener(controller, eventType, _): pid if started again. """
- resolver = arm.util.tracker.get_connection_resolver() + resolver = arm.util.tracker.get_connection_tracker()
if resolver.is_alive(): resolver.set_paused(eventType == State.CLOSED) diff --git a/arm/graphing/connStats.py b/arm/graphing/connStats.py index d3fe6a5..3f23b33 100644 --- a/arm/graphing/connStats.py +++ b/arm/graphing/connStats.py @@ -41,7 +41,7 @@ class ConnStats(graphPanel.GraphStats):
inboundCount, outboundCount = 0, 0
- for entry in arm.util.tracker.get_connection_resolver().get_connections(): + for entry in arm.util.tracker.get_connection_tracker().get_connections(): localPort = entry.local_port if localPort in (self.orPort, self.dirPort): inboundCount += 1 elif localPort == self.controlPort: pass # control connection diff --git a/arm/graphing/resourceStats.py b/arm/graphing/resourceStats.py index 843ef41..03a8654 100644 --- a/arm/graphing/resourceStats.py +++ b/arm/graphing/resourceStats.py @@ -2,8 +2,10 @@ Tracks the system resource usage (cpu and memory) of the tor process. """
+import arm.util.tracker + from arm.graphing import graphPanel -from arm.util import sysTools, torTools +from arm.util import torTools
from stem.util import str_tools
@@ -42,12 +44,13 @@ class ResourceStats(graphPanel.GraphStats):
primary, secondary = 0, 0 if self.queryPid: - resourceTracker = sysTools.getResourceTracker() + resourceTracker = arm.util.tracker.get_resource_tracker()
- if resourceTracker and not resourceTracker.lastQueryFailed(): - primary, _, secondary, _ = resourceTracker.getResourceUsage() + if resourceTracker and not resourceTracker.last_query_failed(): + primary, _, secondary, _ = resourceTracker.get_resource_usage() primary *= 100 # decimal percentage to whole numbers secondary /= 1048576 # translate size to MB so axis labels are short + self.runCount = resourceTracker.run_counter()
self._processEvent(primary, secondary)
diff --git a/arm/headerPanel.py b/arm/headerPanel.py index b27a858..b559ec8 100644 --- a/arm/headerPanel.py +++ b/arm/headerPanel.py @@ -19,6 +19,8 @@ import time import curses import threading
+import arm.util.tracker + import stem import stem.connection
@@ -414,7 +416,7 @@ class HeaderPanel(panel.Panel, threading.Thread):
isChanged = False if self.vals["tor/pid"]: - resourceTracker = sysTools.getResourceTracker() + resourceTracker = arm.util.tracker.get_resource_tracker() resourceTracker.set_process(self.vals["tor/pid"]) isChanged = self._lastResourceFetch != resourceTracker.run_counter()
@@ -561,15 +563,15 @@ class HeaderPanel(panel.Panel, threading.Thread):
# ps or proc derived resource usage stats if self.vals["tor/pid"]: - resourceTracker = sysTools.getResourceTracker() + resourceTracker = arm.util.tracker.get_resource_tracker() resourceTracker.set_process(self.vals["tor/pid"])
- if resourceTracker.lastQueryFailed(): + if resourceTracker.last_query_failed(): self.vals["stat/%torCpu"] = "0" self.vals["stat/rss"] = "0" self.vals["stat/%mem"] = "0" else: - cpuUsage, _, memUsage, memUsagePercent = resourceTracker.getResourceUsage() + cpuUsage, _, memUsage, memUsagePercent = resourceTracker.get_resource_usage() self._lastResourceFetch = resourceTracker.run_counter() self.vals["stat/%torCpu"] = "%0.1f" % (100 * cpuUsage) self.vals["stat/rss"] = str(memUsage) diff --git a/arm/menu/actions.py b/arm/menu/actions.py index a98ea19..6602b6b 100644 --- a/arm/menu/actions.py +++ b/arm/menu/actions.py @@ -239,7 +239,7 @@ def makeConnectionsMenu(connPanel): connectionsMenu.add(arm.menu.item.MenuItem("Sorting...", connPanel.showSortDialog))
# resolver submenu - connResolver = arm.util.tracker.get_connection_resolver() + connResolver = arm.util.tracker.get_connection_tracker() resolverMenu = arm.menu.item.Submenu("Resolver") resolverGroup = arm.menu.item.SelectionGroup(connResolver.set_custom_resolver, connResolver.get_custom_resolver())
diff --git a/arm/starter.py b/arm/starter.py index e728bfb..035fa70 100644 --- a/arm/starter.py +++ b/arm/starter.py @@ -286,8 +286,8 @@ def _shutdown_daemons(): # joins on utility daemon threads - this might take a moment since the # internal threadpools being joined might be sleeping
- resource_tracker = arm.util.sysTools.getResourceTracker() if arm.util.sysTools.getResourceTracker().is_alive() else None - connection_resolver = arm.util.tracker.get_connection_resolver() if arm.util.tracker.get_connection_resolver().is_alive() else None + resource_tracker = arm.util.tracker.get_resource_tracker() if arm.util.tracker.get_resource_tracker().is_alive() else None + connection_resolver = arm.util.tracker.get_connection_tracker() if arm.util.tracker.get_connection_tracker().is_alive() else None
if resource_tracker: resource_tracker.stop() diff --git a/arm/util/sysTools.py b/arm/util/sysTools.py index e2ec45e..668a56a 100644 --- a/arm/util/sysTools.py +++ b/arm/util/sysTools.py @@ -2,15 +2,7 @@ Helper functions for working with the underlying system. """
-import collections import time -import threading - -import arm.util.tracker - -from stem.util import conf, log, proc, str_tools, system - -RESOURCE_TRACKER = None
# Runtimes for system calls, used to estimate cpu usage. Entries are tuples of # the form: @@ -18,27 +10,6 @@ RESOURCE_TRACKER = None RUNTIMES = [] SAMPLING_PERIOD = 5 # time of the sampling period
-# Process resources we poll... -# -# cpu_sample - average cpu usage since we last checked -# cpu_average - average cpu usage since we first started tracking the process -# memory_bytes - memory usage of the process in bytes -# memory_precent - percentage of our memory used by this process - -Resources = collections.namedtuple('Resources', [ - 'cpu_sample', - 'cpu_average', - 'memory_bytes', - 'memory_percent', -]) - -CONFIG = conf.config_dict("arm", { - "queries.resourceUsage.rate": 5, -}) - -# TODO: This was a bit of a hack, and one that won't work now that we lack our -# call() method to populate RUNTIMES. - def getSysCpuUsage(): """ Provides an estimate of the cpu usage for system calls made through this @@ -56,167 +27,3 @@ def getSysCpuUsage(): runtimeSum = sum([entry[1] for entry in RUNTIMES]) return runtimeSum / SAMPLING_PERIOD
-def getResourceTracker(): - """ - Singleton for tracking the resource usage of our tor process. - """ - - global RESOURCE_TRACKER - - if RESOURCE_TRACKER is None: - RESOURCE_TRACKER = ResourceTracker() - - return RESOURCE_TRACKER - -class ResourceTracker(arm.util.tracker.Daemon): - """ - Periodically fetches the resource usage (cpu and memory usage) for a given - process. - """ - - def __init__(self): - """ - Initializes a new resolver daemon. When no longer needed it's suggested - that this is stopped. - """ - - arm.util.tracker.Daemon.__init__(self, CONFIG["queries.resourceUsage.rate"]) - - self._procss_pid = None - self._last_sample = None - - # resolves usage via proc results if true, ps otherwise - self._useProc = proc.is_available() - - # used to get the deltas when querying cpu time - self._lastCpuTotal = 0 - - self.lastLookup = -1 - self._valLock = threading.RLock() - - # sequential times we've failed with this method of resolution - self._failureCount = 0 - - def set_process(self, pid): - """ - Sets the process we retrieve resources for. - - :param int pid: process id - """ - - self._process_pid = pid - - def getResourceUsage(self): - """ - Provides the last cached resource usage as a named tuple of the form: - (cpuUsage_sampling, cpuUsage_avg, memUsage_bytes, memUsage_percent) - """ - - self._valLock.acquire() - - if self._last_sample is None: - result = Resources(0.0, 0.0, 0, 0.0) - else: - result = self._last_sample - self._valLock.release() - - return result - - def lastQueryFailed(self): - """ - Provides true if, since we fetched the currently cached results, we've - failed to get new results. False otherwise. 
- """ - - return self._failureCount != 0 - - def task(self): - if self._procss_pid is None: - return - - timeSinceReset = time.time() - self.lastLookup - - newValues = {} - try: - if self._useProc: - utime, stime, startTime = proc.get_stats(self._procss_pid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME) - totalCpuTime = float(utime) + float(stime) - cpuDelta = totalCpuTime - self._lastCpuTotal - newValues["cpuSampling"] = cpuDelta / timeSinceReset - newValues["cpuAvg"] = totalCpuTime / (time.time() - float(startTime)) - newValues["_lastCpuTotal"] = totalCpuTime - - memUsage = int(proc.get_memory_usage(self._procss_pid)[0]) - totalMemory = proc.get_physical_memory() - newValues["memUsage"] = memUsage - newValues["memUsagePercentage"] = float(memUsage) / totalMemory - else: - # the ps call formats results as: - # - # TIME ELAPSED RSS %MEM - # 3-08:06:32 21-00:00:12 121844 23.5 - # - # or if Tor has only recently been started: - # - # TIME ELAPSED RSS %MEM - # 0:04.40 37:57 18772 0.9 - - psCall = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self._procss_pid) - - isSuccessful = False - if psCall and len(psCall) >= 2: - stats = psCall[1].strip().split() - - if len(stats) == 4: - try: - totalCpuTime = str_tools.parse_short_time_label(stats[0]) - uptime = str_tools.parse_short_time_label(stats[1]) - cpuDelta = totalCpuTime - self._lastCpuTotal - newValues["cpuSampling"] = cpuDelta / timeSinceReset - newValues["cpuAvg"] = totalCpuTime / uptime - newValues["_lastCpuTotal"] = totalCpuTime - - newValues["memUsage"] = int(stats[2]) * 1024 # ps size is in kb - newValues["memUsagePercentage"] = float(stats[3]) / 100.0 - isSuccessful = True - except ValueError, exc: pass - - if not isSuccessful: - raise IOError("unrecognized output from ps: %s" % psCall) - except IOError, exc: - newValues = {} - self._failureCount += 1 - - if self._useProc: - if self._failureCount >= 3: - # We've failed three times resolving via proc. 
Warn, and fall back - # to ps resolutions. - log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc) - - self._useProc = False - self._failureCount = 1 # prevents lastQueryFailed() from thinking that we succeeded - else: - # wait a bit and try again - log.debug("Unable to query process resource usage from proc (%s)" % exc) - else: - # exponential backoff on making failed ps calls - sleepTime = 0.01 * (2 ** self._failureCount) + self._failureCount - log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleepTime, exc)) - - # sets the new values - if newValues: - # If this is the first run then the cpuSampling stat is meaningless - # (there isn't a previous tick to sample from so it's zero at this - # point). Setting it to the average, which is a fairer estimate. - if self.lastLookup == -1: - newValues["cpuSampling"] = newValues["cpuAvg"] - - self._valLock.acquire() - self._last_sample = Resources(newValues["cpuSampling"], newValues["cpuAvg"], newValues["memUsage"], newValues["memUsagePercentage"]) - self._lastCpuTotal = newValues["_lastCpuTotal"] - self.lastLookup = time.time() - self._failureCount = 0 - self._valLock.release() - return True - else: - return False diff --git a/arm/util/tracker.py b/arm/util/tracker.py index b75e5ff..c027975 100644 --- a/arm/util/tracker.py +++ b/arm/util/tracker.py @@ -3,7 +3,8 @@ Background tasks for gathering informatino about the tor process.
::
- get_connection_resolver - provides a ConnectionResolver for our tor process + get_connection_tracker - provides a ConnectionResolver for our tor process + get_resource_tracker - provides a ResourceTracker for our tor process
Daemon - common parent for resolvers |- run_counter - number of successful runs @@ -12,38 +13,73 @@ Background tasks for gathering informatino about the tor process. |- set_paused - pauses or continues work +- stop - stops further work by the daemon
- ConnectionResolver - periodically queries tor's connection information + ConnectionResolver - periodically checks the connections established by tor |- set_process - set the pid and process name used for lookups |- get_custom_resolver - provide the custom conntion resolver we're using |- set_custom_resolver - overwrites automatic resolver selecion with a custom resolver - |- get_connections - provides our latest connection results - +- get_resolution_count - number of times we've fetched connection information + +- get_connections - provides our latest connection results + + ResourceTracker - periodically checks the resource usage of tor + |- set_process - set the pid used for lookups + |- get_resource_usage - provides our latest resource usage results + +- last_query_failed - checks if we failed to fetch newer results """
+import collections import time import threading
-from stem.util import conf, connection, log - -CONNECTION_RESOLVER = None +from stem.util import conf, connection, log, proc, str_tools, system
-CONFIG = conf.config_dict('arm', { +CONFIG = conf.config_dict("arm", { + "queries.resourceUsage.rate": 5, 'queries.connections.minRate': 5, 'msg.unable_to_use_resolver': '', 'msg.unable_to_use_all_resolvers': '', })
-def get_connection_resolver(): +CONNECTION_TRACKER = None +RESOURCE_TRACKER = None + +# Process resources we poll... +# +# cpu_sample - average cpu usage since we last checked +# cpu_average - average cpu usage since we first started tracking the process +# memory_bytes - memory usage of the process in bytes +# memory_precent - percentage of our memory used by this process + +Resources = collections.namedtuple('Resources', [ + 'cpu_sample', + 'cpu_average', + 'memory_bytes', + 'memory_percent', +]) + +def get_connection_tracker(): """ - Singleton constructor for a connection resolver for tor's process. + Singleton for tracking the connections established by tor. """
- global CONNECTION_RESOLVER + global CONNECTION_TRACKER + + if CONNECTION_TRACKER is None: + CONNECTION_TRACKER = ConnectionResolver() + + return CONNECTION_TRACKER + + +def get_resource_tracker(): + """ + Singleton for tracking the resource usage of our tor process. + """ + + global RESOURCE_TRACKER + + if RESOURCE_TRACKER is None: + RESOURCE_TRACKER = ResourceTracker()
- if CONNECTION_RESOLVER is None: - CONNECTION_RESOLVER = ConnectionResolver() + return RESOURCE_TRACKER
- return CONNECTION_RESOLVER
class Daemon(threading.Thread): """ @@ -139,7 +175,7 @@ class Daemon(threading.Thread):
class ConnectionResolver(Daemon): """ - Daemon that periodically retrieves the connections made by a process. + Periodically retrieves the connections established by tor. """
def __init__(self): @@ -265,3 +301,142 @@ class ConnectionResolver(Daemon): return [] else: return list(self._connections)
+
+class ResourceTracker(Daemon):
+  """
+  Periodically retrieves the resource usage of tor.
+  """
+
+  def __init__(self):
+    Daemon.__init__(self, CONFIG["queries.resourceUsage.rate"])
+
+    self._process_pid = None  # task() is a no-op until set_process() provides a pid
+    self._last_sample = None
+
+    # resolves usage via proc results if true, ps otherwise
+    self._use_proc = proc.is_available()
+
+    # used to get the deltas when querying cpu time
+    self._last_cpu_total = 0
+
+    self.last_lookup = -1  # time of our last successful query, -1 if none yet
+    self._val_lock = threading.RLock()  # guards access to _last_sample
+
+    # sequential times we've failed with this method of resolution
+    self._failure_count = 0
+
+  def set_process(self, pid):
+    """
+    Sets the process we retrieve resources for.
+
+    :param int pid: process id
+    """
+
+    self._process_pid = pid
+
+  def get_resource_usage(self):
+    """
+    Provides the last cached usage as a Resources tuple of the form (cpu_sample,
+    cpu_average, memory_bytes, memory_percent), zeroed if we lack a sample.
+    """
+
+    with self._val_lock:
+      return self._last_sample if self._last_sample else Resources(0.0, 0.0, 0, 0.0)
+
+  def last_query_failed(self):
+    """
+    Provides true if, since we fetched the currently cached results, we've
+    failed to get new results. False otherwise.
+    """
+
+    return self._failure_count != 0
+
+  def task(self):
+    if self._process_pid is None:
+      return  # nothing to query until set_process() provides a pid
+
+    time_since_reset = time.time() - self.last_lookup
+    new_values = {}
+
+    try:
+      if self._use_proc:
+        utime, stime, start_time = proc.get_stats(self._process_pid, proc.Stat.CPU_UTIME, proc.Stat.CPU_STIME, proc.Stat.START_TIME)
+        total_cpu_time = float(utime) + float(stime)
+        cpu_delta = total_cpu_time - self._last_cpu_total
+        new_values["cpuSampling"] = cpu_delta / time_since_reset
+        new_values["cpuAvg"] = total_cpu_time / (time.time() - float(start_time))
+        new_values["_lastCpuTotal"] = total_cpu_time
+
+        mem_usage = int(proc.get_memory_usage(self._process_pid)[0])
+        total_memory = proc.get_physical_memory()
+        new_values["memUsage"] = mem_usage
+        new_values["memUsagePercentage"] = float(mem_usage) / total_memory
+      else:
+        # the ps call formats results as:
+        #
+        #     TIME     ELAPSED    RSS %MEM
+        # 3-08:06:32 21-00:00:12 121844 23.5
+        #
+        # or if Tor has only recently been started:
+        #
+        #     TIME ELAPSED   RSS %MEM
+        #  0:04.40   37:57 18772  0.9
+
+        ps_call = system.call("ps -p %s -o cputime,etime,rss,%%mem" % self._process_pid)
+
+        is_successful = False
+        if ps_call and len(ps_call) >= 2:
+          stats = ps_call[1].strip().split()
+
+          if len(stats) == 4:
+            try:
+              total_cpu_time = str_tools.parse_short_time_label(stats[0])
+              uptime = str_tools.parse_short_time_label(stats[1])
+              cpu_delta = total_cpu_time - self._last_cpu_total
+              new_values["cpuSampling"] = cpu_delta / time_since_reset
+              new_values["cpuAvg"] = total_cpu_time / uptime
+              new_values["_lastCpuTotal"] = total_cpu_time
+
+              new_values["memUsage"] = int(stats[2]) * 1024 # ps size is in kb
+              new_values["memUsagePercentage"] = float(stats[3]) / 100.0
+              is_successful = True
+            except ValueError: pass  # unparsable, reported via the IOError below
+
+        if not is_successful:
+          raise IOError("unrecognized output from ps: %s" % ps_call)
+    except IOError as exc:
+      new_values = {}
+      self._failure_count += 1
+
+      if self._use_proc:
+        if self._failure_count >= 3:
+          # We've failed three times resolving via proc. Warn, and fall back
+          # to ps resolutions.
+          log.info("Failed three attempts to get process resource usage from proc, falling back to ps (%s)" % exc)
+
+          self._use_proc = False
+          self._failure_count = 1 # prevents last_query_failed() from thinking that we succeeded
+        else:
+          # wait a bit and try again
+          log.debug("Unable to query process resource usage from proc (%s)" % exc)
+      else:
+        # exponential backoff for failed ps calls (only logged, we don't sleep here)
+        sleep_time = 0.01 * (2 ** self._failure_count) + self._failure_count
+        log.debug("Unable to query process resource usage from ps, waiting %0.2f seconds (%s)" % (sleep_time, exc))
+
+    # sets the new values
+    if new_values:
+      # If this is the first run then the cpuSampling stat is meaningless
+      # (there isn't a previous tick to sample from so it's zero at this
+      # point). Setting it to the average, which is a fairer estimate.
+      if self.last_lookup == -1:
+        new_values["cpuSampling"] = new_values["cpuAvg"]
+
+      with self._val_lock:
+        self._last_sample = Resources(new_values["cpuSampling"], new_values["cpuAvg"], new_values["memUsage"], new_values["memUsagePercentage"])
+        self._last_cpu_total = new_values["_lastCpuTotal"]
+        self.last_lookup = time.time()
+        self._failure_count = 0
+      return True  # cached a new sample
+    else:
+      return False  # query failed, any cached sample is now stale