[tor-commits] r24473: {arm} Identifying local application attached to the control or socks port (in arm/trunk: . src/interface/connections src/util)

Damian Johnson atagar1 at gmail.com
Mon Mar 28 03:21:54 UTC 2011


Author: atagar
Date: 2011-03-28 03:21:54 +0000 (Mon, 28 Mar 2011)
New Revision: 24473

Modified:
   arm/trunk/armrc.sample
   arm/trunk/src/interface/connections/connEntry.py
   arm/trunk/src/interface/connections/connPanel.py
   arm/trunk/src/util/connections.py
Log:
Identifying local application attached to the control or socks port.



Modified: arm/trunk/armrc.sample
===================================================================
--- arm/trunk/armrc.sample	2011-03-27 23:51:07 UTC (rev 24472)
+++ arm/trunk/armrc.sample	2011-03-28 03:21:54 UTC (rev 24473)
@@ -188,7 +188,7 @@
 features.connection.newPanel true
 features.connection.listingType 0
 features.connection.order 0, 2, 1
-features.connection.refreshRate 10
+features.connection.refreshRate 5
 features.connection.markInitialConnections true
 features.connection.showExitPort true
 features.connection.showColumn.fingerprint true

Modified: arm/trunk/src/interface/connections/connEntry.py
===================================================================
--- arm/trunk/src/interface/connections/connEntry.py	2011-03-27 23:51:07 UTC (rev 24472)
+++ arm/trunk/src/interface/connections/connEntry.py	2011-03-28 03:21:54 UTC (rev 24473)
@@ -18,7 +18,7 @@
 #   Directory    Fetching tor consensus information.
 #   Control      Tor controller (arm, vidalia, etc).
 
