[stem/master] Limiting the number of buffered descriptors

commit adbc1991fdae353825b893f5e90aee6ee882e0a4
Author: Damian Johnson <atagar@torproject.org>
Date:   Mon Mar 12 19:17:33 2012 -0700

    Limiting the number of buffered descriptors
    
    Preventing the DescriptorReader from having unbounded memory usage by
    limiting the number of descriptors that we'll store before we wait for
    our caller to request some.
    
    This doesn't technically make our memory usage bounded since a single
    descriptor doesn't have a limited size, but if one descriptor can
    trigger the OOM killer then we have a problem. :)
    
    This isn't yet tested because we only have a single descriptor in our
    test data (we need at least two before we can test this). Adding a todo
    note for now.
---
 stem/descriptor/reader.py       | 34 ++++++++++++++++++++--------------
 test/integ/descriptor/reader.py |  1 +
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 6026408..6af2fc6 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -71,16 +71,6 @@ import threading
 import mimetypes
 import Queue
 
-# TODO: Remianing impementation items...
-# - implement gzip and bz2 reading
-# - maximum read-ahead
-
-# Maximum number of descriptors that we'll read ahead before waiting for our
-# caller to fetch some of them. This is included to avoid unbounded memory
-# usage. This condition will be removed if set to zero.
-
-MAX_STORED_DESCRIPTORS = 20
-
 # flag to indicate when the reader thread is out of descriptor files to read
 FINISHED = "DONE"
 
@@ -189,13 +179,19 @@ class DescriptorReader:
   Iterator for the descriptor data on the local file system. This can process
   text files, tarball archives (gzip or bzip2), or recurse directories.
   
+  By default this limits the number of descriptors that we'll read ahead before
+  waiting for our caller to fetch some of them. This is included to avoid
+  unbounded memory usage.
+  
   Arguments:
     targets (list) - paths for files or directories to be read from
     follow_links (bool) - determines if we'll follow symlinks when traversing
       directories
+    buffer_size (int) - descriptors we'll buffer before waiting for some to
+      be read, this is unbounded if zero
   """
   
-  def __init__(self, targets, follow_links = False):
+  def __init__(self, targets, follow_links = False, buffer_size = 100):
     self._targets = targets
     self._follow_links = follow_links
     self._skip_listeners = []
@@ -213,7 +209,7 @@ class DescriptorReader:
     
     # Descriptors that we have read but not yet provided to the caller. A
     # FINISHED entry is used by the reading thread to indicate the end.
-    self._unreturned_descriptors = Queue.Queue()
+    self._unreturned_descriptors = Queue.Queue(buffer_size)
   
   def get_processed_files(self):
     """
@@ -335,7 +331,7 @@ class DescriptorReader:
       else:
         self._notify_skip_listeners(target, UnrecognizedType(target_type))
     
-    self._unreturned_descriptors.put(FINISHED)
+    self._enqueue_descriptor(FINISHED)
     self._iter_notice.set()
   
   def __iter__(self):
@@ -354,7 +350,7 @@
     try:
       # TODO: replace with actual descriptor parsing when we have it
       target_file = open(target)
-      self._unreturned_descriptors.put(target_file.read())
+      self._enqueue_descriptor(target_file.read())
      self._iter_notice.set()
    except IOError, exc:
      self._notify_skip_listeners(target, ReadFailed(exc))
@@ -365,6 +361,16 @@
   def _handle_archive_bzip(self, target):
     pass # TODO: implement
   
+  def _enqueue_descriptor(self, descriptor):
+    # blocks until there is either room for the descriptor or we're stopped
+    
+    while True:
+      try:
+        self._unreturned_descriptors.put(descriptor, timeout = 0.1)
+        return
+      except Queue.Full:
+        if self._is_stopped.is_set(): return
+  
   def _notify_skip_listeners(self, path, exception):
     for listener in self._skip_listeners:
       listener(path, exception)

diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index a2383ee..d071db6 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -44,6 +44,7 @@ class SkipListener:
   def listener(self, path, exception):
     self.results.append((path, exception))
 
+# TODO: test buffer_size when we have more descriptor examples
 class TestDescriptorReader(unittest.TestCase):
   def tearDown(self):
     # cleans up 'processed file' listings that we made
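The patch pins down the new constructor signature (targets, follow_links, buffer_size) and, per the docstring, a buffer_size of zero removes the cap. How the reading thread gets started isn't visible in this diff, so the following usage is only a hedged sketch against that signature; the target path is made up, and at this commit each yielded item is simply the raw file contents:

from stem.descriptor.reader import DescriptorReader

# cap read-ahead at 50 descriptors; buffer_size = 0 would make it unbounded
reader = DescriptorReader(["/tmp/descriptor_archive"], buffer_size = 50)

for descriptor in reader:
  print descriptor  # raw file contents until real descriptor parsing lands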
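For context, here is a minimal, self-contained sketch of the pattern _enqueue_descriptor implements: a bounded Queue.Queue whose put() is retried on a short timeout so the producing thread can notice a stop flag instead of hanging forever against a full queue once the consumer goes away. This is illustrative code, not stem's; the class and variable names are invented, and it's written for Python 2 to match the module's use of Queue:

import Queue
import threading

FINISHED = "DONE"  # sentinel marking the end of the stream

class BoundedProducer(object):
  def __init__(self, items, buffer_size = 5):
    # a buffer_size of zero would make the queue unbounded, as in the patch
    self._queue = Queue.Queue(buffer_size)
    self._is_stopped = threading.Event()
    self._thread = threading.Thread(target = self._produce, args = (items,))
    self._thread.setDaemon(True)
    self._thread.start()
  
  def _produce(self, items):
    for item in items:
      if not self._enqueue(item): return
    self._enqueue(FINISHED)
  
  def _enqueue(self, item):
    # blocks until there is room for the item or we're stopped, polling with
    # a short timeout so a stopped producer can't deadlock on a full queue
    while True:
      try:
        self._queue.put(item, timeout = 0.1)
        return True
      except Queue.Full:
        if self._is_stopped.is_set(): return False
  
  def __iter__(self):
    while True:
      item = self._queue.get()
      if item == FINISHED: break
      yield item
  
  def stop(self):
    self._is_stopped.set()
    self._thread.join()

# no matter how many items the source provides, at most buffer_size + 1 of
# them are held in memory at any given time
for item in BoundedProducer(xrange(100)):
  print item

The 0.1 second poll mirrors the patch: it trades a trivial amount of busy-waiting for the guarantee that stop() takes effect promptly even when the caller has stopped draining the queue.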