[tor-commits] [arm/master] Better thread safety for tracker operations

atagar at torproject.org atagar at torproject.org
Tue Oct 29 03:39:22 UTC 2013


commit fbaa098f4731a6d3bd2713033efc4bfbf62b913a
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Oct 28 11:45:32 2013 -0700

    Better thread safety for tracker operations
    
    Resolve tor's pid and process name while holding a daemon lock. The same
    lock is held while the tracker tasks run, since they are what use those values.
---
 arm/util/tracker.py |   61 +++++++++++++++++++++++++++------------------------
 1 file changed, 32 insertions(+), 29 deletions(-)

diff --git a/arm/util/tracker.py b/arm/util/tracker.py
index 8e93e58..647acee 100644
--- a/arm/util/tracker.py
+++ b/arm/util/tracker.py
@@ -3,7 +3,7 @@ Background tasks for gathering informatino about the tor process.
 
 ::
 
-  get_connection_tracker - provides a ConnectionResolver for our tor process
+  get_connection_tracker - provides a ConnectionTracker for our tor process
   get_resource_tracker - provides a ResourceTracker for our tor process
 
   Daemon - common parent for resolvers
@@ -13,7 +13,7 @@ Background tasks for gathering informatino about the tor process.
     |- set_paused - pauses or continues work
     +- stop - stops further work by the daemon
 
-  ConnectionResolver - periodically checks the connections established by tor
+  ConnectionTracker - periodically checks the connections established by tor
     |- get_custom_resolver - provide the custom conntion resolver we're using
     |- set_custom_resolver - overwrites automatic resolver selecion with a custom resolver
     +- get_connections - provides our latest connection results
@@ -32,8 +32,8 @@ import arm.util.torTools
 from stem.control import State
 from stem.util import conf, connection, log, proc, str_tools, system
 
-CONFIG = conf.config_dict("arm", {
-  "queries.resourceUsage.rate": 5,
+CONFIG = conf.config_dict('arm', {
+  'queries.resourceUsage.rate': 5,
   'queries.connections.minRate': 5,
   'msg.unable_to_use_resolver': '',
   'msg.unable_to_use_all_resolvers': '',
@@ -64,7 +64,7 @@ def get_connection_tracker():
   global CONNECTION_TRACKER
 
   if CONNECTION_TRACKER is None:
-    CONNECTION_TRACKER = ConnectionResolver()
+    CONNECTION_TRACKER = ConnectionTracker()
 
   return CONNECTION_TRACKER
 
@@ -91,6 +91,10 @@ class Daemon(threading.Thread):
     threading.Thread.__init__(self)
     self.daemon = True
 
+    # PID and process name of the tor process we're tracking. These should only
+    # be used under the daemon's lock.
+
+    self._daemon_lock = threading.RLock()
     self._process_name = None
     self._process_pid = None
 
@@ -99,8 +103,8 @@ class Daemon(threading.Thread):
     self._run_counter = 0  # counter for the number of successful runs
 
     self._is_paused = False
+    self._pause_condition = threading.Condition()
     self._halt = False  # terminates thread if true
-    self._cond = threading.Condition()  # used for pausing the thread
 
     controller = arm.util.torTools.getConn().controller
     controller.add_status_listener(self._tor_status_listener)
@@ -113,24 +117,24 @@ class Daemon(threading.Thread):
       if self._is_paused or time_since_last_ran < self._rate:
         sleep_duration = max(0.2, self._rate - time_since_last_ran)
 
-        self._cond.acquire()
-        if not self._halt:
-          self._cond.wait(sleep_duration)
-        self._cond.release()
+        with self._pause_condition:
+          if not self._halt:
+            self._pause_condition.wait(sleep_duration)
 
         continue  # done waiting, try again
 
-      is_successful = self.task()
+      with self._daemon_lock:
+        is_successful = self._task()
 
       if is_successful:
         self._run_counter += 1
 
       self._last_ran = time.time()
 
-  def task(self):
+  def _task(self):
     """
     Task the resolver is meant to perform. This should be implemented by
-    subclasses.
+    subclasses. This is executed under the daemon's lock.
     """
 
     pass
@@ -175,24 +179,24 @@ class Daemon(threading.Thread):
     Halts further work and terminates the thread.
     """
 
-    self._cond.acquire()
-    self._halt = True
-    self._cond.notifyAll()
-    self._cond.release()
+    with self._pause_condition:
+      self._halt = True
+      self._pause_condition.notifyAll()
 
   def _tor_status_listener(self, controller, event_type, _):
-    if event_type in (State.INIT, State.RESET):
-      tor_pid = controller.get_pid(None)
-      tor_cmd = system.get_name_by_pid(tor_pid) if tor_pid else None
+    with self._daemon_lock:
+      if not self._halt and event_type in (State.INIT, State.RESET):
+        tor_pid = controller.get_pid(None)
+        tor_cmd = system.get_name_by_pid(tor_pid) if tor_pid else None
 
-      if tor_cmd is None:
-        tor_cmd = 'tor'
+        if tor_cmd is None:
+          tor_cmd = 'tor'
 
-      self._process_name = tor_cmd
-      self._process_pid = tor_pid
+        self._process_name = tor_cmd
+        self._process_pid = tor_pid
 
 
-class ConnectionResolver(Daemon):
+class ConnectionTracker(Daemon):
   """
   Periodically retrieves the connections established by tor.
   """
@@ -210,7 +214,7 @@ class ConnectionResolver(Daemon):
     self._failure_count = 0
     self._rate_too_low_count = 0
 
-  def task(self):
+  def _task(self):
     if self._custom_resolver:
       resolver = self._custom_resolver
       is_default_resolver = False
@@ -325,7 +329,6 @@ class ResourceTracker(Daemon):
     self._last_cpu_total = 0
 
     self.last_lookup = -1
-    self._val_lock = threading.RLock()
 
     # sequential times we've failed with this method of resolution
     self._failure_count = 0
@@ -336,7 +339,7 @@ class ResourceTracker(Daemon):
     (cpuUsage_sampling, cpuUsage_avg, memUsage_bytes, memUsage_percent)
     """
 
-    with self._val_lock:
+    with self._daemon_lock:
       return self._last_sample if self._last_sample else Resources(0.0, 0.0, 0, 0.0)
 
   def last_query_failed(self):
@@ -347,7 +350,7 @@ class ResourceTracker(Daemon):
 
     return self._failure_count != 0
 
-  def task(self):
+  def _task(self):
     if self._procss_pid is None:
       return
 





More information about the tor-commits mailing list