[tor-commits] [stem/master] Allowing the DescriptorReader to be run multiple times

atagar at torproject.org atagar at torproject.org
Mon Mar 26 00:10:01 UTC 2012


commit ca837ddcb6f7cc6d118d4e1a774d2c4b22dbda79
Author: Damian Johnson <atagar at torproject.org>
Date:   Mon Mar 12 08:52:17 2012 -0700

    Allowing the DescriptorReader to be run multiple times
    
    Improving the usability of the DescriptorReader class by making it so callers
    can reuse an instance multiple times to get descriptor changes since the last run.
---
 stem/descriptor/reader.py       |   59 ++++++++++++++++++++++++--------------
 test/integ/descriptor/reader.py |   26 ++++++++++++++++-
 2 files changed, 62 insertions(+), 23 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 9ef6781..27557da 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -33,15 +33,15 @@ again...
     reader.set_processed_files(processed_files)
   except: pass # could not load, mabye this is the first run
   
-  with reader:
-    start_time = time.time()
-    
-    while time.time() - start_time < 60:
-      # prints any descriptors that have changed since last checked
+  start_time = time.time()
+  
+  while time.time() - start_time < 60:
+    # prints any descriptors that have changed since last checked
+    with reader:
       for descriptor in reader:
         print descriptor
-      
-      time.sleep(1)
+    
+    time.sleep(1)
   
   save_processed_files(reader.get_processed_files(), "/tmp/used_descriptors")
 
@@ -55,7 +55,6 @@ DescriptorReader - Iterator for descriptor data on the local file system.
   |- register_skip_listener - adds a listener that's notified of skipped files
   |- start - begins reading descriptor data
   |- stop - stops reading descriptor data
-  |- join - joins on the thread used to process descriptor data
   |- __enter__ / __exit__ - manages the descriptor reader thread in the context
   +- __iter__ - iterates over descriptor data in unread files
 
@@ -71,14 +70,9 @@ import threading
 import mimetypes
 import Queue
 
-# TODO: Unimplemented concurrency features...
-# - restarting when __iter__ is called additional times
-# - maximum read-ahead
-
 # TODO: Remianing impementation items...
-# - impelment skip listening and add a test for it
-# - remove start and join methods from header?
 # - implement gzip and bz2 reading
+# - maximum read-ahead
 
 # Maximum number of descriptors that we'll read ahead before waiting for our
 # caller to fetch some of them. This is included to avoid unbounded memory
@@ -182,7 +176,7 @@ def save_processed_files(processed_files, path):
       
       output_file.write("%s %i\n" % (path, timestamp))
 
-class DescriptorReader(threading.Thread):
+class DescriptorReader:
   """
   Iterator for the descriptor data on the local file system. This can process
   text files, tarball archives (gzip or bzip2), or recurse directories.
@@ -194,17 +188,19 @@ class DescriptorReader(threading.Thread):
   """
   
   def __init__(self, targets, follow_links = False):
-    threading.Thread.__init__(self, name="Descriptor Reader")
-    self.setDaemon(True)
-    
     self._targets = targets
     self._follow_links = follow_links
     self._skip_listeners = []
     self._processed_files = {}
     
+    self._reader_thread = None
+    self._reader_thread_lock = threading.RLock()
+    
     self._iter_lock = threading.RLock()
     self._iter_notice = threading.Event()
+    
     self._is_stopped = threading.Event()
+    self._is_stopped.set()
     
     # Descriptors that we have read but not yet provided to the caller. A
     # FINISHED entry is used by the reading thread to indicate the end.
@@ -256,15 +252,35 @@ class DescriptorReader(threading.Thread):
     
     self._skip_listeners.append(listener)
   
+  def start(self):
+    """
+    Starts reading our descriptor files.
+    
+    Raises:
+      ValueError if we're already reading the descriptor files
+    """
+    
+    with self._reader_thread_lock:
+      if self._reader_thread:
+        raise ValueError("Already running, you need to call stop() first")
+      else:
+        self._is_stopped.clear()
+        self._reader_thread = threading.Thread(target = self._read_descriptor_files, name="Descriptor Reader")
+        self._reader_thread.setDaemon(True)
+        self._reader_thread.start()
+  
   def stop(self):
     """
     Stops further reading of descriptor files.
     """
     
-    self._is_stopped.set()
-    self._iter_notice.set()
+    with self._reader_thread_lock:
+      self._is_stopped.set()
+      self._iter_notice.set()
+      self._reader_thread.join()
+      self._reader_thread = None
   
-  def run(self):
+  def _read_descriptor_files(self):
     remaining_files = list(self._targets)
     
     while remaining_files and not self._is_stopped.is_set():
@@ -349,5 +365,4 @@ class DescriptorReader(threading.Thread):
   
   def __exit__(self, type, value, traceback):
     self.stop()
-    self.join()
 
diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index a6190ef..ded3d2d 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -148,6 +148,31 @@ class TestDescriptorReader(unittest.TestCase):
       # check that we've seen all of the descriptor_entries
       self.assertTrue(len(remaining_entries) == 0)
   
+  def test_multiple_runs(self):
+    """
+    Runs a DescriptorReader instance multiple times over the same content,
+    making sure that it can be used repeatedly.
+    """
+    
+    descriptor_path = os.path.join(DESCRIPTOR_TEST_DATA, "example_descriptor")
+    reader = stem.descriptor.reader.DescriptorReader([descriptor_path])
+    
+    with reader:
+      self.assertEquals(len(list(reader)), 1)
+    
+    # run it a second time, this shouldn't provide any descriptors because we
+    # have already read it
+    
+    with reader:
+      self.assertEquals(len(list(reader)), 0)
+    
+    # clear the DescriptorReader's memory of seeing the file and run it again
+    
+    reader.set_processed_files([])
+    
+    with reader:
+      self.assertEquals(len(list(reader)), 1)
+  
   def test_stop(self):
     """
     Runs a DescriptorReader over the root directory, then checks that calling
@@ -171,7 +196,6 @@ class TestDescriptorReader(unittest.TestCase):
     reader.start()
     time.sleep(0.1)
     reader.stop()
-    reader.join()
     is_test_running = False
   
   def test_get_processed_files(self):





More information about the tor-commits mailing list