[tor-commits] [stem/master] Revising implementation details for processed files

atagar at torproject.org atagar at torproject.org
Mon Mar 26 00:10:01 UTC 2012


commit d93d894a3339216b7dc1791ea8c009cd280db7ff
Author: Damian Johnson <atagar at torproject.org>
Date:   Sun Mar 4 11:24:24 2012 -0800

    Revising implementation details for processed files
    
    Like metrics-lib, the DescriptorReader will track the last modified timestamp
    for the descriptor files we have processed. Revising the pydocs for it and
    adding untested functions to save and load. Next step is to add integ tests.
---
 stem/descriptor/reader.py |  125 ++++++++++++++++++++++++++++++++++++--------
 1 files changed, 102 insertions(+), 23 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index b6404f3..6bb3112 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -8,14 +8,11 @@ Example:
   ]
   
   reader = DescriptorReader(my_descriptors)
-  reader.start()
   
-  # prints all of the descriptor contents
-  for descriptor in reader:
-    print descriptor
-  
-  reader.stop()
-  reader.join()
+  with reader:
+    # prints all of the descriptor contents
+    for descriptor in reader:
+      print descriptor
 """
 
 import os
@@ -23,11 +20,96 @@ import theading
 import mimetypes
 import Queue
 
+def load_processed_files(path):
+  """
+  Loads the 'path => last modified timestamp' mappings persisted by
+  save_processed_files(), one '<absolute path> <timestamp>' entry per line.
+  
+  Arguments:
+    path (str) - location to load the processed files dictionary from
+  
+  Returns:
+    dict of 'path (str) => last modified timestamp (int)' mappings
+  
+  Raises:
+    IOError if unable to read the file
+    TypeError if unable to parse the file's contents
+  """
+  
+  processed_files = {}
+  
+  with open(path) as input_file:
+    for line in input_file.readlines():
+      line = line.strip()
+      
+      if not line: continue # skip blank lines
+      
+      if not " " in line: # need a space between the path and its timestamp
+        raise TypeError("Malformed line: %s" % line)
+      
+      path, timestamp = line.rsplit(" ", 1) # last space, paths may have spaces
+      
+      if not os.path.isabs(path):
+        raise TypeError("'%s' is not an absolute path" % path)
+      elif not timestamp.isdigit(): # also rejects signed or fractional values
+        raise TypeError("'%s' is not an integer timestamp" % timestamp)
+      
+      processed_files[path] = int(timestamp)
+  
+  return processed_files
+
+def save_processed_files(processed_files, path):
+  """
+  Persists a dictionary of 'path => last modified timestamp' mappings (as
+  provided by the DescriptorReader's get_processed_files() method) so that they
+  can be loaded later and applied to another DescriptorReader.
+  
+  Arguments:
+    processed_files (dict) - 'path => last modified' mappings
+    path (str)             - location to save the processed files dictionary to
+  
+  Raises:
+    IOError if unable to write to the file
+    TypeError if processed_files is of the wrong type
+  """
+  
+  # makes the parent directory if it doesn't already exist
+  try:
+    path_dir = os.path.dirname(path) # '' for a bare filename, nothing to make
+    if path_dir and not os.path.exists(path_dir): os.makedirs(path_dir)
+  except OSError, exc: raise IOError(exc)
+  
+  with open(path, "w") as output_file:
+    for entry_path, timestamp in processed_files.items():
+      output_file.write("%s %i\n" % (entry_path, timestamp)) # one per line
+
 class DescriptorReader(threading.Thread):
   """
   Iterator for the descriptor data on the local file system. This can process
   text files, tarball archives (gzip or bzip2), or recurse directories.
   
+  This keeps track of the last modified timestamps for descriptor files we have
+  used, and if you call restart() then this will only provide descriptors from
+  new files or files that have changed since then.
+  
+  You can also save this listing of processed files and later apply it to
+  another DescriptorReader. For instance, to only print the descriptors that
+  have changed since we last ran...
+  
+    reader = DescriptorReader(["/tmp/descriptor_data"])
+    
+    try:
+      processed_files = load_processed_files("/tmp/used_descriptors")
+      reader.set_processed_files(processed_files)
+    except: pass # could not load, maybe this is the first run
+    
+    # only prints descriptors that have changed since we last ran
+    with reader:
+      for descriptor in reader:
+        print descriptor
+    
+    save_processed_files(reader.get_processed_files(), "/tmp/used_descriptors")
+  
   This ignores files that cannot be processed (either due to read errors or
   because they don't contain descriptor data). The caller can be notified of
   files that are skipped by restering a listener with register_skip_listener().
@@ -37,13 +119,14 @@ class DescriptorReader(threading.Thread):
     self.targets = targets
     self.skip_listeners = []
     self.processed_files = {}
+    self._stop_event = threading.Event()
   
   def stop(self):
     """
     Stops further reading of descriptors.
     """
     
-    pass # TODO: implement
+    self._stop_event.set()
   
   def get_processed_files(self):
     """
@@ -59,7 +142,7 @@ class DescriptorReader(threading.Thread):
       files we have processed
     """
     
-    return self.processed_files
+    return dict(self.processed_files)
   
   def set_processed_files(self, processed_files):
     """
@@ -67,19 +150,6 @@ class DescriptorReader(threading.Thread):
     listing of processed files. With the get_processed_files() method this can
     be used to skip descriptors that we have already read. For instance...
     
-    # gets the initial descriptors
-    reader = DescriptorReader(["/tmp/descriptor_data"])
-    
-    with reader:
-      initial_descriptors = list(reader)
-      processed_files = reader.get_processed_files()
-    
-    # only gets the descriptors that have changed since we last checked
-    reader = DescriptorReader(["/tmp/descriptor_data"])
-    reader.set_processed_files(processed_files)
-    
-    with reader:
-      new_descriptors = list(reader)
     
     Arguments:
       processed_files (dict) - mapping of absolute paths (str) to unix
@@ -104,7 +174,16 @@ class DescriptorReader(threading.Thread):
     self.skip_listeners.append(listener)
   
   def run(self):
-    pass # TODO: implement
+    # os.walk(path, followlinks = True)
+    #
+    # >>> mimetypes.guess_type("/home/atagar/Desktop/control-spec.txt")
+    # ('text/plain', None)
+    #
+    # >>> mimetypes.guess_type("/home/atagar/Desktop/server-descriptors-2012-03.tar.bz2")
+    # ('application/x-tar', 'bzip2')
+    
+    while not self._stop_event.isSet():
+      pass # TODO: implement
   
   def _notify_skip_listener(self, path, exception):
     for listener in self.skip_listeners:





More information about the tor-commits mailing list