[tor-commits] [arm/master] Revising ConnectionResolver

atagar at torproject.org atagar at torproject.org
Mon Sep 23 22:02:56 UTC 2013


commit f7c22d40d407ef7a2282808acc61fc2e9e7f66e8
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Sep 23 15:05:18 2013 -0700

    Revising ConnectionResolver
    
    Rewriting the ConnectionResolver to be more intuitive and conform to PEP8
    conventions. Also dropping the extra complications we had to support multiple
    connection resolvers. We only get an instance for tor so we might as well keep
    it simple.
---
 arm/connections/connPanel.py |   16 +--
 arm/controller.py            |   22 ++-
 arm/graphing/connStats.py    |    4 +-
 arm/menu/actions.py          |    4 +-
 arm/settings.cfg             |    3 +
 arm/util/connections.py      |  305 ++++++++++++++----------------------------
 6 files changed, 132 insertions(+), 222 deletions(-)

diff --git a/arm/connections/connPanel.py b/arm/connections/connPanel.py
index 7d7864e..e52e954 100644
--- a/arm/connections/connPanel.py
+++ b/arm/connections/connPanel.py
@@ -241,9 +241,9 @@ class ConnectionPanel(panel.Panel, threading.Thread):
       # provides a menu to pick the connection resolver
       title = "Resolver Util:"
       options = ["auto"] + list(connections.Resolver)
-      connResolver = connections.getResolver("tor")
+      connResolver = connections.get_resolver()
 
-      currentOverwrite = connResolver.overwriteResolver
+      currentOverwrite = connResolver.get_custom_resolver()
       if currentOverwrite == None: oldSelection = 0
       else: oldSelection = options.index(currentOverwrite)
 
@@ -252,7 +252,7 @@ class ConnectionPanel(panel.Panel, threading.Thread):
       # applies new setting
       if selection != -1:
         selectedOption = options[selection] if selection != 0 else None
-        connResolver.overwriteResolver = selectedOption
+        connResolver.set_custom_resolver(selectedOption)
     elif key == ord('l') or key == ord('L'):
       # provides a menu to pick the primary information we list connections by
       title = "List By:"
@@ -312,7 +312,7 @@ class ConnectionPanel(panel.Panel, threading.Thread):
         lastDraw += CONFIG["features.connection.refreshRate"] * drawTicks
 
   def getHelp(self):
-    resolverUtil = connections.getResolver("tor").overwriteResolver
+    resolverUtil = connections.get_resolver().get_custom_resolver()
     if resolverUtil == None: resolverUtil = "auto"
 
     options = []
@@ -426,10 +426,10 @@ class ConnectionPanel(panel.Panel, threading.Thread):
     self.appResolveSinceUpdate = False
 
     # if we don't have an initialized resolver then this is a no-op
-    if not connections.isResolverAlive("tor"): return
+    if not connections.get_resolver().is_alive(): return
 
-    connResolver = connections.getResolver("tor")
-    currentResolutionCount = connResolver.getResolutionCount()
+    connResolver = connections.get_resolver()
+    currentResolutionCount = connResolver.get_resolution_count()
 
     self.valsLock.acquire()
 
@@ -439,7 +439,7 @@ class ConnectionPanel(panel.Panel, threading.Thread):
     # newConnections  [(local ip, local port, foreign ip, foreign port)...]
     # newCircuits     {circuitID => (status, purpose, path)...}
 
-    newConnections = connResolver.getConnections()
+    newConnections = [(conn.local_address, conn.local_port, conn.remote_address, conn.remote_port) for conn in connResolver.get_connections()]
     newCircuits = {}
 
     for circuitID, status, purpose, path in torTools.getConn().getCircuits():
diff --git a/arm/controller.py b/arm/controller.py
index c10551d..29d1fea 100644
--- a/arm/controller.py
+++ b/arm/controller.py
@@ -457,7 +457,7 @@ def shutdownDaemons():
   # internal threadpools being joined might be sleeping
   hostnames.stop()
   resourceTrackers = sysTools.RESOURCE_TRACKERS.values()
-  resolver = connections.getResolver("tor") if connections.isResolverAlive("tor") else None
+  resolver = connections.get_resolver() if connections.get_resolver().is_alive() else None
   for tracker in resourceTrackers: tracker.stop()
   if resolver: resolver.stop()  # sets halt flag (returning immediately)
   for tracker in resourceTrackers: tracker.join()
