[tor-commits] [arm/master] Daemon class for backing our ConnectionResolver

atagar at torproject.org atagar at torproject.org
Mon Sep 23 22:02:56 UTC 2013


commit 09786dea15a9762c2a6dc72c8df8188729d92471
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Sep 23 13:15:53 2013 -0700

    Daemon class for backing our ConnectionResolver
    
    We have a couple resolvers that pull information at a set rate. First splitting
    the ConnectionResolver between its specialized functionality and more general
    worker aspects, hopefully we'll be able to reuse this for our system resource
    fetcher without much trouble.
---
 arm/controller.py       |    6 +-
 arm/menu/actions.py     |    4 +-
 arm/util/connections.py |  256 ++++++++++++++++++++++++++---------------------
 3 files changed, 148 insertions(+), 118 deletions(-)

diff --git a/arm/controller.py b/arm/controller.py
index 3623a2f..c10551d 100644
--- a/arm/controller.py
+++ b/arm/controller.py
@@ -492,7 +492,7 @@ def connResetListener(controller, eventType, _):
 
   if connections.isResolverAlive("tor"):
     resolver = connections.getResolver("tor")
-    resolver.setPaused(eventType == State.CLOSED)
+    resolver.set_paused(eventType == State.CLOSED)
 
     if eventType in (State.INIT, State.RESET):
       # Reload the torrc contents. If the torrc panel is present then it will
@@ -523,7 +523,7 @@ def start_arm(start_time):
 
     if controller.get_conf("DisableDebuggerAttachment", None) == "1":
       log.notice("Tor is preventing system utilities like netstat and lsof from working. This means that arm can't provide you with connection information. You can change this by adding 'DisableDebuggerAttachment 0' to your torrc and restarting tor. For more information see...\nhttps://trac.torproject.org/3313")
-      connections.getResolver("tor").setPaused(True)
+      connections.getResolver("tor").set_paused(True)
     else:
       # Configures connection resoultions. This is paused/unpaused according to
       # if Tor's connected or not.
@@ -543,7 +543,7 @@ def start_arm(start_time):
       else:
         # constructs singleton resolver and, if tor isn't connected, initizes
         # it to be paused
-        connections.getResolver("tor").setPaused(not controller.is_alive())
+        connections.getResolver("tor").set_paused(not controller.is_alive())
 
   try:
     curses.wrapper(drawTorMonitor, start_time)
diff --git a/arm/menu/actions.py b/arm/menu/actions.py
index 052b249..055cc46 100644
--- a/arm/menu/actions.py
+++ b/arm/menu/actions.py
@@ -11,6 +11,8 @@ import arm.graphing.graphPanel
 
 from arm.util import connections, torTools, uiTools
 