-Category = enum.Enum("INBOUND", "OUTBOUND", "EXIT", "CLIENT", "PROGRAM", "DIRECTORY", "CONTROL")
+Category = enum.Enum("INBOUND", "OUTBOUND", "EXIT", "CLIENT", "DIRECTORY", "PROGRAM", "CONTROL")
 CATEGORY_COLOR = {Category.INBOUND: "green",      Category.OUTBOUND: "blue",
                   Category.EXIT: "red",           Category.CLIENT: "cyan",
                   Category.PROGRAM: "yellow",     Category.DIRECTORY: "magenta",
@@ -199,6 +199,11 @@
     self._possibleClient = True
     self._possibleDirectory = True
     
+    # attributes for PROGRAM and CONTROL connections
+    self.appName = None
+    self.appPid = None
+    self.isAppResolving = False
+    
     conn = torTools.getConn()
     myOrPort = conn.getOption("ORPort")
     myDirPort = conn.getOption("DirPort")
@@ -278,6 +283,13 @@
     
     return myListing
   
+  def isUnresolvedApp(self):
+    """
+    True if our display uses application information that hasn't yet been resolved.
+    """
+    
+    return self.appName == None and self.getType() in (Category.PROGRAM, Category.CONTROL)
+  
   def _getListingEntry(self, width, currentTime, listingType):
     entryType = self.getType()
     
@@ -415,6 +427,20 @@
       listingType - primary attribute we're listing connections by
     """
     
+    # for applications show the command/pid
+    if self.getType() in (Category.PROGRAM, Category.CONTROL):
+      displayLabel = ""
+      
+      if self.appName:
+        if self.appPid: displayLabel = "%s (%s)" % (self.appName, self.appPid)
+        else: displayLabel = self.appName
+      elif self.isAppResolving:
+        displayLabel = "resolving..."
+      else: displayLabel = "UNKNOWN"
+      
+      return displayLabel
+    
+    # for everything else display connection/consensus information
     dstAddress = self.getDestinationLabel(26, includeLocale = True)
     etc, usedSpace = "", 0
     if listingType == entries.ListingType.IP_ADDRESS:

Modified: arm/trunk/src/interface/connections/connPanel.py
===================================================================
--- arm/trunk/src/interface/connections/connPanel.py	2011-03-27 23:51:07 UTC (rev 24472)
+++ arm/trunk/src/interface/connections/connPanel.py	2011-03-28 03:21:54 UTC (rev 24473)
@@ -10,7 +10,7 @@
 from util import connections, enum, panel, torTools, uiTools
 
 DEFAULT_CONFIG = {"features.connection.listingType": 0,
-                  "features.connection.refreshRate": 10}
+                  "features.connection.refreshRate": 5}
 
 # height of the detail panel content, not counting top and bottom border
 DETAILS_HEIGHT = 7
@@ -63,8 +63,15 @@
     # it changes.
     self._lastResourceFetch = -1
     
-    self._update() # populates initial entries
+    # resolver for the command/pid associated with PROGRAM and CONTROL connections
+    self._appResolver = connections.AppResolver("arm")
     
+    # rate limits appResolver queries to once per update
+    self.appResolveSinceUpdate = False
+    
+    self._update()            # populates initial entries
+    self._resolveApps(False)  # resolves initial PROGRAM and CONTROL applications
+    
     # mark the initially exitsing connection uptimes as being estimates
     for entry in self._entries:
       if isinstance(entry, connEntry.ConnectionEntry):
@@ -155,7 +162,11 @@
         # updates content if their's new results, otherwise just redraws
         self._update()
         self.redraw(True)
-        lastDraw += self._config["features.connection.refreshRate"]
+        
+        # we may have missed multiple updates due to being paused, showing
+        # another panel, etc so lastDraw might need to jump multiple ticks
+        drawTicks = (time.time() - lastDraw) / self._config["features.connection.refreshRate"]
+        lastDraw += self._config["features.connection.refreshRate"] * drawTicks
   
   def draw(self, width, height):
     self.valsLock.acquire()
@@ -191,6 +202,11 @@
     for lineNum in range(scrollLoc, len(self._entryLines)):
       entryLine = self._entryLines[lineNum]
       
+      # if this is an unresolved PROGRAM or CONTROL entry then queue up
+      # resolution for the applications they belong to
+      if isinstance(entryLine, connEntry.ConnectionLine) and entryLine.isUnresolvedApp():
+        self._resolveApps()
+      
       # hilighting if this is the selected line
       extraFormat = curses.A_STANDOUT if entryLine == cursorSelection else curses.A_NORMAL
       
@@ -218,6 +234,7 @@
     
     connResolver = connections.getResolver("tor")
     currentResolutionCount = connResolver.getResolutionCount()
+    self.appResolveSinceUpdate = False
     
     if self._lastResourceFetch != currentResolutionCount:
       self.valsLock.acquire()
@@ -304,4 +321,55 @@
       self.setSortOrder()
       self._lastResourceFetch = currentResolutionCount
       self.valsLock.release()
+  
+  def _resolveApps(self, flagQuery = True):
+    """
+    Triggers an asynchronous query for all unresolved PROGRAM and CONTROL
+    entries.
+    
+    Arguments:
+      flagQuery - sets a flag to prevent further calls from being respected
+                  until the next update if true
+    """
+    
+    if self.appResolveSinceUpdate: return
+    
+    # fetch the unresolved PROGRAM and CONTROL lines
+    unresolvedLines = []
+    
+    for line in self._entryLines:
+      if isinstance(line, connEntry.ConnectionLine) and line.isUnresolvedApp():
+        unresolvedLines.append(line)
+    
+    # Queue up resolution for the unresolved ports (skips if it's still working
+    # on the last query).
+    if not self._appResolver.isResolving:
+      self._appResolver.resolve([line.foreign.getPort() for line in unresolvedLines])
+    
+    # The application resolver might have given up querying (for instance, if
+    # the lsof lookups aren't working on this platform or lacks permissions).
+    # The isAppResolving flag lets the unresolved entries indicate if there's
+    # a lookup in progress for them or not.
+    
+    for line in unresolvedLines:
+      line.isAppResolving = self._appResolver.isResolving
+    
+    # Fetches results. If the query finishes quickly then this is what we just
+    # asked for, otherwise these belong to the last resolution.
+    appResults = self._appResolver.getResults(0.02)
+    
+    for line in unresolvedLines:
+      linePort = line.foreign.getPort()
+      
+      if linePort in appResults:
+        # sets application attributes if there's a result with this as the
+        # inbound port
+        for inboundPort, outboundPort, cmd, pid in appResults[linePort]:
+          if linePort == inboundPort:
+            line.appName = cmd
+            line.appPid = pid
+            line.isAppResolving = False
+    
+    if flagQuery:
+      self.appResolveSinceUpdate = True
 

Modified: arm/trunk/src/util/connections.py
===================================================================
--- arm/trunk/src/util/connections.py	2011-03-27 23:51:07 UTC (rev 24472)
+++ arm/trunk/src/util/connections.py	2011-03-28 03:21:54 UTC (rev 24473)
@@ -559,3 +559,168 @@
     self._cond.notifyAll()
     self._cond.release()
 
+class AppResolver:
+  """
+  Provides the names and pids of applications attached to the given ports. This
+  stops attempting to query if it fails three times without successfully
+  getting lsof results.
+  """
+  
+  def __init__(self, scriptName = "python"):
+    """
+    Constructs a resolver instance.
+    
+    Arguments:
+      scriptName - name by which to call our own entries
+    """
+    
+    self.scriptName = scriptName
+    self.queryResults = {}
+    self.resultsLock = threading.RLock()
+    self._cond = threading.Condition()  # used for pausing when waiting for results
+    self.isResolving = False  # flag set if we're in the process of making a query
+    self.failureCount = 0     # -1 if we've made a successful query
+  
+  def getResults(self, maxWait=0):
+    """
+    Provides the last queried results. If we're in the process of making a
+    query then we can optionally block for a time to see if it finishes.
+    
+    Arguments:
+      maxWait - maximum duration (in seconds) to block on getting results before
+                returning
+    """
+    
+    self._cond.acquire()
+    if self.isResolving and maxWait > 0:
+      self._cond.wait(maxWait)
+    self._cond.release()
+    
+    self.resultsLock.acquire()
+    results = dict(self.queryResults)
+    self.resultsLock.release()
+    
+    return results
+  
+  def resolve(self, ports):
+    """
+    Queues the given listing of ports to be resolved. This clears the last set
+    of results when completed.
+    
+    Arguments:
+      ports - list of ports to be resolved to applications
+    """
+    
+    if self.failureCount < 3:
+      self.isResolving = True
+      t = threading.Thread(target = self._queryApplications, kwargs = {"ports": ports})
+      t.setDaemon(True)
+      t.start()
+  
+  def _queryApplications(self, ports=[]):
+    """
+    Performs an lsof lookup on the given ports to get the command/pid tuples.
+    
+    Arguments:
+      ports - list of ports to be resolved to applications
+    """
+    
+    # atagar at fenrir:~/Desktop/arm$ lsof -i tcp:51849 -i tcp:37277
+    # COMMAND  PID   USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
+    # tor     2001 atagar   14u  IPv4  14048      0t0  TCP localhost:9051->localhost:37277 (ESTABLISHED)
+    # tor     2001 atagar   15u  IPv4  22024      0t0  TCP localhost:9051->localhost:51849 (ESTABLISHED)
+    # python  2462 atagar    3u  IPv4  14047      0t0  TCP localhost:37277->localhost:9051 (ESTABLISHED)
+    # python  3444 atagar    3u  IPv4  22023      0t0  TCP localhost:51849->localhost:9051 (ESTABLISHED)
+    
+    if not ports:
+      self.resultsLock.acquire()
+      self.queryResults = {}
+      self.isResolving = False
+      self.resultsLock.release()
+      
+      # wakes threads waiting on results
+      self._cond.acquire()
+      self._cond.notifyAll()
+      self._cond.release()
+      
+      return
+    
+    results = {}
+    lsofArgs = []
+    
+    # Uses results from the last query if we have any, otherwise appends the
+    # port to the lsof command. This has the potential for persisting dirty
+    # results but if we're querying by the dynamic port on the local tcp
+    # connections then this should be very rare (and definitely worth the
+    # chance of being able to skip an lsof query altogether).
+    for port in ports:
+      if port in self.queryResults:
+        results[port] = self.queryResults[port]
+      else: lsofArgs.append("-i tcp:%s" % port)
+    
+    if lsofArgs:
+      lsofResults = sysTools.call("lsof " + " ".join(lsofArgs))
+    else: lsofResults = None
+    
+    if not lsofResults and self.failureCount != -1:
+      # lsof query failed and we aren't yet sure if it's possible to
+      # successfully get results on this platform
+      self.failureCount += 1
+      self.isResolving = False
+      return
+    elif lsofResults:
+      # (iPort, oPort) tuple for our own process, if it was fetched
+      ourConnection = None
+      
+      for line in lsofResults:
+        lineComp = line.split()
+        
+        if len(lineComp) == 10 and lineComp[9] == "(ESTABLISHED)":
+          cmd, pid, _, _, _, _, _, _, portMap, _ = lineComp
+          
+          if "->" in portMap:
+            iPort, oPort = portMap.split("->")
+            iPort = iPort.replace("localhost:", "")
+            oPort = oPort.replace("localhost:", "")
+            
+            # entry belongs to our own process
+            if pid == str(os.getpid()):
+              cmd = self.scriptName
+              ourConnection = (iPort, oPort)
+            
+            if iPort.isdigit() and oPort.isdigit():
+              newEntry = (iPort, oPort, cmd, pid)
+              
+              # adds the entry under the key of whatever we queried it with
+              # (this might be both the inbound _and_ outbound ports)
+              for portMatch in (iPort, oPort):
+                if portMatch in ports:
+                  if portMatch in results:
+                    results[portMatch].append(newEntry)
+                  else: results[portMatch] = [newEntry]
+      
+      # making the lsof call generated an extraneous sh entry for our own connection
+      if ourConnection:
+        for ourPort in ourConnection:
+          if ourPort in results:
+            shIndex = None
+            
+            for i in range(len(results[ourPort])):
+              if results[ourPort][i][2] == "sh":
+                shIndex = i
+                break
+            
+            if shIndex != None:
+              del results[ourPort][shIndex]
+    
+    self.resultsLock.acquire()
+    self.failureCount = -1
+    self.queryResults = results
+    self.isResolving = False
+    self.resultsLock.release()
+    
+    # wakes threads waiting on results
+    self._cond.acquire()
+    self._cond.notifyAll()
+    self._cond.release()
+



More information about the tor-commits mailing list