@@ -490,8 +490,8 @@ def connResetListener(controller, eventType, _):
   pid if started again.
   """
 
-  if connections.isResolverAlive("tor"):
-    resolver = connections.getResolver("tor")
+  if connections.get_resolver().is_alive():
+    resolver = connections.get_resolver()
     resolver.set_paused(eventType == State.CLOSED)
 
     if eventType in (State.INIT, State.RESET):
@@ -503,7 +503,12 @@ def connResetListener(controller, eventType, _):
         torConfig.getTorrc().load(True)
 
       try:
-        resolver.setPid(controller.get_pid())
+        tor_pid = controller.get_pid()  # raises ValueError if pid is unknown
+        tor_cmd = system.get_name_by_pid(tor_pid)
+        if tor_cmd is None:
+          tor_cmd = "tor"
+
+        resolver.set_process(tor_pid, tor_cmd)
       except ValueError:
         pass
 
@@ -523,7 +528,7 @@ def start_arm(start_time):
 
     if controller.get_conf("DisableDebuggerAttachment", None) == "1":
       log.notice("Tor is preventing system utilities like netstat and lsof from working. This means that arm can't provide you with connection information. You can change this by adding 'DisableDebuggerAttachment 0' to your torrc and restarting tor. For more information see...\nhttps://trac.torproject.org/3313")
-      connections.getResolver("tor").set_paused(True)
+      connections.get_resolver().set_paused(True)
     else:
       # Configures connection resoultions. This is paused/unpaused according to
       # if Tor's connected or not.
@@ -539,11 +544,14 @@ def start_arm(start_time):
         if tor_cmd is None:
           tor_cmd = "tor"
 
-        connections.getResolver(tor_cmd, tor_pid, "tor")
+        resolver = connections.get_resolver()
+        resolver.set_process(tor_pid, tor_cmd)
+        log.info("Operating System: %s, Connection Resolvers: %s" % (os.uname()[0], ", ".join(resolver._resolvers)))
+        resolver.start()
       else:
         # constructs singleton resolver and, if tor isn't connected, initizes
         # it to be paused
-        connections.getResolver("tor").set_paused(not controller.is_alive())
+        connections.get_resolver().set_paused(not controller.is_alive())
 
   try:
     curses.wrapper(drawTorMonitor, start_time)
diff --git a/arm/graphing/connStats.py b/arm/graphing/connStats.py
index 2b1b188..b21caa6 100644
--- a/arm/graphing/connStats.py
+++ b/arm/graphing/connStats.py
@@ -39,8 +39,8 @@ class ConnStats(graphPanel.GraphStats):
 
     inboundCount, outboundCount = 0, 0
 
-    for entry in connections.getResolver("tor").getConnections():
-      localPort = entry[1]
+    for entry in connections.get_resolver().get_connections():
+      localPort = entry.local_port
       if localPort in (self.orPort, self.dirPort): inboundCount += 1
       elif localPort == self.controlPort: pass # control connection
       else: outboundCount += 1
diff --git a/arm/menu/actions.py b/arm/menu/actions.py
index 055cc46..0829e85 100644
--- a/arm/menu/actions.py
+++ b/arm/menu/actions.py
@@ -238,9 +238,9 @@ def makeConnectionsMenu(connPanel):
   connectionsMenu.add(arm.menu.item.MenuItem("Sorting...", connPanel.showSortDialog))
 
   # resolver submenu
-  connResolver = connections.getResolver("tor")
+  connResolver = connections.get_resolver()
   resolverMenu = arm.menu.item.Submenu("Resolver")
-  resolverGroup = arm.menu.item.SelectionGroup(connResolver.setOverwriteResolver, connResolver.getOverwriteResolver())
+  resolverGroup = arm.menu.item.SelectionGroup(connResolver.set_custom_resolver, connResolver.get_custom_resolver())
 
   resolverMenu.add(arm.menu.item.SelectionMenuItem("auto", resolverGroup, None))
 
diff --git a/arm/settings.cfg b/arm/settings.cfg
index 0d5227c..a42a081 100644
--- a/arm/settings.cfg
+++ b/arm/settings.cfg
@@ -62,6 +62,9 @@ msg.config_not_found No armrc loaded, using defaults. You can customize arm by p
 msg.unable_to_read_config Failed to load configuration (using defaults): "{error}"
 msg.unable_to_determine_pid Unable to determine Tor's pid. Some information, like its resource usage will be unavailable.
 
+msg.unable_to_use_resolver Unable to query connections with {old_resolver}, trying {new_resolver}
+msg.unable_to_use_all_resolvers We were unable to use any of your system's resolvers to get tor's connections. This is fine, but means that the connections page will be empty. This is usually permissions related so if you would like to fix this then run arm with the same user as tor (ie, "sudo -u <tor user> arm").
+
 # Important tor configuration options (shown by default)
 config.important BandwidthRate
 config.important BandwidthBurst
diff --git a/arm/util/connections.py b/arm/util/connections.py
index dd34efb..a2d1ed6 100644
--- a/arm/util/connections.py
+++ b/arm/util/connections.py
@@ -23,15 +23,7 @@ import threading
 
 from stem.util import conf, connection, log, system
 
-# If true this provides new instantiations for resolvers if the old one has
-# been stopped. This can make it difficult ensure all threads are terminated
-# when accessed concurrently.
-RECREATE_HALTED_RESOLVERS = False
-
-RESOLVERS = []                      # connection resolvers available via the singleton constructor
-RESOLVER_FAILURE_TOLERANCE = 3      # number of subsequent failures before moving on to another resolver
-RESOLVER_SERIAL_FAILURE_MSG = "Unable to query connections with %s, trying %s"
-RESOLVER_FINAL_FAILURE_MSG = "We were unable to use any of your system's resolvers to get tor's connections. This is fine, but means that the connections page will be empty. This is usually permissions related so if you would like to fix this then run arm with the same user as tor (ie, \"sudo -u <tor user> arm\")."
+RESOLVER = None
 
 def conf_handler(key, value):
   if key.startswith("port.label."):
@@ -58,8 +50,10 @@ def conf_handler(key, value):
         msg = "Unable to parse port range for entry: %s" % key
         log.notice(msg)
 
-CONFIG = conf.config_dict("arm", {
-  "queries.connections.minRate": 5,
+CONFIG = conf.config_dict('arm', {
+  'queries.connections.minRate': 5,
+  'msg.unable_to_use_resolver': '',
+  'msg.unable_to_use_all_resolvers': '',
 }, conf_handler)
 
 PORT_USAGE = {}
@@ -75,52 +69,17 @@ def getPortUsage(port):
 
   return PORT_USAGE.get(port)
 
-def isResolverAlive(processName, processPid = ""):
-  """
-  This provides true if a singleton resolver instance exists for the given
-  process/pid combination, false otherwise.
-
-  Arguments:
-    processName - name of the process being checked
-    processPid  - pid of the process being checked, if undefined this matches
-                  against any resolver with the process name
-  """
-
-  for resolver in RESOLVERS:
-    if not resolver._halt and resolver.processName == processName and (not processPid or resolver.processPid == processPid):
-      return True
-
-  return False
-
-def getResolver(processName, processPid = "", alias=None):
+def get_resolver():
   """
