commit fbaa098f4731a6d3bd2713033efc4bfbf62b913a Author: Damian Johnson atagar@torproject.org Date: Mon Oct 28 11:45:32 2013 -0700
Better thread safety for tracker operations
Resolving tor's pid and process name under a daemon lock, which is also used for the tracker tasks (where it's used). --- arm/util/tracker.py | 61 +++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 29 deletions(-)
diff --git a/arm/util/tracker.py b/arm/util/tracker.py index 8e93e58..647acee 100644 --- a/arm/util/tracker.py +++ b/arm/util/tracker.py @@ -3,7 +3,7 @@ Background tasks for gathering informatino about the tor process.
::
- get_connection_tracker - provides a ConnectionResolver for our tor process + get_connection_tracker - provides a ConnectionTracker for our tor process get_resource_tracker - provides a ResourceTracker for our tor process
Daemon - common parent for resolvers @@ -13,7 +13,7 @@ Background tasks for gathering informatino about the tor process. |- set_paused - pauses or continues work +- stop - stops further work by the daemon
- ConnectionResolver - periodically checks the connections established by tor + ConnectionTracker - periodically checks the connections established by tor |- get_custom_resolver - provide the custom conntion resolver we're using |- set_custom_resolver - overwrites automatic resolver selecion with a custom resolver +- get_connections - provides our latest connection results @@ -32,8 +32,8 @@ import arm.util.torTools from stem.control import State from stem.util import conf, connection, log, proc, str_tools, system
-CONFIG = conf.config_dict("arm", { - "queries.resourceUsage.rate": 5, +CONFIG = conf.config_dict('arm', { + 'queries.resourceUsage.rate': 5, 'queries.connections.minRate': 5, 'msg.unable_to_use_resolver': '', 'msg.unable_to_use_all_resolvers': '', @@ -64,7 +64,7 @@ def get_connection_tracker(): global CONNECTION_TRACKER
if CONNECTION_TRACKER is None: - CONNECTION_TRACKER = ConnectionResolver() + CONNECTION_TRACKER = ConnectionTracker()
return CONNECTION_TRACKER
@@ -91,6 +91,10 @@ class Daemon(threading.Thread): threading.Thread.__init__(self) self.daemon = True
+ # PID and process name of the tor process we're tracking. These should only + # be used under the daemon's lock. + + self._daemon_lock = threading.RLock() self._process_name = None self._process_pid = None
@@ -99,8 +103,8 @@ class Daemon(threading.Thread): self._run_counter = 0 # counter for the number of successful runs
self._is_paused = False + self._pause_condition = threading.Condition() self._halt = False # terminates thread if true - self._cond = threading.Condition() # used for pausing the thread
controller = arm.util.torTools.getConn().controller controller.add_status_listener(self._tor_status_listener) @@ -113,24 +117,24 @@ class Daemon(threading.Thread): if self._is_paused or time_since_last_ran < self._rate: sleep_duration = max(0.2, self._rate - time_since_last_ran)
- self._cond.acquire() - if not self._halt: - self._cond.wait(sleep_duration) - self._cond.release() + with self._pause_condition: + if not self._halt: + self._pause_condition.wait(sleep_duration)
continue # done waiting, try again
- is_successful = self.task() + with self._daemon_lock: + is_successful = self._task()
if is_successful: self._run_counter += 1
self._last_ran = time.time()
- def task(self): + def _task(self): """ Task the resolver is meant to perform. This should be implemented by - subclasses. + subclasses. This is executed under the daemon's lock. """
pass @@ -175,24 +179,24 @@ class Daemon(threading.Thread): Halts further work and terminates the thread. """
- self._cond.acquire() - self._halt = True - self._cond.notifyAll() - self._cond.release() + with self._pause_condition: + self._halt = True + self._pause_condition.notifyAll()
def _tor_status_listener(self, controller, event_type, _): - if event_type in (State.INIT, State.RESET): - tor_pid = controller.get_pid(None) - tor_cmd = system.get_name_by_pid(tor_pid) if tor_pid else None + with self._daemon_lock: + if not self._halt and event_type in (State.INIT, State.RESET): + tor_pid = controller.get_pid(None) + tor_cmd = system.get_name_by_pid(tor_pid) if tor_pid else None
- if tor_cmd is None: - tor_cmd = 'tor' + if tor_cmd is None: + tor_cmd = 'tor'
- self._process_name = tor_cmd - self._process_pid = tor_pid + self._process_name = tor_cmd + self._process_pid = tor_pid
-class ConnectionResolver(Daemon): +class ConnectionTracker(Daemon): """ Periodically retrieves the connections established by tor. """ @@ -210,7 +214,7 @@ class ConnectionResolver(Daemon): self._failure_count = 0 self._rate_too_low_count = 0
- def task(self): + def _task(self): if self._custom_resolver: resolver = self._custom_resolver is_default_resolver = False @@ -325,7 +329,6 @@ class ResourceTracker(Daemon): self._last_cpu_total = 0
self.last_lookup = -1 - self._val_lock = threading.RLock()
# sequential times we've failed with this method of resolution self._failure_count = 0 @@ -336,7 +339,7 @@ class ResourceTracker(Daemon): (cpuUsage_sampling, cpuUsage_avg, memUsage_bytes, memUsage_percent) """
- with self._val_lock: + with self._daemon_lock: return self._last_sample if self._last_sample else Resources(0.0, 0.0, 0, 0.0)
def last_query_failed(self): @@ -347,7 +350,7 @@ class ResourceTracker(Daemon):
return self._failure_count != 0
- def task(self): + def _task(self): if self._procss_pid is None: return
tor-commits@lists.torproject.org