[stem/master] Adding tarball support to the DescriptorReader

commit 828d5dac1c0eda9db996438af45515c0dec0cef0 Author: Damian Johnson <atagar@torproject.org> Date: Mon Mar 12 20:28:16 2012 -0700 Adding tarball support to the DescriptorReader Adding support for reading directly from tarballs (which is how metrics are commonly fetched). This supports all forms of compression that the tarfile module does (gzip and bz2, among others). Also includes some tests and archives to read against. --- stem/descriptor/reader.py | 25 +++++--- test/integ/descriptor/data/descriptor_archive.tar | Bin 0 -> 20480 bytes .../descriptor/data/descriptor_archive.tar.bz2 | Bin 0 -> 3322 bytes .../descriptor/data/descriptor_archive.tar.gz | Bin 0 -> 2844 bytes test/integ/descriptor/reader.py | 59 +++++++++++++++++++- 5 files changed, 74 insertions(+), 10 deletions(-) diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py index 3b6e04a..198605b 100644 --- a/stem/descriptor/reader.py +++ b/stem/descriptor/reader.py @@ -67,6 +67,7 @@ FileSkipped - Base exception for a file that was skipped. 
""" import os +import tarfile import threading import mimetypes import Queue @@ -328,10 +329,9 @@ class DescriptorReader: if target_type[0] in (None, 'text/plain'): # either '.txt' or an unknown type self._handle_descriptor_file(target) - elif target_type == ('application/x-tar', 'gzip'): - self._handle_archive_gzip(target) - elif target_type == ('application/x-tar', 'bzip2'): - self._handle_archive_gzip(target) + elif tarfile.is_tarfile(target): + # handles gzip, bz2, and decompressed tarballs among others + self._handle_archive(target) else: self._notify_skip_listeners(target, UnrecognizedType(target_type)) @@ -355,15 +355,22 @@ class DescriptorReader: # TODO: replace with actual descriptor parsing when we have it target_file = open(target) self._enqueue_descriptor(target_file.read()) + target_file.close() + self._iter_notice.set() except IOError, exc: self._notify_skip_listeners(target, ReadFailed(exc)) - def _handle_archive_gzip(self, target): - pass # TODO: implement - - def _handle_archive_bzip(self, target): - pass # TODO: implement + def _handle_archive(self, target): + with tarfile.open(target) as tar_file: + for tar_entry in tar_file: + if tar_entry.isfile(): + # TODO: replace with actual descriptor parsing when we have it + entry = tar_file.extractfile(tar_entry) + self._enqueue_descriptor(entry.read()) + entry.close() + + self._iter_notice.set() def _enqueue_descriptor(self, descriptor): # blocks until their is either room for the descriptor or we're stopped diff --git a/test/integ/descriptor/data/descriptor_archive.tar b/test/integ/descriptor/data/descriptor_archive.tar new file mode 100644 index 0000000..2c40716 Binary files /dev/null and b/test/integ/descriptor/data/descriptor_archive.tar differ diff --git a/test/integ/descriptor/data/descriptor_archive.tar.bz2 b/test/integ/descriptor/data/descriptor_archive.tar.bz2 new file mode 100644 index 0000000..ba1f239 Binary files /dev/null and b/test/integ/descriptor/data/descriptor_archive.tar.bz2 differ 
diff --git a/test/integ/descriptor/data/descriptor_archive.tar.gz b/test/integ/descriptor/data/descriptor_archive.tar.gz new file mode 100644 index 0000000..63a6a57 Binary files /dev/null and b/test/integ/descriptor/data/descriptor_archive.tar.gz differ diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py index d071db6..aa204e9 100644 --- a/test/integ/descriptor/reader.py +++ b/test/integ/descriptor/reader.py @@ -6,6 +6,7 @@ import os import sys import time import signal +import tarfile import unittest import stem.descriptor.reader @@ -20,6 +21,8 @@ BASIC_LISTING = """ my_dir = os.path.dirname(__file__) DESCRIPTOR_TEST_DATA = os.path.join(my_dir, "data") +TAR_DESCRIPTORS = [] + def _get_processed_files_path(): return os.path.join(test.runner.get_runner().get_test_dir(), "descriptor_processed_files") @@ -37,6 +40,21 @@ def _make_processed_files_listing(contents): return test_listing_path +def _get_raw_tar_descriptors(): + global TAR_DESCRIPTORS + + if not TAR_DESCRIPTORS: + test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar") + + with tarfile.open(test_path) as tar_file: + for tar_entry in tar_file: + if tar_entry.isfile(): + entry = tar_file.extractfile(tar_entry) + TAR_DESCRIPTORS.append(entry.read()) + entry.close() + + return TAR_DESCRIPTORS + class SkipListener: def __init__(self): self.results = [] # (path, exception) tuples that we've received @@ -133,7 +151,7 @@ class TestDescriptorReader(unittest.TestCase): # running this test multiple times to flush out concurrency issues for i in xrange(15): - reader = stem.descriptor.reader.DescriptorReader([DESCRIPTOR_TEST_DATA]) + reader = stem.descriptor.reader.DescriptorReader([descriptor_path]) remaining_entries = list(descriptor_entries) with reader: @@ -174,6 +192,45 @@ class TestDescriptorReader(unittest.TestCase): with reader: self.assertEquals(1, len(list(reader))) + def test_archived_uncompressed(self): + """ + Checks that we can read descriptors from an 
uncompressed archive. + """ + + expected_results = _get_raw_tar_descriptors() + test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar") + reader = stem.descriptor.reader.DescriptorReader([test_path]) + + with reader: + read_descriptors = [str(desc) for desc in list(reader)] + self.assertEquals(expected_results, read_descriptors) + + def test_archived_gzip(self): + """ + Checks that we can read descriptors from a gzipped archive. + """ + + expected_results = _get_raw_tar_descriptors() + test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar.gz") + reader = stem.descriptor.reader.DescriptorReader([test_path]) + + with reader: + read_descriptors = [str(desc) for desc in list(reader)] + self.assertEquals(expected_results, read_descriptors) + + def test_archived_bz2(self): + """ + Checks that we can read descriptors from an bzipped archive. + """ + + expected_results = _get_raw_tar_descriptors() + test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar.bz2") + reader = stem.descriptor.reader.DescriptorReader([test_path]) + + with reader: + read_descriptors = [str(desc) for desc in list(reader)] + self.assertEquals(expected_results, read_descriptors) + def test_stop(self): """ Runs a DescriptorReader over the root directory, then checks that calling
participants (1)
-
atagar@torproject.org