commit 56d0105a50809f7e272f3d0d1f3d2b25eca2d498 Author: Damian Johnson atagar@torproject.org Date: Tue Jun 14 09:13:20 2011 -0700
fix: race condition for multiple status changes
When an event caused multiple changes in Tor's status we'd run into a race condition and often pick the wrong final state. For instance, if a sighup caused Tor to crash (e.g. via a bad torrc) then we'd trigger both a reset and a closed event.
This enqueues events to keep the order in which they occurred, expands locks to include them, and introduces a small delay to coalesce events if they occur in quick succession like this. --- src/util/torTools.py | 77 +++++++++++++++++++++++++++++++++++++------------ 1 files changed, 58 insertions(+), 19 deletions(-)
diff --git a/src/util/torTools.py b/src/util/torTools.py index 5a47175..8c18a9d 100644 --- a/src/util/torTools.py +++ b/src/util/torTools.py @@ -9,6 +9,7 @@ import time import socket import thread import threading +import Queue
from TorCtl import TorCtl, TorUtil
@@ -373,6 +374,15 @@ class Controller(TorCtl.PostEventListener): self._statusTime = 0 # unix time-stamp for the duration of the status self.lastHeartbeat = 0 # time of the last tor event
+ # Status signaling for when tor starts, stops, or is reset is done via + # enquing the signal then spawning a handler thread. This is to provide + # safety in race conditions, for instance if we sighup with a torrc that + # causes tor to crash then we'll get both an INIT and CLOSED signal. It's + # important in those cases that listeners get the correct signal last (in + # that case CLOSED) so they aren't confused about what tor's current state + # is. + self._notificationQueue = Queue.Queue() + self._exitPolicyChecker = None self._isExitingAllowed = False self._exitPolicyLookupCache = {} # mappings of ip/port tuples to if they were accepted by the policy or not @@ -440,14 +450,15 @@ class Controller(TorCtl.PostEventListener): # are dropped with a logged warning) self.setControllerEvents(self.controllerEvents)
- self.connLock.release() - self._status = State.INIT self._statusTime = time.time()
# notifies listeners that a new controller is available if not NO_SPAWN: - thread.start_new_thread(self._notifyStatusListeners, (State.INIT,)) + self._notificationQueue.put(State.INIT) + thread.start_new_thread(self._notifyStatusListeners, ()) + + self.connLock.release()
def close(self): """ @@ -458,14 +469,16 @@ class Controller(TorCtl.PostEventListener): if self.conn: self.conn.close() self.conn = None - self.connLock.release()
self._status = State.CLOSED self._statusTime = time.time()
# notifies listeners that the controller's been shut down if not NO_SPAWN: - thread.start_new_thread(self._notifyStatusListeners, (State.CLOSED,)) + self._notificationQueue.put(State.CLOSED) + thread.start_new_thread(self._notifyStatusListeners, ()) + + self.connLock.release() else: self.connLock.release()
def isAlive(self): @@ -1459,13 +1472,19 @@ class Controller(TorCtl.PostEventListener): """
if event.level == "NOTICE" and event.msg.startswith("Received reload signal (hup)"): - self._isReset = True + self.connLock.acquire()
- self._status = State.INIT - self._statusTime = time.time() + if self.isAlive(): + self._isReset = True + + self._status = State.INIT + self._statusTime = time.time() + + if not NO_SPAWN: + self._notificationQueue.put(State.INIT) + thread.start_new_thread(self._notifyStatusListeners, ())
- if not NO_SPAWN: - thread.start_new_thread(self._notifyStatusListeners, (State.INIT,)) + self.connLock.release()
def ns_event(self, event): self._updateHeartbeat() @@ -2006,7 +2025,7 @@ class Controller(TorCtl.PostEventListener): if result == None or result == UNKNOWN: return default else: return result
- def _notifyStatusListeners(self, eventType): + def _notifyStatusListeners(self): """ Sends a notice to all current listeners that a given change in tor's controller status has occurred. @@ -2015,16 +2034,36 @@ class Controller(TorCtl.PostEventListener): eventType - enum representing tor's new status """
- # resets cached GETINFO and GETCONF parameters - self._cachedParam = {} - self._cachedConf = {} + # If there's a quick race state (for instance a sighup causing both an init + # and close event) then give them a moment to enqueue. This way we can + # coles the events and discard the inaccurate one. + + time.sleep(0.2) + + self.connLock.acquire()
- # gives a notice that the control port has closed - if eventType == State.CLOSED: - log.log(CONFIG["log.torCtlPortClosed"], "Tor control port closed") + try: + eventType = self._notificationQueue.get(timeout=0) + + # checks that the notice is accurate for our current state + if self.isAlive() != (eventType == State.INIT): + eventType = None + except Queue.Empty: + eventType = None + + if eventType: + # resets cached GETINFO and GETCONF parameters + self._cachedParam = {} + self._cachedConf = {} + + # gives a notice that the control port has closed + if eventType == State.CLOSED: + log.log(CONFIG["log.torCtlPortClosed"], "Tor control port closed") + + for callback in self.statusListeners: + callback(self, eventType)
- for callback in self.statusListeners: - callback(self, eventType) + self.connLock.release()
class ExitPolicy: """
tor-commits@lists.torproject.org