-  Singleton constructor for resolver instances. If a resolver already exists
-  for the process then it's returned. Otherwise one is created and started.
-
-  Arguments:
-    processName - name of the process being resolved
-    processPid  - pid of the process being resolved, if undefined this matches
-                  against any resolver with the process name
-    alias       - alternative handle under which the resolver can be requested
+  Singleton constructor for a connection resolver for tor's process.
   """
 
-  # check if one's already been created
-  requestHandle = alias if alias else processName
-  haltedIndex = -1 # old instance of this resolver with the _halt flag set
-  for i in range(len(RESOLVERS)):
-    resolver = RESOLVERS[i]
-    if resolver.handle == requestHandle and (not processPid or resolver.processPid == processPid):
-      if resolver._halt and RECREATE_HALTED_RESOLVERS: haltedIndex = i
-      else: return resolver
+  global RESOLVER
 
-  # make a new resolver
-  r = ConnectionResolver(processName, processPid, handle = requestHandle)
-  r.start()
+  if RESOLVER is None:
+    RESOLVER = ConnectionResolver()
 
-  # overwrites halted instance of this resolver if it exists, otherwise append
-  if haltedIndex == -1: RESOLVERS.append(r)
-  else: RESOLVERS[haltedIndex] = r
-  return r
+  return RESOLVER
 
 class Daemon(threading.Thread):
   """
@@ -203,193 +162,133 @@ class Daemon(threading.Thread):
 
 class ConnectionResolver(Daemon):
   """
-  Service that periodically queries for a process' current connections. This
-  provides several benefits over on-demand queries:
-  - queries are non-blocking (providing cached results)
-  - falls back to use different resolution methods in case of repeated failures
-  - avoids overly frequent querying of connection data, which can be demanding
-    in terms of system resources
-
-  Unless an overriding method of resolution is requested this defaults to
-  choosing a resolver the following way:
-
-  - Checks the current PATH to determine which resolvers are available. This
-    uses the first of the following that's available:
-      netstat, ss, lsof (picks netstat if none are found)
-
-  - Attempts to resolve using the selection. Single failures are logged at the
-    INFO level, and a series of failures at NOTICE. In the later case this
-    blacklists the resolver, moving on to the next. If all resolvers fail this
-    way then resolution's abandoned and logs a WARN message.
-
-  The time between resolving connections, unless overwritten, is set to be
-  either five seconds or ten times the runtime of the resolver (whichever is
-  larger). This is to prevent systems either strapped for resources or with a
-  vast number of connections from being burdened too heavily by this daemon.
-
-  Parameters:
-    processName       - name of the process being resolved
-    processPid        - pid of the process being resolved
-    overwriteResolver - method of resolution (uses default if None)
-    * defaultResolver - resolver used by default (None if all resolution
-                        methods have been exhausted)
-    resolverOptions   - resolvers to be cycled through (differ by os)
-
-    * read-only
+  Daemon that periodically retrieves the connections made by a process.
   """
 