+import stem.util.connection
+
 from stem.util import conf, str_tools
 
 CONFIG = conf.config_dict("arm", {
@@ -242,7 +244,7 @@ def makeConnectionsMenu(connPanel):
 
   resolverMenu.add(arm.menu.item.SelectionMenuItem("auto", resolverGroup, None))
 
-  for option in connections.Resolver:
+  for option in stem.util.connection.Resolver:
     resolverMenu.add(arm.menu.item.SelectionMenuItem(option, resolverGroup, option))
 
   connectionsMenu.add(resolverMenu)
diff --git a/arm/util/connections.py b/arm/util/connections.py
index 2eb7095..dd34efb 100644
--- a/arm/util/connections.py
+++ b/arm/util/connections.py
@@ -122,7 +122,86 @@ def getResolver(processName, processPid = "", alias=None):
   else: RESOLVERS[haltedIndex] = r
   return r
 
-class ConnectionResolver(threading.Thread):
+class Daemon(threading.Thread):
+  """
+  Daemon that can perform a unit of work at a given rate.
+  """
+
+  def __init__(self, rate):
+    threading.Thread.__init__(self)
+    self.daemon = True
+
+    self._rate = rate
+    self._last_ran = -1  # time when we last ran
+
+    self._is_paused = False
+    self._halt = False  # terminates thread if true
+    self._cond = threading.Condition()  # used for pausing the thread
+
+  def run(self):
+    while not self._halt:
+      time_since_last_ran = time.time() - self._last_ran
+
+      if self._is_paused or time_since_last_ran < self._rate:
+        sleep_duration = max(0.2, self._rate - time_since_last_ran)
+
+        self._cond.acquire()
+        if not self._halt:
+          self._cond.wait(sleep_duration)
+        self._cond.release()
+
+        continue  # done waiting, try again
+
+      self.task()
+      self._last_ran = time.time()
+
+  def task(self):
+    """
+    Task the resolver is meant to perform. This should be implemented by
+    subclasses.
+    """
+
+    pass
+
+  def get_rate(self):
+    """
+    Provides the rate at which we perform our given task.
+
+    :returns: **float** for the rate in seconds at which we perform our task
+    """
+
+    return self._rate
+
+  def set_rate(self, rate):
+    """
+    Sets the rate at which we perform our task in seconds.
+
+    :param float rate: rate at which to perform work in seconds
+    """
+
+    self._rate = rate
+
+  def set_paused(self, pause):
+    """
+    Either resumes or holds off on doing further work.
+
+    :param bool pause: halts work if **True**, resumes otherwise
+    """
+
+    self._is_paused = pause
+
+  def stop(self):
+    """
+    Halts further work and terminates the thread.
+    """
+
+    self._cond.acquire()
+    self._halt = True
+    self._cond.notifyAll()
+    self._cond.release()
+
+
+class ConnectionResolver(Daemon):
   """
   Service that periodically queries for a process' current connections. This
   provides several benefits over on-demand queries:
@@ -151,11 +230,6 @@ class ConnectionResolver(threading.Thread):
   Parameters:
     processName       - name of the process being resolved
     processPid        - pid of the process being resolved
-    resolveRate       - minimum time between resolving connections (in seconds,
-                        None if using the default)
-    * defaultRate     - default time between resolving connections
-    lastLookup        - time connections were last resolved (unix time, -1 if
-                        no resolutions have yet been successful)
     overwriteResolver - method of resolution (uses default if None)
     * defaultResolver - resolver used by default (None if all resolution
                         methods have been exhausted)
@@ -164,7 +238,7 @@ class ConnectionResolver(threading.Thread):
     * read-only
   """
 
-  def __init__(self, processName, processPid = "", resolveRate = None, handle = None):
+  def __init__(self, processName, processPid = "", handle = None):
     """
     Initializes a new resolver daemon. When no longer needed it's suggested
     that this is stopped.
@@ -172,21 +246,15 @@ class ConnectionResolver(threading.Thread):
     Arguments:
       processName - name of the process being resolved
       processPid  - pid of the process being resolved
-      resolveRate - time between resolving connections (in seconds, None if
-                    chosen dynamically)
       handle      - name used to query this resolver, this is the processName
                     if undefined
     """
 
-    threading.Thread.__init__(self)
-    self.setDaemon(True)
+    Daemon.__init__(self, CONFIG["queries.connections.minRate"])
 
     self.processName = processName
     self.processPid = processPid
-    self.resolveRate = resolveRate
     self.handle = handle if handle else processName
-    self.defaultRate = CONFIG["queries.connections.minRate"]
-    self.lastLookup = -1
     self.overwriteResolver = None
 
     self.defaultResolver = None
@@ -199,9 +267,6 @@ class ConnectionResolver(threading.Thread):
 
     self._connections = []        # connection cache (latest results)
     self._resolutionCounter = 0   # number of successful connection resolutions
-    self._isPaused = False
-    self._halt = False            # terminates thread if true
-    self._cond = threading.Condition()  # used for pausing the thread
     self._subsiquentFailures = 0  # number of failed resolutions with the default in a row
     self._resolverBlacklist = []  # resolvers that have failed to resolve
 
@@ -228,83 +293,68 @@ class ConnectionResolver(threading.Thread):
 
     self.overwriteResolver = overwriteResolver
 
-  def run(self):
-    while not self._halt:
-      minWait = self.resolveRate if self.resolveRate else self.defaultRate
-      timeSinceReset = time.time() - self.lastLookup
-
-      if self._isPaused or timeSinceReset < minWait:
-        sleepTime = max(0.2, minWait - timeSinceReset)
-
-        self._cond.acquire()
-        if not self._halt: self._cond.wait(sleepTime)
-        self._cond.release()
-
-        continue # done waiting, try again
-
-      isDefault = self.overwriteResolver == None
-      resolver = self.defaultResolver if isDefault else self.overwriteResolver
-
-      # checks if there's nothing to resolve with
-      if not resolver:
-        self.lastLookup = time.time() # avoids a busy wait in this case
-        continue
+  def task(self):
+    isDefault = self.overwriteResolver == None
+    resolver = self.defaultResolver if isDefault else self.overwriteResolver
 
-      try:
-        resolveStart = time.time()
-        time.sleep(2)
-        from stem.util import log
-        connResults = [(conn.local_address, conn.local_port, conn.remote_address, conn.remote_port) for conn in connection.get_connections(resolver, process_pid = self.processPid, process_name = self.processName)]
-
-        lookupTime = time.time() - resolveStart
-
-        self._connections = connResults
-        self._resolutionCounter += 1
-
-        newMinDefaultRate = 100 * lookupTime
-        if self.defaultRate < newMinDefaultRate:
-          if self._rateThresholdBroken >= 3:
-            # adding extra to keep the rate from frequently changing
-            self.defaultRate = newMinDefaultRate + 0.5
-
-            log.trace("connection lookup time increasing to %0.1f seconds per call" % self.defaultRate)
-          else: self._rateThresholdBroken += 1
-        else: self._rateThresholdBroken = 0
-
-        if isDefault: self._subsiquentFailures = 0
-      except (ValueError, IOError), exc:
-        # this logs in a couple of cases:
-        # - special failures noted by getConnections (most cases are already
-        # logged via system)
-        # - note fail-overs for default resolution methods
-        if str(exc).startswith("No results found using:"):
-          log.info(exc)
-
-        if isDefault:
-          self._subsiquentFailures += 1
-
-          if self._subsiquentFailures >= RESOLVER_FAILURE_TOLERANCE:
-            # failed several times in a row - abandon resolver and move on to another
-            self._resolverBlacklist.append(resolver)
-            self._subsiquentFailures = 0
-
-            # pick another (non-blacklisted) resolver
-            newResolver = None
-            for r in self.resolverOptions:
-              if not r in self._resolverBlacklist:
-                newResolver = r
-                break
-
-            if newResolver:
-              # provide notice that failures have occurred and resolver is changing
-              log.notice(RESOLVER_SERIAL_FAILURE_MSG % (resolver, newResolver))
-            else:
-              # exhausted all resolvers, give warning
-              log.notice(RESOLVER_FINAL_FAILURE_MSG)
+    # checks if there's nothing to resolve with
+    if not resolver:
+      self.stop()
+      return
 
-            self.defaultResolver = newResolver
-      finally:
-        self.lastLookup = time.time()
+    try:
+      resolveStart = time.time()
+      time.sleep(2)
+      from stem.util import log
+      connResults = [(conn.local_address, conn.local_port, conn.remote_address, conn.remote_port) for conn in connection.get_connections(resolver, process_pid = self.processPid, process_name = self.processName)]
+
+      lookupTime = time.time() - resolveStart
+
+      self._connections = connResults
+      self._resolutionCounter += 1
+
+      newMinDefaultRate = 100 * lookupTime
+      if self.get_rate() < newMinDefaultRate:
+        if self._rateThresholdBroken >= 3:
+          # adding extra to keep the rate from frequently changing
+          self.set_rate(newMinDefaultRate + 0.5)
+
+          log.trace("connection lookup time increasing to %0.1f seconds per call" % newMinDefaultRate)
+        else: self._rateThresholdBroken += 1
+      else: self._rateThresholdBroken = 0
+
+      if isDefault: self._subsiquentFailures = 0
+    except (ValueError, IOError), exc:
+      # this logs in a couple of cases:
+      # - special failures noted by getConnections (most cases are already
+      # logged via system)
+      # - note fail-overs for default resolution methods
+      if str(exc).startswith("No results found using:"):
+        log.info(exc)
+
+      if isDefault:
+        self._subsiquentFailures += 1
+
+        if self._subsiquentFailures >= RESOLVER_FAILURE_TOLERANCE:
+          # failed several times in a row - abandon resolver and move on to another
+          self._resolverBlacklist.append(resolver)
+          self._subsiquentFailures = 0
+
+          # pick another (non-blacklisted) resolver
+          newResolver = None
+          for r in self.resolverOptions:
+            if not r in self._resolverBlacklist:
+              newResolver = r
+              break
+
+          if newResolver:
+            # provide notice that failures have occurred and resolver is changing
+            log.notice(RESOLVER_SERIAL_FAILURE_MSG % (resolver, newResolver))
+          else:
+            # exhausted all resolvers, give warning
+            log.notice(RESOLVER_FINAL_FAILURE_MSG)
+
+          self.defaultResolver = newResolver
 
   def getConnections(self):
     """
@@ -341,28 +391,6 @@ class ConnectionResolver(threading.Thread):
 
     self.processPid = processPid
 
-  def setPaused(self, isPause):
-    """
-    Allows or prevents further connection resolutions (this still makes use of
-    cached results).
-
-    Arguments:
-      isPause - puts a freeze on further resolutions if true, allows them to
-                continue otherwise
-    """
-
-    if isPause == self._isPaused: return
-    self._isPaused = isPause
-
-  def stop(self):
-    """
-    Halts further resolutions and terminates the thread.
-    """
-
-    self._cond.acquire()
-    self._halt = True
-    self._cond.notifyAll()
-    self._cond.release()
 
 class AppResolver:
   """





More information about the tor-commits mailing list