[stem/master] Implementation of DescriptorReader concurrency

commit 2d620558fa0885c532e8759c497881146a89ba0a
Author: Damian Johnson <atagar@torproject.org>
Date:   Sat Mar 10 18:09:52 2012 -0800

    Implementation of DescriptorReader concurrency
    
    After much hair pulling figured out a relatively simple producer/consumer
    model for this class. It should be trivial to add stop() later, but making
    this re-runnable would greatly complicate the class and probably isn't
    worth it.
    
    This isn't yet working, but this is a decent breaking point.
---
 stem/descriptor/reader.py |  114 ++++++++++++++++++++++++++++++++++++---------
 1 files changed, 92 insertions(+), 22 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index b0c8f9a..8bdb5b4 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -52,6 +52,7 @@ save_processed_files - Saves a listing of processed files.
 DescriptorReader - Iterator for descriptor data on the local file system.
   |- get_processed_files - provides the listing of files that we've processed
   |- set_processed_files - sets our tracking of the files we have processed
+  |- register_skip_listener - adds a listener that's notified of skipped files
   |- start - begins reading descriptor data
   |- stop - stops reading descriptor data
   |- join - joins on the thread used to process descriptor data
@@ -64,6 +65,17 @@ import threading
 import mimetypes
 import Queue
 
+# TODO: Unimplemented concurrency features...
+# - stop()
+# - restarting when __iter__ is called additional times
+# - maximum read-ahead
+
+# Maximum number of descriptors that we'll read ahead before waiting for our
+# caller to fetch some of them. This is included to avoid unbounded memory
+# usage. This condition will be removed if set to zero.
+
+MAX_STORED_DESCRIPTORS = 20
+
 def load_processed_files(path):
   """
   Loads a dictionary of 'path => last modified timestamp' mappings, as
@@ -136,10 +148,28 @@ class DescriptorReader(threading.Thread):
   text files, tarball archives (gzip or bzip2), or recurse directories.
   """
   
-  def __init__(self, targets):
-    self.targets = targets
-    self.skip_listeners = []
-    self.processed_files = {}
+  def __init__(self, targets, follow_links = False):
+    threading.Thread.__init__(self, name="Descriptor Reader")
+    self.setDaemon(True)
+    
+    # flag to indicate if we'll follow symlinks when traversing directories
+    self._follow_links = follow_links
+    
+    self._targets = targets
+    self._skip_listeners = []
+    self._processed_files = {}
+    
+    self._iter_lock = threading.RLock()
+    self._iter_notice = threading.Event()
+    
+    # targets that remain to be read in this iteration
+    self._remaining_files = list(self._targets)
+    
+    # descriptors that we have read, but not yet provided to the caller
+    self._unreturned_descriptors = Queue.Queue()
+    
+    # TODO: implement
+    # flag that's set when we're done
     self._stop_event = threading.Event()
   
   def get_processed_files(self):
@@ -156,7 +186,7 @@ class DescriptorReader(threading.Thread):
       files we have processed
     """
     
-    return dict(self.processed_files)
+    return dict(self._processed_files)
   
   def set_processed_files(self, processed_files):
     """
@@ -169,7 +199,7 @@ class DescriptorReader(threading.Thread):
       timestamps for the last modified time (int)
     """
    
-    self.processed_files = dict(processed_files)
+    self._processed_files = dict(processed_files)
   
   def register_skip_listener(self, listener):
     """
@@ -184,7 +214,7 @@ class DescriptorReader(threading.Thread):
      valid descriptor data
     """
    
-    self.skip_listeners.append(listener)
+    self._skip_listeners.append(listener)
   
   def stop(self):
     """
@@ -192,23 +222,63 @@ class DescriptorReader(threading.Thread):
     """
    
     self._stop_event.set()
+    self._iter_notice.set()
   
   def run(self):
-    # os.walk(path, followlinks = True)
-    #
-    # >>> mimetypes.guess_type("/home/atagar/Desktop/control-spec.txt")
-    # ('text/plain', None)
-    #
-    # >>> mimetypes.guess_type("/home/atagar/Desktop/server-descriptors-2012-03.tar.bz2")
-    # ('application/x-tar', 'bzip2')
-    #
-    # This only checks the file extension. To actually check the content (like
-    # the 'file' command) an option would be pymagic...
-    # https://github.com/cloudburst/pymagic
-    
-    
-    while not self._stop_event.isSet():
-      pass # TODO: implement
+    while self._remaining_files:
+      target = self._remaining_files.pop(0)
+      if not os.path.exists(target): continue
+      
+      if os.path.isdir(target):
+        # adds all of the files that it contains
+        for root, _, files in os.walk(target, followlinks = self._follow_links):
+          for filename in files:
+            self._remaining_files.append(os.path.join(root, filename))
+      else:
+        # This is a file. Register its last modified timestamp and check if
+        # it's a file that we should skip.
+        
+        last_modified = os.stat(target).st_mtime
+        last_used = self._processed_files.get(target)
+        
+        if last_used and last_used >= last_modified:
+          continue
+        else:
+          self._processed_files[target] = last_modified
+        
+        # The mimetypes module only checks the file extension. To actually
+        # check the content (like the 'file' command) we'd need something like
+        # pymagic (https://github.com/cloudburst/pymagic).
+        
+        target_type = mimetypes.guess_type(target)
+        
+        if target_type[0] in (None, 'text/plain'):
+          # if either a '.txt' or unknown type then try to process it as a
+          # descriptor file
+          
+          with open(target) as target_file:
+            # TODO: replace with actual descriptor parsing when we have it
+            self._unreturned_descriptors.put(target_file.read())
+            self._iter_notice.set()
+        elif target_type[0] == 'application/x-tar':
+          if target_type[1] == 'gzip':
+            pass # TODO: implement
+          elif target_type[1] == 'bzip2':
+            pass # TODO: implement
+    
+    # TODO: bug: __iter__ should finish with the _unreturned_descriptors
+    # contents. Could be fixed by adding a 'is done reading' event.
+    self._stop_event.set()
+    self._iter_notice.set()
+  
+  def __iter__(self):
+    with self._iter_lock:
+      while not self._stop_event.isSet():
+        try:
+          yield self._unreturned_descriptors.get_nowait()
+        except Queue.Empty:
+          self._iter_notice.wait()
+          self._iter_notice.clear()
   
   def _notify_skip_listener(self, path, exception):
     for listener in self.skip_listeners:
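To make the producer/consumer split concrete, here is a rough sketch of how a caller might drive this class once the remaining TODOs are settled: the reader thread (run) walks the targets and queues up descriptor data, while iterating over the reader consumes it. The target paths and the "descriptor_cache" file below are made-up examples, and persisting the processed-files listing afterward is only hinted at since save_processed_files doesn't appear in this diff.

  from stem.descriptor.reader import DescriptorReader, load_processed_files

  def skipped(path, exception):
    # skip listeners are called from the reader thread for files it couldn't use
    print "skipped %s: %s" % (path, exception)

  # hypothetical targets: a directory (walked recursively) and a tarball
  reader = DescriptorReader(["/home/me/descriptor-archive",
                             "/home/me/server-descriptors-2012-03.tar.bz2"])

  # skip anything that hasn't changed since the last run
  reader.set_processed_files(load_processed_files("/home/me/descriptor_cache"))
  reader.register_skip_listener(skipped)

  reader.start()  # kicks off run() in the daemon thread

  for descriptor in reader:
    print descriptor  # for now this is just the raw file contents

  reader.join()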
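The 'is done reading' event mentioned in that TODO is a standard way to close out this kind of producer/consumer loop. A minimal standalone sketch of the pattern (not stem code, just the general idea): the producer only flags completion after its last put(), and the consumer keeps draining the queue until it is both empty and the flag is set, so nothing queued near the end gets dropped.

  import Queue, threading

  queue, is_reading_done, notice = Queue.Queue(), threading.Event(), threading.Event()

  def producer(items):
    for item in items:
      queue.put(item)
      notice.set()

    is_reading_done.set()  # set *after* the final put() so the consumer can finish draining
    notice.set()

  def consume():
    while True:
      try:
        yield queue.get_nowait()
      except Queue.Empty:
        if is_reading_done.isSet(): break  # queue is drained and the producer is finished
        notice.wait()
        notice.clear()

  threading.Thread(target = producer, args = (["a", "b", "c"],)).start()
  print list(consume())  # ['a', 'b', 'c'], even if the producer finishes first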