
commit 932ded67f961c67c515750bd6e1c9ddb735f8559 Author: Damian Johnson <atagar@torproject.org> Date: Sat Mar 10 23:30:28 2012 -0800 Fix and test for DescriptorReader stop() method Adding an integration test and some fixes for the stop() method of the DescriptorReader class. --- stem/descriptor/reader.py | 19 +++++++++++-------- test/integ/descriptor/reader.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py index 0784ce4..c3bf1aa 100644 --- a/stem/descriptor/reader.py +++ b/stem/descriptor/reader.py @@ -66,7 +66,6 @@ import mimetypes import Queue # TODO: Unimplemented concurrency features... -# - stop() # - restarting when __iter__ is called additional times # - maximum read-ahead @@ -162,15 +161,16 @@ class DescriptorReader(threading.Thread): self._iter_lock = threading.RLock() self._iter_notice = threading.Event() - # targets that remain to be read in this iteration + # files or directories that remain to be read self._remaining_files = list(self._targets) # descriptors that we have read, but not yet provided to the caller self._unreturned_descriptors = Queue.Queue() - # TODO: implement - # flag that's set when we're done - self._stop_event = threading.Event() + # flag that's set when stop() is called + self._stop_called = threading.Event() + + # flag that's set when we're out of descriptor files to read self._finished_reading = threading.Event() def get_processed_files(self): @@ -222,11 +222,11 @@ class DescriptorReader(threading.Thread): Stops further reading of descriptor files. 
""" - self._stop_event.set() + self._stop_called.set() self._iter_notice.set() def run(self): - while self._remaining_files: + while self._remaining_files and not self._stop_called.isSet(): target = self._remaining_files.pop(0) if not os.path.exists(target): continue @@ -235,6 +235,9 @@ class DescriptorReader(threading.Thread): for root, _, files in os.walk(target, followlinks = self._follow_links): for filename in files: self._remaining_files.append(os.path.join(root, filename)) + + # this can take a while if, say, we're including the root directory + if self._stop_called.isSet(): break else: # This is a file. Register it's last modified timestamp and check if # it's a file that we should skip. @@ -272,7 +275,7 @@ class DescriptorReader(threading.Thread): def __iter__(self): with self._iter_lock: - while not self._stop_event.isSet(): + while not self._stop_called.isSet(): try: yield self._unreturned_descriptors.get_nowait() except Queue.Empty: diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py index ca77e26..5fa61b8 100644 --- a/test/integ/descriptor/reader.py +++ b/test/integ/descriptor/reader.py @@ -3,6 +3,8 @@ Integration tests for stem.descriptor.reader. """ import os +import time +import signal import unittest import stem.descriptor.reader @@ -134,4 +136,30 @@ class TestDescriptorReader(unittest.TestCase): # check that we've seen all of the descriptor_entries self.assertTrue(len(descriptor_entries) == 0) + + def test_stop(self): + """ + Runs a DescriptorReader over the root directory, then checks that calling + stop() makes it terminate in a timely fashion. + """ + + is_test_running = True + reader = stem.descriptor.reader.DescriptorReader(["/"]) + + # Fails the test after a couple seconds if we don't finish successfully. + # Depending on what we're blocked on this might not work when the test + # fails, requiring that we give a manual kill to the test. 
+ + def timeout_handler(signum, frame): + if is_test_running: + self.fail() + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(2) + + reader.start() + time.sleep(0.1) + reader.stop() + reader.join() + is_test_running = False