[stem/master] Allowing the DescriptorReader to be run multiple times

commit ca837ddcb6f7cc6d118d4e1a774d2c4b22dbda79 Author: Damian Johnson <atagar@torproject.org> Date: Mon Mar 12 08:52:17 2012 -0700 Allowing the DescriptorReader to be run multiple times Improving the usability of the DescriptorReader class by making it so callers can reuse instance multiple times to get descriptor changes since the last run. --- stem/descriptor/reader.py | 59 ++++++++++++++++++++++++-------------- test/integ/descriptor/reader.py | 26 ++++++++++++++++- 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py index 9ef6781..27557da 100644 --- a/stem/descriptor/reader.py +++ b/stem/descriptor/reader.py @@ -33,15 +33,15 @@ again... reader.set_processed_files(processed_files) except: pass # could not load, mabye this is the first run - with reader: - start_time = time.time() - - while time.time() - start_time < 60: - # prints any descriptors that have changed since last checked + start_time = time.time() + + while time.time() - start_time < 60: + # prints any descriptors that have changed since last checked + with reader: for descriptor in reader: print descriptor - - time.sleep(1) + + time.sleep(1) save_processed_files(reader.get_processed_files(), "/tmp/used_descriptors") @@ -55,7 +55,6 @@ DescriptorReader - Iterator for descriptor data on the local file system. |- register_skip_listener - adds a listener that's notified of skipped files |- start - begins reading descriptor data |- stop - stops reading descriptor data - |- join - joins on the thread used to process descriptor data |- __enter__ / __exit__ - manages the descriptor reader thread in the context +- __iter__ - iterates over descriptor data in unread files @@ -71,14 +70,9 @@ import threading import mimetypes import Queue -# TODO: Unimplemented concurrency features... -# - restarting when __iter__ is called additional times -# - maximum read-ahead - # TODO: Remianing impementation items... -# - impelment skip listening and add a test for it -# - remove start and join methods from header? # - implement gzip and bz2 reading +# - maximum read-ahead # Maximum number of descriptors that we'll read ahead before waiting for our # caller to fetch some of them. This is included to avoid unbounded memory @@ -182,7 +176,7 @@ def save_processed_files(processed_files, path): output_file.write("%s %i\n" % (path, timestamp)) -class DescriptorReader(threading.Thread): +class DescriptorReader: """ Iterator for the descriptor data on the local file system. This can process text files, tarball archives (gzip or bzip2), or recurse directories. @@ -194,17 +188,19 @@ class DescriptorReader(threading.Thread): """ def __init__(self, targets, follow_links = False): - threading.Thread.__init__(self, name="Descriptor Reader") - self.setDaemon(True) - self._targets = targets self._follow_links = follow_links self._skip_listeners = [] self._processed_files = {} + self._reader_thread = None + self._reader_thread_lock = threading.RLock() + self._iter_lock = threading.RLock() self._iter_notice = threading.Event() + self._is_stopped = threading.Event() + self._is_stopped.set() # Descriptors that we have read but not yet provided to the caller. A # FINISHED entry is used by the reading thread to indicate the end. @@ -256,15 +252,35 @@ class DescriptorReader(threading.Thread): self._skip_listeners.append(listener) + def start(self): + """ + Starts reading our descriptor files. + + Raises: + ValueError if we're already reading the descriptor files + """ + + with self._reader_thread_lock: + if self._reader_thread: + raise ValueError("Already running, you need to call stop() first") + else: + self._is_stopped.clear() + self._reader_thread = threading.Thread(target = self._read_descriptor_files, name="Descriptor Reader") + self._reader_thread.setDaemon(True) + self._reader_thread.start() + def stop(self): """ Stops further reading of descriptor files. """ - self._is_stopped.set() - self._iter_notice.set() + with self._reader_thread_lock: + self._is_stopped.set() + self._iter_notice.set() + self._reader_thread.join() + self._reader_thread = None - def run(self): + def _read_descriptor_files(self): remaining_files = list(self._targets) while remaining_files and not self._is_stopped.is_set(): @@ -349,5 +365,4 @@ class DescriptorReader(threading.Thread): def __exit__(self, type, value, traceback): self.stop() - self.join() diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py index a6190ef..ded3d2d 100644 --- a/test/integ/descriptor/reader.py +++ b/test/integ/descriptor/reader.py @@ -148,6 +148,31 @@ class TestDescriptorReader(unittest.TestCase): # check that we've seen all of the descriptor_entries self.assertTrue(len(remaining_entries) == 0) + def test_multiple_runs(self): + """ + Runs a DescriptorReader instance multiple times over the same content, + making sure that it can be used repeatedly. + """ + + descriptor_path = os.path.join(DESCRIPTOR_TEST_DATA, "example_descriptor") + reader = stem.descriptor.reader.DescriptorReader([descriptor_path]) + + with reader: + self.assertEquals(len(list(reader)), 1) + + # run it a second time, this shouldn't provide any descriptors because we + # have already read it + + with reader: + self.assertEquals(len(list(reader)), 0) + + # clear the DescriptorReader's memory of seeing the file and run it again + + reader.set_processed_files([]) + + with reader: + self.assertEquals(len(list(reader)), 1) + def test_stop(self): """ Runs a DescriptorReader over the root directory, then checks that calling @@ -171,7 +196,6 @@ class TestDescriptorReader(unittest.TestCase): reader.start() time.sleep(0.1) reader.stop() - reader.join() is_test_running = False def test_get_processed_files(self):
participants (1)
-
atagar@torproject.org