-  def __init__(self, processName, processPid = "", handle = None):
-    """
-    Initializes a new resolver daemon. When no longer needed it's suggested
-    that this is stopped.
+  def __init__(self):
+    Daemon.__init__(self, CONFIG["queries.connections.minRate"])
 
-    Arguments:
-      processName - name of the process being resolved
-      processPid  - pid of the process being resolved
-      handle      - name used to query this resolver, this is the processName
-                    if undefined
-    """
+    self._resolvers = connection.get_system_resolvers()
+    self._connections = []
+    self._custom_resolver = None
+    self._resolution_counter = 0  # number of successful connection resolutions
 
-    Daemon.__init__(self, CONFIG["queries.connections.minRate"])
+    self._process_pid = None
+    self._process_name = None
 
-    self.processName = processName
-    self.processPid = processPid
-    self.handle = handle if handle else processName
-    self.overwriteResolver = None
+    # Number of times in a row we've either failed with our current resolver or
+    # concluded that our rate is too low.
 
-    self.defaultResolver = None
-    self.resolverOptions = connection.get_system_resolvers()
+    self._failure_count = 0
+    self._rate_too_low_count = 0
 
-    log.info("Operating System: %s, Connection Resolvers: %s" % (os.uname()[0], ", ".join(self.resolverOptions)))
+  def task(self):
+    if self._custom_resolver:
+      resolver = self._custom_resolver
+      is_default_resolver = False
+    elif self._resolvers:
+      resolver = self._resolvers[0]
+      is_default_resolver = True
+    else:
+      return  # nothing to resolve with
 
-    if self.resolverOptions:
-      self.defaultResolver = self.resolverOptions[0]
+    try:
+      start_time = time.time()
+      self._connections = connection.get_connections(resolver, process_pid = self._process_pid, process_name = self._process_name)
+      runtime = time.time() - start_time
 
-    self._connections = []        # connection cache (latest results)
-    self._resolutionCounter = 0   # number of successful connection resolutions
-    self._subsiquentFailures = 0  # number of failed resolutions with the default in a row
-    self._resolverBlacklist = []  # resolvers that have failed to resolve
+      self._resolution_counter += 1
 
-    # Number of sequential times the threshold rate's been too low. This is to
-    # avoid having stray spikes up the rate.
-    self._rateThresholdBroken = 0
+      if is_default_resolver:
+        self._failure_count = 0
 
-  def getOverwriteResolver(self):
-    """
-    Provides the resolver connection resolution is forced to use. This returns
-    None if it's dynamically determined.
-    """
+      # Reduce our rate if connection resolution is taking a long time. This is
+      # most often an issue for extremely busy relays.
 
-    return self.overwriteResolver
+      min_rate = 100 * runtime
 
-  def setOverwriteResolver(self, overwriteResolver):
-    """
-    Sets the resolver used for connection resolution, if None then this is
-    automatically determined based on what is available.
+      if self.get_rate() < min_rate:
+        self._rate_too_low_count += 1
 
-    Arguments:
-      overwriteResolver - connection resolver to be used
-    """
+        if self._rate_too_low_count >= 3:
+          min_rate += 1  # little extra padding so we don't frequently update this
+          self.set_rate(min_rate)
+          self._rate_too_low_count = 0
+          log.trace("connection lookup time increasing to %0.1f seconds per call" % min_rate)
+      else:
+        self._rate_too_low_count = 0
+    except (ValueError, IOError) as exc:  # stem raises both on failed lookups
+      log.info(exc)
 
-    self.overwriteResolver = overwriteResolver
+      # Fail over to another resolver if we've repeatedly been unable to use
+      # this one.
 
-  def task(self):
-    isDefault = self.overwriteResolver == None
-    resolver = self.defaultResolver if isDefault else self.overwriteResolver
+      if is_default_resolver:
+        self._failure_count += 1
 
