commit 726982e84028587ef9dcc6a543493d112851a632
Author: Damian Johnson <atagar(a)torproject.org>
Date: Tue Jan 7 09:18:03 2014 -0800
Replacing AppResolver with a tracker counterpart
Finally polishing off replacement of the connections util. The last class, the
AppResolver, is being replaced by a new Daemon subclass called the
PortUsageTracker.
---
arm/config/strings.cfg | 2 +
arm/connections/connPanel.py | 17 ++--
arm/util/connections.py | 170 -------------------------------
arm/util/tracker.py | 77 +++++++++++++-
armrc.sample | 3 +-
test/util/tracker/port_usage_tracker.py | 38 ++++++-
6 files changed, 126 insertions(+), 181 deletions(-)
diff --git a/arm/config/strings.cfg b/arm/config/strings.cfg
index 85c23f4..60194d2 100644
--- a/arm/config/strings.cfg
+++ b/arm/config/strings.cfg
@@ -36,7 +36,9 @@ msg.setup.unable_to_determine_pid Unable to determine Tor's pid. Some informatio
msg.setup.unknown_event_types arm doesn't recognize the following event types: {event_types} (log 'UNKNOWN' events to see them)
msg.tracker.abort_getting_resources Failed three attempts to get process resource usage from {resolver}, {response} ({exc})
+msg.tracker.abort_getting_port_usage Failed three attempts to determine the process using active ports ({exc})
msg.tracker.lookup_rate_increased connection lookup time increasing to {seconds} seconds per call
+msg.tracker.unable_to_get_port_usages Unable to query the processes using ports with lsof ({exc})
msg.tracker.unable_to_get_resources Unable to query process resource usage from {resolver} ({exc})
msg.tracker.unable_to_use_all_resolvers We were unable to use any of your system's resolvers to get tor's connections. This is fine, but means that the connections page will be empty. This is usually permissions related so if you would like to fix this then run arm with the same user as tor (ie, "sudo -u <tor user> arm").
msg.tracker.unable_to_use_resolver Unable to query connections with {old_resolver}, trying {new_resolver}
diff --git a/arm/connections/connPanel.py b/arm/connections/connPanel.py
index cbf9902..bded9bc 100644
--- a/arm/connections/connPanel.py
+++ b/arm/connections/connPanel.py
@@ -11,10 +11,10 @@ import arm.popups
import arm.util.tracker
from arm.connections import countPopup, descriptorPopup, entries, connEntry, circEntry
-from arm.util import connections, panel, torTools, uiTools
+from arm.util import panel, torTools, tracker, uiTools
from stem.control import State
-from stem.util import conf, enum
+from stem.util import conf, connection, enum
# height of the detail panel content, not counting top and bottom border
DETAILS_HEIGHT = 7
@@ -106,7 +106,7 @@ class ConnectionPanel(panel.Panel, threading.Thread):
self._lastResourceFetch = -1
# resolver for the command/pid associated with SOCKS, HIDDEN, and CONTROL connections
- self._appResolver = connections.AppResolver("arm")
+ self._appResolver = tracker.get_port_usage_tracker()
# rate limits appResolver queries to once per update
self.appResolveSinceUpdate = False
@@ -241,7 +241,7 @@ class ConnectionPanel(panel.Panel, threading.Thread):
elif key == ord('u') or key == ord('U'):
# provides a menu to pick the connection resolver
title = "Resolver Util:"
- options = ["auto"] + list(connections.Resolver)
+ options = ["auto"] + list(connection.Resolver)
connResolver = arm.util.tracker.get_connection_tracker()
currentOverwrite = connResolver.get_custom_resolver()
@@ -553,8 +553,8 @@ class ConnectionPanel(panel.Panel, threading.Thread):
# Queue up resolution for the unresolved ports (skips if it's still working
# on the last query).
- if appPorts and not self._appResolver.isResolving:
- self._appResolver.resolve(appPorts)
+ if appPorts and not self._appResolver.is_alive():
+ self._appResolver.get_processes_using_ports(appPorts)
# Fetches results. If the query finishes quickly then this is what we just
# asked for, otherwise these belong to an earlier resolution.
@@ -564,7 +564,8 @@ class ConnectionPanel(panel.Panel, threading.Thread):
# The isAppResolving flag lets the unresolved entries indicate if there's
# a lookup in progress for them or not.
- appResults = self._appResolver.getResults(0.2)
+ time.sleep(0.2) # TODO: previous resolver only blocked while awaiting a lookup
+ appResults = self._appResolver.get_processes_using_ports(appPorts)
for line in unresolvedLines:
isLocal = line.getType() == connEntry.Category.HIDDEN
@@ -581,7 +582,7 @@ class ConnectionPanel(panel.Panel, threading.Thread):
line.appPid = pid
line.isAppResolving = False
else:
- line.isAppResolving = self._appResolver.isResolving
+ line.isAppResolving = self._appResolver.is_alive()  # must be called: the bound method itself is always truthy
if flagQuery:
self.appResolveSinceUpdate = True
diff --git a/arm/util/connections.py b/arm/util/connections.py
deleted file mode 100644
index 82ed62e..0000000
--- a/arm/util/connections.py
+++ /dev/null
@@ -1,170 +0,0 @@
-import os
-import threading
-
-from stem.util import system
-
-class AppResolver:
- """
- Provides the names and pids of appliations attached to the given ports. This
- stops attempting to query if it fails three times without successfully
- getting lsof results.
- """
-
- def __init__(self, scriptName = "python"):
- """
- Constructs a resolver instance.
-
- Arguments:
- scriptName - name by which to all our own entries
- """
-
- self.scriptName = scriptName
- self.queryResults = {}
- self.resultsLock = threading.RLock()
- self._cond = threading.Condition() # used for pausing when waiting for results
- self.isResolving = False # flag set if we're in the process of making a query
- self.failureCount = 0 # -1 if we've made a successful query
-
- def getResults(self, maxWait=0):
- """
- Provides the last queried results. If we're in the process of making a
- query then we can optionally block for a time to see if it finishes.
-
- Arguments:
- maxWait - maximum second duration to block on getting results before
- returning
- """
-
- self._cond.acquire()
- if self.isResolving and maxWait > 0:
- self._cond.wait(maxWait)
- self._cond.release()
-
- self.resultsLock.acquire()
- results = dict(self.queryResults)
- self.resultsLock.release()
-
- return results
-
- def resolve(self, ports):
- """
- Queues the given listing of ports to be resolved. This clears the last set
- of results when completed.
-
- Arguments:
- ports - list of ports to be resolved to applications
- """
-
- if self.failureCount < 3:
- self.isResolving = True
- t = threading.Thread(target = self._queryApplications, kwargs = {"ports": ports})
- t.setDaemon(True)
- t.start()
-
- def _queryApplications(self, ports=[]):
- """
- Performs an lsof lookup on the given ports to get the command/pid tuples.
-
- Arguments:
- ports - list of ports to be resolved to applications
- """
-
- # atagar@fenrir:~/Desktop/arm$ lsof -i tcp:51849 -i tcp:37277
- # COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
- # tor 2001 atagar 14u IPv4 14048 0t0 TCP localhost:9051->localhost:37277 (ESTABLISHED)
- # tor 2001 atagar 15u IPv4 22024 0t0 TCP localhost:9051->localhost:51849 (ESTABLISHED)
- # python 2462 atagar 3u IPv4 14047 0t0 TCP localhost:37277->localhost:9051 (ESTABLISHED)
- # python 3444 atagar 3u IPv4 22023 0t0 TCP localhost:51849->localhost:9051 (ESTABLISHED)
-
- if not ports:
- self.resultsLock.acquire()
- self.queryResults = {}
- self.isResolving = False
- self.resultsLock.release()
-
- # wakes threads waiting on results
- self._cond.acquire()
- self._cond.notifyAll()
- self._cond.release()
-
- return
-
- results = {}
- lsofArgs = []
-
- # Uses results from the last query if we have any, otherwise appends the
- # port to the lsof command. This has the potential for persisting dirty
- # results but if we're querying by the dynamic port on the local tcp
- # connections then this should be very rare (and definitely worth the
- # chance of being able to skip an lsof query altogether).
- for port in ports:
- if port in self.queryResults:
- results[port] = self.queryResults[port]
- else: lsofArgs.append("-i tcp:%s" % port)
-
- if lsofArgs:
- lsofResults = system.call("lsof -nP " + " ".join(lsofArgs))
- else: lsofResults = None
-
- if not lsofResults and self.failureCount != -1:
- # lsof query failed and we aren't yet sure if it's possible to
- # successfully get results on this platform
- self.failureCount += 1
- self.isResolving = False
- return
- elif lsofResults:
- # (iPort, oPort) tuple for our own process, if it was fetched
- ourConnection = None
-
- for line in lsofResults:
- lineComp = line.split()
-
- if len(lineComp) == 10 and lineComp[9] == "(ESTABLISHED)":
- cmd, pid, _, _, _, _, _, _, portMap, _ = lineComp
-
- if "->" in portMap:
- iPort, oPort = portMap.split("->")
- iPort = iPort.split(":")[1]
- oPort = oPort.split(":")[1]
-
- # entry belongs to our own process
- if pid == str(os.getpid()):
- cmd = self.scriptName
- ourConnection = (iPort, oPort)
-
- if iPort.isdigit() and oPort.isdigit():
- newEntry = (iPort, oPort, cmd, pid)
-
- # adds the entry under the key of whatever we queried it with
- # (this might be both the inbound _and_ outbound ports)
- for portMatch in (iPort, oPort):
- if portMatch in ports:
- if portMatch in results:
- results[portMatch].append(newEntry)
- else: results[portMatch] = [newEntry]
-
- # making the lsof call generated an extraneous sh entry for our own connection
- if ourConnection:
- for ourPort in ourConnection:
- if ourPort in results:
- shIndex = None
-
- for i in range(len(results[ourPort])):
- if results[ourPort][i][2] == "sh":
- shIndex = i
- break
-
- if shIndex != None:
- del results[ourPort][shIndex]
-
- self.resultsLock.acquire()
- self.failureCount = -1
- self.queryResults = results
- self.isResolving = False
- self.resultsLock.release()
-
- # wakes threads waiting on results
- self._cond.acquire()
- self._cond.notifyAll()
- self._cond.release()
-
diff --git a/arm/util/tracker.py b/arm/util/tracker.py
index fe6b822..e56630d 100644
--- a/arm/util/tracker.py
+++ b/arm/util/tracker.py
@@ -45,12 +45,14 @@ from stem.util import conf, connection, log, proc, str_tools, system
from arm.util import tor_controller, debug, info, notice
CONFIG = conf.config_dict('arm', {
- 'queries.resources.rate': 5,
'queries.connections.rate': 5,
+ 'queries.resources.rate': 5,
+ 'queries.port_usage.rate': 5,
})
CONNECTION_TRACKER = None
RESOURCE_TRACKER = None
+PORT_USAGE_TRACKER = None
Resources = collections.namedtuple('Resources', [
'cpu_sample',
@@ -88,6 +90,19 @@ def get_resource_tracker():
return RESOURCE_TRACKER
+def get_port_usage_tracker():
+  """
+  Provides the shared PortUsageTracker instance, constructing it on first use
+  with the 'queries.port_usage.rate' configuration value.
+  """
+
+  global PORT_USAGE_TRACKER
+
+  if PORT_USAGE_TRACKER is not None:
+    return PORT_USAGE_TRACKER
+
+  lookup_rate = CONFIG['queries.port_usage.rate']
+  PORT_USAGE_TRACKER = PortUsageTracker(lookup_rate)
+
+  return PORT_USAGE_TRACKER
+
+
def stop_trackers():
"""
Halts active trackers, providing back the thread shutting them down.
@@ -585,3 +600,63 @@ class ResourceTracker(Daemon):
debug('tracker.unable_to_get_resources', resolver = 'ps', exc = exc)
return False
+
+
+class PortUsageTracker(Daemon):
+  """
+  Periodically retrieves the processes using a set of ports. After three
+  consecutive failed lookups the tracker stops itself.
+  """
+
+  def __init__(self, rate):
+    """
+    :param float rate: handed to the Daemon constructor (presumably the seconds
+      between lookups, as with the other trackers)
+    """
+
+    super(PortUsageTracker, self).__init__(rate)
+
+    self._last_requested_ports = []  # ports most recently asked about
+    self._processes_for_ports = {}  # results of our last successful lookup
+    self._failure_count = 0  # number of times in a row we've failed to get results
+
+  def get_processes_using_ports(self, ports):
+    """
+    Registers a given set of ports for further lookups, and returns the last
+    set of 'port => process' mappings we retrieved. Note that this means that
+    we will not return the requested ports unless they're requested again after
+    a successful lookup has been performed.
+
+    :param list ports: port numbers to look up
+
+    :returns: **dict** mapping port numbers to the process using it
+    """
+
+    # Keep our own copy so the caller's list and ours can't affect each other.
+
+    self._last_requested_ports = list(ports) if ports else []
+    return self._processes_for_ports
+
+  def _task(self, process_pid, process_name):
+    """
+    Resolves the processes for the most recently requested ports, reusing
+    results of the prior lookup where we have them. The arguments are unused.
+
+    :returns: **True** if there was nothing to do or the lookup succeeded,
+      **False** if the lookup failed
+    """
+
+    requested_ports = self._last_requested_ports
+
+    if not requested_ports:
+      return True
+
+    # Use cached results from our last lookup if available. The requested
+    # ports are only read here. Removing resolved entries from that list (as
+    # this used to) altered the caller's list and made cached ports drop out
+    # of the results on the following run.
+
+    cached = self._processes_for_ports
+    result = dict((port, cached[port]) for port in requested_ports if port in cached)
+    unresolved_ports = [port for port in requested_ports if port not in cached]
+
+    try:
+      # No need for a lookup if everything was already cached.
+
+      if unresolved_ports:
+        result.update(_process_for_ports(unresolved_ports, unresolved_ports))
+
+      self._processes_for_ports = result
+      self._failure_count = 0
+      return True
+    except IOError as exc:
+      self._failure_count += 1
+
+      if self._failure_count >= 3:
+        info('tracker.abort_getting_port_usage', exc = exc)
+        self.stop()
+      else:
+        debug('tracker.unable_to_get_port_usages', exc = exc)
+
+      return False
diff --git a/armrc.sample b/armrc.sample
index a4f7a71..540bb6d 100644
--- a/armrc.sample
+++ b/armrc.sample
@@ -5,8 +5,9 @@ startup.dataDirectory ~/.arm
# Seconds between querying information
-queries.resources.rate 5
queries.connections.rate 5
+queries.resources.rate 5
+queries.port_usage.rate 5
queries.refreshRate.rate 5
diff --git a/test/util/tracker/port_usage_tracker.py b/test/util/tracker/port_usage_tracker.py
index a0fecdb..2f3856b 100644
--- a/test/util/tracker/port_usage_tracker.py
+++ b/test/util/tracker/port_usage_tracker.py
@@ -1,6 +1,7 @@
+import time
import unittest
-from arm.util.tracker import _process_for_ports
+from arm.util.tracker import PortUsageTracker, _process_for_ports
from mock import Mock, patch
@@ -79,3 +80,38 @@ class TestPortUsageTracker(unittest.TestCase):
for test_input in test_inputs:
call_mock.return_value = test_input.split('\n')
self.assertRaises(IOError, _process_for_ports, [80], [443])
+
+  @patch('arm.util.tracker.tor_controller')
+  @patch('arm.util.tracker._process_for_ports')
+  @patch('arm.util.tracker.system', Mock(return_value = Mock()))
+  def test_fetching_samplings(self, process_for_ports_mock, tor_controller_mock):
+    """
+    Checks that the first request for a set of ports provides nothing, and
+    that asking again after the tracker has run provides the resolved
+    processes.
+    """
+
+    expected = {37277: 'python', 51849: 'tor'}
+
+    process_for_ports_mock.return_value = dict(expected)
+    tor_controller_mock().get_pid.return_value = 12345
+
+    with PortUsageTracker(0.02) as tracker:
+      time.sleep(0.01)
+
+      # This first call only registers the ports, nothing is resolved yet.
+
+      initial_results = tracker.get_processes_using_ports([37277, 51849])
+      self.assertEqual({}, initial_results)
+
+      time.sleep(0.04)
+
+      later_results = tracker.get_processes_using_ports([37277, 51849])
+      self.assertEqual(expected, later_results)
+
+  @patch('arm.util.tracker.tor_controller')
+  @patch('arm.util.tracker._process_for_ports')
+  @patch('arm.util.tracker.system', Mock(return_value = Mock()))
+  def test_resolver_failover(self, process_for_ports_mock, tor_controller_mock):
+    """
+    Checks that the tracker records no failures while it has no ports to look
+    up, and that it stops running once lookups keep raising an IOError.
+    """
+
+    process_for_ports_mock.side_effect = IOError()
+    tor_controller_mock().get_pid.return_value = 12345
+
+    with PortUsageTracker(0.01) as tracker:
+      # We shouldn't attempt lookups (nor encounter failures) without ports to
+      # query.
+
+      time.sleep(0.05)
+      self.assertEqual(0, tracker._failure_count)
+
+      tracker.get_processes_using_ports([37277, 51849])
+
+      # Shortly after the request we should still be running, then be stopped
+      # once the failures have accumulated.
+
+      for delay, check in ((0.03, self.assertTrue), (0.1, self.assertFalse)):
+        time.sleep(delay)
+        check(tracker.is_alive())