
commit 664552e72719776f17cd654928dbf5d0a12e48d9 Author: Damian Johnson <atagar@torproject.org> Date: Sun Mar 11 13:41:48 2012 -0700 Fixing sinister concurrency issue Replacing the _is_reading event flag with a 'FINISHED' entry in the _unreturned_descriptors queue. This is because Python's queues stupidly have no notion of flushing, so there's no method for me to make a reliable check of 'if the reading thread is finished AND the queue is empty'. I may have called 'put'. I may have a provably not-empty queue. But can I make that check work? Nooooo. That is... frustrating. >:( --- stem/descriptor/reader.py | 25 ++++++++++++++----------- 1 files changed, 14 insertions(+), 11 deletions(-) diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py index a8f3284..9be6e8f 100644 --- a/stem/descriptor/reader.py +++ b/stem/descriptor/reader.py @@ -81,6 +81,9 @@ import Queue MAX_STORED_DESCRIPTORS = 20 +# flag to indicate when the reader thread is out of descriptor files to read +FINISHED = "DONE" + def load_processed_files(path): """ Loads a dictionary of 'path => last modified timestamp' mappings, as @@ -169,10 +172,11 @@ class DescriptorReader(threading.Thread): self._iter_lock = threading.RLock() self._iter_notice = threading.Event() - self._is_reading = threading.Event() self._is_stopped = threading.Event() - # descriptors that we have read, but not yet provided to the caller + # Descriptors that we have read but not yet provided to the caller. A + # FINISHED entry is used by the reading thread to indicate the end.
+ self._unreturned_descriptors = Queue.Queue() def get_processed_files(self): @@ -229,10 +233,9 @@ class DescriptorReader(threading.Thread): self._iter_notice.set() def run(self): - self._is_reading.set() remaining_files = list(self._targets) - while remaining_files and not self._is_stopped.isSet(): + while remaining_files and not self._is_stopped.is_set(): target = remaining_files.pop(0) if not os.path.exists(target): continue @@ -243,7 +246,7 @@ class DescriptorReader(threading.Thread): remaining_files.append(os.path.join(root, filename)) # this can take a while if, say, we're including the root directory - if self._is_stopped.isSet(): break + if self._is_stopped.is_set(): break else: # This is a file. Register it's last modified timestamp and check if # it's a file that we should skip. @@ -277,18 +280,18 @@ class DescriptorReader(threading.Thread): elif target_type[1] == 'bzip2': pass # TODO: implement - self._is_reading.clear() + self._unreturned_descriptors.put(FINISHED) self._iter_notice.set() def __iter__(self): with self._iter_lock: - while not self._is_stopped.isSet(): + while not self._is_stopped.is_set(): try: - yield self._unreturned_descriptors.get_nowait() - except Queue.Empty: - # if we've finished and there aren't any descriptors then we're done - if not self._is_reading.isSet() and self._unreturned_descriptors.empty(): break + descriptor = self._unreturned_descriptors.get_nowait() + if descriptor == FINISHED: break + else: yield descriptor + except Queue.Empty: self._iter_notice.wait() self._iter_notice.clear()