-    # checks if there's nothing to resolve with
-    if not resolver:
-      self.stop()
-      return
+        if self._failure_count >= 3:
+          self._resolvers.pop(0)  # drop the failing resolver (_resolvers[0])
+          self._failure_count = 0
 
-    try:
-      resolveStart = time.time()
-      time.sleep(2)
-      from stem.util import log
-      connResults = [(conn.local_address, conn.local_port, conn.remote_address, conn.remote_port) for conn in connection.get_connections(resolver, process_pid = self.processPid, process_name = self.processName)]
-
-      lookupTime = time.time() - resolveStart
-
-      self._connections = connResults
-      self._resolutionCounter += 1
-
-      newMinDefaultRate = 100 * lookupTime
-      if self.get_rate() < newMinDefaultRate:
-        if self._rateThresholdBroken >= 3:
-          # adding extra to keep the rate from frequently changing
-          self.set_rate(newMinDefaultRate + 0.5)
-
-          log.trace("connection lookup time increasing to %0.1f seconds per call" % newMinDefaultRate)
-        else: self._rateThresholdBroken += 1
-      else: self._rateThresholdBroken = 0
-
-      if isDefault: self._subsiquentFailures = 0
-    except (ValueError, IOError), exc:
-      # this logs in a couple of cases:
-      # - special failures noted by getConnections (most cases are already
-      # logged via system)
-      # - note fail-overs for default resolution methods
-      if str(exc).startswith("No results found using:"):
-        log.info(exc)
-
-      if isDefault:
-        self._subsiquentFailures += 1
-
-        if self._subsiquentFailures >= RESOLVER_FAILURE_TOLERANCE:
-          # failed several times in a row - abandon resolver and move on to another
-          self._resolverBlacklist.append(resolver)
-          self._subsiquentFailures = 0
-
-          # pick another (non-blacklisted) resolver
-          newResolver = None
-          for r in self.resolverOptions:
-            if not r in self._resolverBlacklist:
-              newResolver = r
-              break
-
-          if newResolver:
-            # provide notice that failures have occurred and resolver is changing
-            log.notice(RESOLVER_SERIAL_FAILURE_MSG % (resolver, newResolver))
+          if self._resolvers:
+            log.notice(CONFIG['msg.unable_to_use_resolver'].format(
+              old_resolver = resolver,
+              new_resolver = self._resolvers[0],
+            ))
           else:
-            # exhausted all resolvers, give warning
-            log.notice(RESOLVER_FINAL_FAILURE_MSG)
-
-          self.defaultResolver = newResolver
+            log.notice(CONFIG['msg.unable_to_use_all_resolvers'])
 
-  def getConnections(self):
+  def set_process(self, pid, name):
     """
-    Provides the last queried connection results, an empty list if resolver
-    has been halted.
+    Sets the process we retrieve connections for.
+
+    :param int pid: process id
+    :param str name: name of the process
     """
 
-    if self._halt: return []
-    else: return list(self._connections)
+    self._process_pid = pid
+    self._process_name = name
 
-  def getResolutionCount(self):
+  def get_custom_resolver(self):
     """
-    Provides the number of successful resolutions so far. This can be used to
-    determine if the connection results are new for the caller or not.
+    Provides the custom resolver the user has selected. This is **None** if
+    we're picking resolvers dynamically.
+
+    :returns: :data:`stem.util.connection.Resolver` we're overwritten to use
     """
 
-    return self._resolutionCounter
+    return self._custom_resolver
 
-  def getPid(self):
+  def set_custom_resolver(self, resolver):
     """
-    Provides the pid used to narrow down connection resolution. This is an
-    empty string if undefined.
+    Sets the resolver used for connection resolution. If **None** then this is
+    automatically determined based on what is available.
+
+    :param stem.util.connection.Resolver resolver: resolver to use
     """
 
-    return self.processPid
+    self._custom_resolver = resolver
+
+  def get_connections(self):
+    """
+    Provides the last queried connection results, an empty list if resolver
+    has been stopped.
 
-  def setPid(self, processPid):
+    :returns: **list** of :class:`~stem.util.connection.Connection` we last retrieved
     """
-    Sets the pid used to narrow down connection resultions.
 
-    Arguments:
-      processPid - pid for the process we're fetching connections for
+    if self._halt:
+      return []
+    else:
+      return list(self._connections)
+
+  def get_resolution_count(self):
+    """
+    Provides the number of successful resolutions so far. This can be used to
+    determine if the connection results are new for the caller or not.
     """
 
-    self.processPid = processPid
+    return self._resolution_counter
 
 
 class AppResolver:



More information about the tor-commits mailing list