commit 375cb5824652c0a34288dcdb4bb6a456c33b68ac Author: Damian Johnson atagar@torproject.org Date: Fri Oct 18 13:52:06 2013 -0700
Moving connection resolution to tracker.py
Making a new utility module that'll have both connection and resource usage resolver daemons. --- arm/connections/connPanel.py | 9 +- arm/controller.py | 12 +- arm/graphing/connStats.py | 6 +- arm/menu/actions.py | 5 +- arm/starter.py | 4 +- arm/util/__init__.py | 2 +- arm/util/connections.py | 230 +------------------------------------- arm/util/tracker.py | 254 ++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 277 insertions(+), 245 deletions(-)
diff --git a/arm/connections/connPanel.py b/arm/connections/connPanel.py index e52e954..1086fae 100644 --- a/arm/connections/connPanel.py +++ b/arm/connections/connPanel.py @@ -8,6 +8,7 @@ import curses import threading
import arm.popups +import arm.util.tracker
from arm.connections import countPopup, descriptorPopup, entries, connEntry, circEntry from arm.util import connections, panel, torTools, uiTools @@ -241,7 +242,7 @@ class ConnectionPanel(panel.Panel, threading.Thread): # provides a menu to pick the connection resolver title = "Resolver Util:" options = ["auto"] + list(connections.Resolver) - connResolver = connections.get_resolver() + connResolver = arm.util.tracker.get_connection_resolver()
currentOverwrite = connResolver.get_custom_resolver() if currentOverwrite == None: oldSelection = 0 @@ -312,7 +313,7 @@ class ConnectionPanel(panel.Panel, threading.Thread): lastDraw += CONFIG["features.connection.refreshRate"] * drawTicks
def getHelp(self): - resolverUtil = connections.get_resolver().get_custom_resolver() + resolverUtil = arm.util.tracker.get_connection_resolver().get_custom_resolver() if resolverUtil == None: resolverUtil = "auto"
options = [] @@ -426,9 +427,9 @@ class ConnectionPanel(panel.Panel, threading.Thread): self.appResolveSinceUpdate = False
# if we don't have an initialized resolver then this is a no-op - if not connections.get_resolver().is_alive(): return + if not arm.util.tracker.get_connection_resolver().is_alive(): return
- connResolver = connections.get_resolver() + connResolver = arm.util.tracker.get_connection_resolver() currentResolutionCount = connResolver.get_resolution_count()
self.valsLock.acquire() diff --git a/arm/controller.py b/arm/controller.py index 190aee5..ad3ef75 100644 --- a/arm/controller.py +++ b/arm/controller.py @@ -20,6 +20,7 @@ import arm.graphing.bandwidthStats import arm.graphing.connStats import arm.graphing.resourceStats import arm.connections.connPanel +import arm.util.tracker
from stem.control import State, Controller
@@ -100,7 +101,7 @@ def initController(stdscr, startTime):
if controller.get_conf("DisableDebuggerAttachment", None) == "1": log.notice("Tor is preventing system utilities like netstat and lsof from working. This means that arm can't provide you with connection information. You can change this by adding 'DisableDebuggerAttachment 0' to your torrc and restarting tor. For more information see...\nhttps://trac.torproject.org/3313") - connections.get_resolver().set_paused(True) + arm.util.tracker.get_connection_resolver().set_paused(True) else: # Configures connection resoultions. This is paused/unpaused according to # if Tor's connected or not. @@ -116,14 +117,14 @@ def initController(stdscr, startTime): if tor_cmd is None: tor_cmd = "tor"
- resolver = connections.get_resolver() + resolver = arm.util.tracker.get_connection_resolver() resolver.set_process(tor_pid, tor_cmd) log.info("Operating System: %s, Connection Resolvers: %s" % (os.uname()[0], ", ".join(resolver._resolvers))) resolver.start() else: # constructs singleton resolver and, if tor isn't connected, initizes # it to be paused - connections.get_resolver().set_paused(not controller.is_alive()) + arm.util.tracker.get_connection_resolver().set_paused(not controller.is_alive())
# third page: config if CONFIG["features.panels.show.config"]: @@ -496,8 +497,9 @@ def connResetListener(controller, eventType, _): pid if started again. """
- if connections.get_resolver().is_alive(): - resolver = connections.get_resolver() + resolver = arm.util.tracker.get_connection_resolver() + + if resolver.is_alive(): resolver.set_paused(eventType == State.CLOSED)
if eventType in (State.INIT, State.RESET): diff --git a/arm/graphing/connStats.py b/arm/graphing/connStats.py index b21caa6..d3fe6a5 100644 --- a/arm/graphing/connStats.py +++ b/arm/graphing/connStats.py @@ -2,8 +2,10 @@ Tracks stats concerning tor's current connections. """
+import arm.util.tracker + from arm.graphing import graphPanel -from arm.util import connections, torTools +from arm.util import torTools
from stem.control import State
@@ -39,7 +41,7 @@ class ConnStats(graphPanel.GraphStats):
inboundCount, outboundCount = 0, 0
- for entry in connections.get_resolver().get_connections(): + for entry in arm.util.tracker.get_connection_resolver().get_connections(): localPort = entry.local_port if localPort in (self.orPort, self.dirPort): inboundCount += 1 elif localPort == self.controlPort: pass # control connection diff --git a/arm/menu/actions.py b/arm/menu/actions.py index 0829e85..a98ea19 100644 --- a/arm/menu/actions.py +++ b/arm/menu/actions.py @@ -8,8 +8,9 @@ import arm.popups import arm.controller import arm.menu.item import arm.graphing.graphPanel +import arm.util.tracker
-from arm.util import connections, torTools, uiTools +from arm.util import torTools, uiTools
import stem.util.connection
@@ -238,7 +239,7 @@ def makeConnectionsMenu(connPanel): connectionsMenu.add(arm.menu.item.MenuItem("Sorting...", connPanel.showSortDialog))
# resolver submenu - connResolver = connections.get_resolver() + connResolver = arm.util.tracker.get_connection_resolver() resolverMenu = arm.menu.item.Submenu("Resolver") resolverGroup = arm.menu.item.SelectionGroup(connResolver.set_custom_resolver, connResolver.get_custom_resolver())
diff --git a/arm/starter.py b/arm/starter.py index c2e0ae1..16dc348 100644 --- a/arm/starter.py +++ b/arm/starter.py @@ -18,11 +18,11 @@ import time import arm import arm.controller import arm.logPanel -import arm.util.connections import arm.util.panel import arm.util.sysTools import arm.util.torConfig import arm.util.torTools +import arm.util.tracker import arm.util.uiTools
import stem @@ -287,7 +287,7 @@ def _shutdown_daemons(): # internal threadpools being joined might be sleeping
resource_trackers = arm.util.sysTools.RESOURCE_TRACKERS.values() - connection_resolver = arm.util.connections.get_resolver() if arm.util.connections.get_resolver().is_alive() else None + connection_resolver = arm.util.tracker.get_connection_resolver() if arm.util.tracker.get_connection_resolver().is_alive() else None
for tracker in resource_trackers: tracker.stop() diff --git a/arm/util/__init__.py b/arm/util/__init__.py index 9fa96e6..06ae312 100644 --- a/arm/util/__init__.py +++ b/arm/util/__init__.py @@ -4,5 +4,5 @@ application's status, making cross platform system calls, parsing tor data, and safely working with curses (hiding some of the gory details). """
-__all__ = ["connections", "panel", "sysTools", "textInput", "torConfig", "torTools", "uiTools"] +__all__ = ["connections", "panel", "sysTools", "textInput", "torConfig", "torTools", "tracker", "uiTools"]
diff --git a/arm/util/connections.py b/arm/util/connections.py index a2d1ed6..f4b107c 100644 --- a/arm/util/connections.py +++ b/arm/util/connections.py @@ -18,12 +18,9 @@ options that perform even better (thanks to Fabian Keil and Hans Schnehl): """
import os -import time import threading
-from stem.util import conf, connection, log, system - -RESOLVER = None +from stem.util import conf, log, system
def conf_handler(key, value): if key.startswith("port.label."): @@ -51,9 +48,6 @@ def conf_handler(key, value): log.notice(msg)
CONFIG = conf.config_dict('arm', { - 'queries.connections.minRate': 5, - 'msg.unable_to_use_resolver': '', - 'msg.unable_to_use_all_resolvers': '', }, conf_handler)
PORT_USAGE = {} @@ -69,228 +63,6 @@ def getPortUsage(port):
return PORT_USAGE.get(port)
-def get_resolver(): - """ - Singleton constructor for a connection resolver for tor's process. - """ - - global RESOLVER - - if RESOLVER is None: - RESOLVER = ConnectionResolver() - - return RESOLVER - -class Daemon(threading.Thread): - """ - Daemon that can perform a unit of work at a given rate. - """ - - def __init__(self, rate): - threading.Thread.__init__(self) - self.daemon = True - - self._rate = rate - self._last_ran = -1 # time when we last ran - - self._is_paused = False - self._halt = False # terminates thread if true - self._cond = threading.Condition() # used for pausing the thread - - def run(self): - while not self._halt: - time_since_last_ran = time.time() - self._last_ran - - if self._is_paused or time_since_last_ran < self._rate: - sleep_duration = max(0.2, self._rate - time_since_last_ran) - - self._cond.acquire() - if not self._halt: - self._cond.wait(sleep_duration) - self._cond.release() - - continue # done waiting, try again - - self.task() - self._last_ran = time.time() - - def task(self): - """ - Task the resolver is meant to perform. This should be implemented by - subclasses. - """ - - pass - - def get_rate(self): - """ - Provides the rate at which we perform our given task. - - :returns: **float** for the rate in seconds at which we perform our task - """ - - return self._rate - - def set_rate(self, rate): - """ - Sets the rate at which we perform our task in seconds. - - :param float rate: rate at which to perform work in seconds - """ - - self._rate = rate - - def set_paused(self, pause): - """ - Either resumes or holds off on doing further work. - - :param bool pause: halts work if **True**, resumes otherwise - """ - - self._is_paused = pause - - def stop(self): - """ - Halts further work and terminates the thread. - """ - - self._cond.acquire() - self._halt = True - self._cond.notifyAll() - self._cond.release() - - -class ConnectionResolver(Daemon): - """ - Daemon that periodically retrieves the connections made by a process. - """ - - def __init__(self): - Daemon.__init__(self, CONFIG["queries.connections.minRate"]) - - self._resolvers = connection.get_system_resolvers() - self._connections = [] - self._custom_resolver = None - self._resolution_counter = 0 # number of successful connection resolutions - - self._process_pid = None - self._process_name = None - - # Number of times in a row we've either failed with our current resolver or - # concluded that our rate is too low. - - self._failure_count = 0 - self._rate_too_low_count = 0 - - def task(self): - if self._custom_resolver: - resolver = self._custom_resolver - is_default_resolver = False - elif self._resolvers: - resolver = self._resolvers[0] - is_default_resolver = True - else: - return # nothing to resolve with - - try: - start_time = time.time() - self._connections = connection.get_connections(resolver, process_pid = self._process_pid, process_name = self._process_name) - runtime = time.time() - start_time - - self._resolution_counter += 1 - - if is_default_resolver: - self._failure_count = 0 - - # Reduce our rate if connection resolution is taking a long time. This is - # most often an issue for extremely busy relays. - - min_rate = 100 * runtime - - if self.get_rate() < min_rate: - self._rate_too_low_count += 1 - - if self._rate_too_low_count >= 3: - min_rate += 1 # little extra padding so we don't frequently update this - self.set_rate(min_rate) - self._rate_too_low_count = 0 - log.trace("connection lookup time increasing to %0.1f seconds per call" % min_rate) - else: - self._rate_too_low_count = 0 - except IOError as exc: - log.info(exc) - - # Fail over to another resolver if we've repeatedly been unable to use - # this one. - - if is_default_resolver: - self._failure_count += 1 - - if self._failure_count >= 3: - self._resolvers.pop() - self._failure_count = 0 - - if self._resolvers: - log.notice(CONFIG['msg.unable_to_use_resolver'].format( - old_resolver = resolver, - new_resolver = self._resolvers[0], - )) - else: - log.notice(CONFIG['msg.unable_to_use_all_resolvers']) - - def set_process(self, pid, name): - """ - Sets the process we retrieve connections for. - - :param int pid: process id - :param str name: name of the process - """ - - self._process_pid = pid - self._process_name = name - - def get_custom_resolver(self): - """ - Provides the custom resolver the user has selected. This is **None** if - we're picking resolvers dynamically. - - :returns: :data:`stem.util.connection.Resolver` we're overwritten to use - """ - - return self._custom_resolver - - def set_custom_resolver(self, resolver): - """ - Sets the resolver used for connection resolution. If **None** then this is - automatically determined based on what is available. - - :param stem.util.connection.Resolver resolver: resolver to use - """ - - self._custom_resolver = resolver - - def get_connections(self): - """ - Provides the last queried connection results, an empty list if resolver - has been stopped. - - :returns: **list** of :class:`~stem.util.connection.Connection` we last retrieved - """ - - if self._halt: - return [] - else: - return list(self._connections) - - def get_resolution_count(self): - """ - Provides the number of successful resolutions so far. This can be used to - determine if the connection results are new for the caller or not. - """ - - return self._resolution_counter - - class AppResolver: """ Provides the names and pids of appliations attached to the given ports. This diff --git a/arm/util/tracker.py b/arm/util/tracker.py new file mode 100644 index 0000000..d9bfef0 --- /dev/null +++ b/arm/util/tracker.py @@ -0,0 +1,254 @@ +""" +Background tasks for gathering informatino about the tor process. + +:: + + get_connection_resolver - provides a ConnectionResolver for our tor process + + ConnectionResolver - periodically queries tor's connection information + |- set_process - set the pid and process name used for lookups + |- get_custom_resolver - provide the custom conntion resolver we're using + |- set_custom_resolver - overwrites automatic resolver selecion with a custom resolver + |- get_connections - provides our latest connection results + +- get_resolution_count - number of times we've fetched connection information +""" + +import time +import threading + +from stem.util import conf, connection, log + +CONNECTION_RESOLVER = None + +CONFIG = conf.config_dict('arm', { + 'queries.connections.minRate': 5, + 'msg.unable_to_use_resolver': '', + 'msg.unable_to_use_all_resolvers': '', +}) + +def get_connection_resolver(): + """ + Singleton constructor for a connection resolver for tor's process. + """ + + global CONNECTION_RESOLVER + + if CONNECTION_RESOLVER is None: + CONNECTION_RESOLVER = ConnectionResolver() + + return CONNECTION_RESOLVER + +class Daemon(threading.Thread): + """ + Daemon that can perform a unit of work at a given rate. + """ + + def __init__(self, rate): + threading.Thread.__init__(self) + self.daemon = True + + self._rate = rate + self._last_ran = -1 # time when we last ran + + self._is_paused = False + self._halt = False # terminates thread if true + self._cond = threading.Condition() # used for pausing the thread + + def run(self): + while not self._halt: + time_since_last_ran = time.time() - self._last_ran + + if self._is_paused or time_since_last_ran < self._rate: + sleep_duration = max(0.2, self._rate - time_since_last_ran) + + self._cond.acquire() + if not self._halt: + self._cond.wait(sleep_duration) + self._cond.release() + + continue # done waiting, try again + + self.task() + self._last_ran = time.time() + + def task(self): + """ + Task the resolver is meant to perform. This should be implemented by + subclasses. + """ + + pass + + def get_rate(self): + """ + Provides the rate at which we perform our given task. + + :returns: **float** for the rate in seconds at which we perform our task + """ + + return self._rate + + def set_rate(self, rate): + """ + Sets the rate at which we perform our task in seconds. + + :param float rate: rate at which to perform work in seconds + """ + + self._rate = rate + + def set_paused(self, pause): + """ + Either resumes or holds off on doing further work. + + :param bool pause: halts work if **True**, resumes otherwise + """ + + self._is_paused = pause + + def stop(self): + """ + Halts further work and terminates the thread. + """ + + self._cond.acquire() + self._halt = True + self._cond.notifyAll() + self._cond.release() + + +class ConnectionResolver(Daemon): + """ + Daemon that periodically retrieves the connections made by a process. + """ + + def __init__(self): + Daemon.__init__(self, CONFIG["queries.connections.minRate"]) + + self._resolvers = connection.get_system_resolvers() + self._connections = [] + self._custom_resolver = None + self._resolution_counter = 0 # number of successful connection resolutions + + self._process_pid = None + self._process_name = None + + # Number of times in a row we've either failed with our current resolver or + # concluded that our rate is too low. + + self._failure_count = 0 + self._rate_too_low_count = 0 + + def task(self): + if self._custom_resolver: + resolver = self._custom_resolver + is_default_resolver = False + elif self._resolvers: + resolver = self._resolvers[0] + is_default_resolver = True + else: + return # nothing to resolve with + + try: + start_time = time.time() + + self._connections = connection.get_connections( + resolver, + process_pid = self._process_pid, + process_name = self._process_name, + ) + + runtime = time.time() - start_time + + self._resolution_counter += 1 + + if is_default_resolver: + self._failure_count = 0 + + # Reduce our rate if connection resolution is taking a long time. This is + # most often an issue for extremely busy relays. + + min_rate = 100 * runtime + + if self.get_rate() < min_rate: + self._rate_too_low_count += 1 + + if self._rate_too_low_count >= 3: + min_rate += 1 # little extra padding so we don't frequently update this + self.set_rate(min_rate) + self._rate_too_low_count = 0 + log.debug("connection lookup time increasing to %0.1f seconds per call" % min_rate) + else: + self._rate_too_low_count = 0 + except IOError as exc: + log.info(exc) + + # Fail over to another resolver if we've repeatedly been unable to use + # this one. + + if is_default_resolver: + self._failure_count += 1 + + if self._failure_count >= 3: + self._resolvers.pop() + self._failure_count = 0 + + if self._resolvers: + log.notice(CONFIG['msg.unable_to_use_resolver'].format( + old_resolver = resolver, + new_resolver = self._resolvers[0], + )) + else: + log.notice(CONFIG['msg.unable_to_use_all_resolvers']) + + def set_process(self, pid, name): + """ + Sets the process we retrieve connections for. + + :param int pid: process id + :param str name: name of the process + """ + + self._process_pid = pid + self._process_name = name + + def get_custom_resolver(self): + """ + Provides the custom resolver the user has selected. This is **None** if + we're picking resolvers dynamically. + + :returns: :data:`stem.util.connection.Resolver` we're overwritten to use + """ + + return self._custom_resolver + + def set_custom_resolver(self, resolver): + """ + Sets the resolver used for connection resolution. If **None** then this is + automatically determined based on what is available. + + :param stem.util.connection.Resolver resolver: resolver to use + """ + + self._custom_resolver = resolver + + def get_connections(self): + """ + Provides the last queried connection results, an empty list if resolver + has been stopped. + + :returns: **list** of :class:`~stem.util.connection.Connection` we last retrieved + """ + + if self._halt: + return [] + else: + return list(self._connections) + + def get_resolution_count(self): + """ + Provides the number of successful resolutions so far. This can be used to + determine if the connection results are new for the caller or not. + """ + + return self._resolution_counter