[tor-commits] [stem/master] Implementation of DescriptorReader concurrency

atagar at torproject.org atagar at torproject.org
Mon Mar 26 00:10:01 UTC 2012


commit 2d620558fa0885c532e8759c497881146a89ba0a
Author: Damian Johnson <atagar at torproject.org>
Date:   Sat Mar 10 18:09:52 2012 -0800

    Implementation of DescriptorReader concurrency
    
    After much hair pulling figured out a relatively producer/consumer simple model
    for this class. It should be trivial to add stop() later, but making this
    re-runable would greatly complicate the class and probably isn't worth it.
    
    This isn't yet working, but this is a decent breaking point.
---
 stem/descriptor/reader.py |  114 ++++++++++++++++++++++++++++++++++++---------
 1 files changed, 92 insertions(+), 22 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index b0c8f9a..8bdb5b4 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -52,6 +52,7 @@ save_processed_files - Saves a listing of processed files.
 DescriptorReader - Iterator for descriptor data on the local file system.
   |- get_processed_files - provides the listing of files that we've processed
   |- set_processed_files - sets our tracking of the files we have processed
+  |- register_skip_listener - adds a listener that's notified of skipped files
   |- start - begins reading descriptor data
   |- stop - stops reading descriptor data
   |- join - joins on the thread used to process descriptor data
@@ -64,6 +65,17 @@ import threading
 import mimetypes
 import Queue
 
+# TODO: Unimplemented concurrency features...
+# - stop()
+# - restarting when __iter__ is called additional times
+# - maximum read-ahead
+
+# Maximum number of descriptors that we'll read ahead before waiting for our
+# caller to fetch some of them. This is included to avoid unbounded memory
+# usage. This condition will be removed if set to zero.
+
+MAX_STORED_DESCRIPTORS = 20
+
 def load_processed_files(path):
   """
   Loads a dictionary of 'path => last modified timestamp' mappings, as
@@ -136,10 +148,28 @@ class DescriptorReader(threading.Thread):
   text files, tarball archives (gzip or bzip2), or recurse directories.
   """
   
-  def __init__(self, targets):
-    self.targets = targets
-    self.skip_listeners = []
-    self.processed_files = {}
+  def __init__(self, targets, follow_links = False):
+    threading.Thread.__init__(self, name="Descriptor Reader")
+    self.setDaemon(True)
+    
+    # flag to indicate if we'll follow symlinks when transversing directories
+    self._follow_links = follow_links
+    
+    self._targets = targets
+    self._skip_listeners = []
+    self._processed_files = {}
+    
+    self._iter_lock = threading.RLock()
+    self._iter_notice = threading.Event()
+    
+    # targets that remain to be read in this iteration
+    self._remaining_files = list(self._targets)
+    
+    # descriptors that we have read, but not yet provided to the caller
+    self._unreturned_descriptors = Queue.Queue()
+    
+    # TODO: implement
+    # flag that's set when we're done
     self._stop_event = threading.Event()
   
   def get_processed_files(self):
@@ -156,7 +186,7 @@ class DescriptorReader(threading.Thread):
       files we have processed
     """
     
-    return dict(self.processed_files)
+    return dict(self._processed_files)
   
   def set_processed_files(self, processed_files):
     """
@@ -169,7 +199,7 @@ class DescriptorReader(threading.Thread):
                                timestamps for the last modified time (int)
     """
     
-    self.processed_files = dict(processed_files)
+    self._processed_files = dict(processed_files)
   
   def register_skip_listener(self, listener):
     """
@@ -184,7 +214,7 @@ class DescriptorReader(threading.Thread):
                            valid descriptor data
     """
     
-    self.skip_listeners.append(listener)
+    self._skip_listeners.append(listener)
   
   def stop(self):
     """
@@ -192,23 +222,63 @@ class DescriptorReader(threading.Thread):
     """
     
     self._stop_event.set()
+    self._iter_notice.set()
   
   def run(self):
-    # os.walk(path, followlinks = True)
-    #
-    # >>> mimetypes.guess_type("/home/atagar/Desktop/control-spec.txt")
-    # ('text/plain', None)
-    #
-    # >>> mimetypes.guess_type("/home/atagar/Desktop/server-descriptors-2012-03.tar.bz2")
-    # ('application/x-tar', 'bzip2')
-    #
-    # This only checks the file extension. To actually check the content (like
-    # the 'file' command) an option would be pymagic...
-    # https://github.com/cloudburst/pymagic
-    
-    
-    while not self._stop_event.isSet():
-      pass # TODO: implement
+    while self._remaining_files:
+      target = self._remaining_files.pop(0)
+      if not os.path.exists(target): continue
+      
+      if os.path.isdir(target):
+        # adds all of the files that it contains
+        for root, _, files in os.walk(target, followlinks = self._follow_links):
+          for filename in files:
+            self._remaining_files.append(os.path.join(root, filename))
+      else:
+        # This is a file. Register it's last modified timestamp and check if
+        # it's a file that we should skip.
+        
+        last_modified = os.stat(target).st_mtime
+        last_used = self._processed_files.get(target)
+        
+        if last_used and last_used >= last_modified:
+          continue
+        else:
+          self._processed_files[target] = last_modified
+        
+        # The mimetypes module only checks the file extension. To actually
+        # check the content (like the 'file' command) we'd need something like
+        # pymagic (https://github.com/cloudburst/pymagic).
+        
+        target_type = mimetypes.guess_type(target)
+        
+        if target_type[0] in (None, 'text/plain'):
+          # if either a '.txt' or unknown type then try to process it as a
+          # descriptor file
+          
+          with open(target) as target_file:
+            # TODO: replace with actual descriptor parsing when we have it
+            self._unreturned_descriptors.put(target_file.read())
+            self._iter_notice.set()
+        elif target_type[0] == 'application/x-tar':
+          if target_type[1] == 'gzip':
+            pass # TODO: implement
+          elif target_type[1] == 'bzip2':
+            pass # TODO: implement
+    
+    # TODO: bug: __iter__ should finish with the _unreturned_descriptors
+    # contents. Could be fixed by adding a 'is done reading' event.
+    self._stop_event.set()
+    self._iter_notice.set()
+  
+  def __iter__(self):
+    with self._iter_lock:
+      while not self._stop_event.isSet():
+        try:
+          yield self._unreturned_descriptors.get_nowait()
+        except Queue.Empty:
+          self._iter_notice.wait()
+          self._iter_notice.clear()
   
   def _notify_skip_listener(self, path, exception):
     for listener in self.skip_listeners:





More information about the tor-commits mailing list