commit 6c07fe554ef6a7d40b196d314101153eacfb41af Author: Damian Johnson atagar@torproject.org Date: Fri Dec 27 15:13:44 2019 -0800
CollecTor publication time filtering
When I first wrote this module I played fast and loose with the 'start' and 'end' parameters, guessing relevance purely based on filenames.
When Karsten added timestamps to the index he better defined the relevant timestamp to be a descriptor's publication, which everything except microdescriptors contains.
Interestingly, archives can contain publications both before and after its filename date. For example...
recent/relay-descriptors/server-descriptors/2019-12-27-22-04-59-server-descriptors
Old filename derived timestamps: start: 2019-12-27 22:04:59 end: 2019-12-27 23:04:59
Index's publication timestamps: start: 2019-12-27 20:30:00 end: 2019-12-27 22:45:00
If the file was created at 22:04 how does it contain something published at 22:45?
Regardless, now that the index contains publication times, for our purposes filename dates are moot. Our 'start' and 'end' arguments provide the subset of archives that reside within the given publication range.
For example, the following downloads descriptors that were published up to two hours ago...
recent = datetime.datetime.utcnow() - datetime.timedelta(minutes = 120) descriptors = stem.descriptor.collector.get_server_descriptors(start = recent)
If we make this more sophisticated we can demonstrate how many descriptors we pull from each archive...
import datetime import stem.descriptor.collector
collector = stem.descriptor.collector.get_instance() recent = datetime.datetime.utcnow() - datetime.timedelta(minutes = 120)
# This effectively does the same thing as get_server_descriptors(), # but in a way we can also determine the full counts.
for f in collector.files('server-descriptor', start = recent): all_desc = list(f.read()) recent_desc = list(f.read(start = recent))
print('%s (%s => %s)' % (f.path, f.start, f.end)) print(' %i of %i descriptors were published recently' % (len(recent_desc), len(all_desc)))
# Download them again, but through our more prevalently used # get_server_descriptors() method.
print('\nIn total there are %i server descriptors published recently' % len(list(collector.get_server_descriptors(start = recent))))
----------------------------------------------------------------------
% python demo.py
recent/relay-descriptors/server-descriptors/2019-12-27-21-04-59-server-descriptors (2019-12-27 17:59:00 => 2019-12-27 22:13:00) 3 of 817 descriptors were published recently recent/relay-descriptors/server-descriptors/2019-12-27-22-04-59-server-descriptors (2019-12-27 20:30:00 => 2019-12-27 22:45:00) 297 of 776 descriptors were published recently recent/relay-descriptors/server-descriptors/2019-12-27-23-04-59-server-descriptors (2019-12-27 21:49:00 => 2019-12-27 23:01:00) 800 of 800 descriptors were published recently
In total there are 1100 server descriptors published recently --- stem/descriptor/collector.py | 86 ++++++++++++++++++++++---------------- test/integ/descriptor/collector.py | 32 +++++++------- test/unit/descriptor/collector.py | 1 + 3 files changed, 69 insertions(+), 50 deletions(-)
diff --git a/stem/descriptor/collector.py b/stem/descriptor/collector.py index 05826c49..3ee0e1aa 100644 --- a/stem/descriptor/collector.py +++ b/stem/descriptor/collector.py @@ -181,10 +181,10 @@ class File(object): :var int size: size of the file :var str sha256: file's sha256 checksum
- :var datetime start: beginning of the time range descriptors are for, - **None** if this cannot be determined - :var datetime end: ending of the time range descriptors are for, - **None** if this cannot be determined + :var datetime start: first publication within the file, **None** if this + cannot be determined + :var datetime end: last publication within the file, **None** if this cannot + be determined :var datetime last_modified: when the file was last modified """
@@ -206,7 +206,7 @@ class File(object): else: self.start, self.end = File._guess_time_range(path)
- def read(self, directory = None, descriptor_type = None, document_handler = DocumentHandler.ENTRIES, timeout = None, retries = 3): + def read(self, directory = None, descriptor_type = None, start = None, end = None, document_handler = DocumentHandler.ENTRIES, timeout = None, retries = 3): """ Provides descriptors from this archive. Descriptors are downloaded or read from disk as follows... @@ -229,6 +229,8 @@ class File(object): :param str descriptor_type: `descriptor type https://metrics.torproject.org/collector.html#data-formats`_, this is guessed if not provided + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param stem.descriptor.__init__.DocumentHandler document_handler: method in which to parse a :class:`~stem.descriptor.networkstatus.NetworkStatusDocument` :param int timeout: timeout when connection becomes idle, no timeout @@ -267,7 +269,7 @@ class File(object):
tmp_directory = tempfile.mkdtemp()
- for desc in self.read(tmp_directory, descriptor_type, document_handler, timeout, retries): + for desc in self.read(tmp_directory, descriptor_type, start, end, document_handler, timeout, retries): yield desc
shutil.rmtree(tmp_directory) @@ -281,6 +283,17 @@ class File(object):
for desc in stem.descriptor.parse_file(path, document_handler = document_handler): if descriptor_type is None or descriptor_type.startswith(desc.type_annotation().name): + # TODO: This can filter server and extrainfo times, but other + # descriptor types may use other attribute names. + + published = getattr(desc, 'published', None) + + if published: + if start and published < start: + continue + elif end and published > end: + continue + yield desc
def download(self, directory, decompress = True, timeout = None, retries = 3, overwrite = False): @@ -405,8 +418,8 @@ class CollecTor(object): Provides server descriptors published during the given time range, sorted oldest to newest.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param bool bridge: standard descriptors if **False**, bridge if **True** @@ -424,7 +437,7 @@ class CollecTor(object): desc_type = 'server-descriptor' if not bridge else 'bridge-server-descriptor'
for f in self.files(desc_type, start, end): - for desc in f.read(cache_to, desc_type, timeout = timeout, retries = retries): + for desc in f.read(cache_to, desc_type, start, end, timeout = timeout, retries = retries): yield desc
def get_extrainfo_descriptors(self, start = None, end = None, cache_to = None, bridge = False, timeout = None, retries = 3): @@ -432,8 +445,8 @@ class CollecTor(object): Provides extrainfo descriptors published during the given time range, sorted oldest to newest.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param bool bridge: standard descriptors if **False**, bridge if **True** @@ -451,13 +464,13 @@ class CollecTor(object): desc_type = 'extra-info' if not bridge else 'bridge-extra-info'
for f in self.files(desc_type, start, end): - for desc in f.read(cache_to, desc_type, timeout = timeout, retries = retries): + for desc in f.read(cache_to, desc_type, start, end, timeout = timeout, retries = retries): yield desc
def get_microdescriptors(self, start = None, end = None, cache_to = None, timeout = None, retries = 3): """ - Provides microdescriptors published during the given time range, - sorted oldest to newest. Unlike server/extrainfo descriptors, + Provides microdescriptors estimated to be published during the given time + range, sorted oldest to newest. Unlike server/extrainfo descriptors, microdescriptors change very infrequently...
:: @@ -466,10 +479,11 @@ class CollecTor(object): about once per week." -dir-spec section 3.3
CollecTor archives only contain microdescriptors that *change*, so hourly - tarballs often contain very few. + tarballs often contain very few. Microdescriptors also do not contain + their publication timestamp, so this is estimated.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param int timeout: timeout for downloading each individual archive when @@ -484,7 +498,7 @@ class CollecTor(object): """
for f in self.files('microdescriptor', start, end): - for desc in f.read(cache_to, 'microdescriptor', timeout = timeout, retries = retries): + for desc in f.read(cache_to, 'microdescriptor', start, end, timeout = timeout, retries = retries): yield desc
def get_consensus(self, start = None, end = None, cache_to = None, document_handler = DocumentHandler.ENTRIES, version = 3, microdescriptor = False, bridge = False, timeout = None, retries = 3): @@ -492,8 +506,8 @@ class CollecTor(object): Provides consensus router status entries published during the given time range, sorted oldest to newest.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param stem.descriptor.__init__.DocumentHandler document_handler: method in @@ -528,7 +542,7 @@ class CollecTor(object): raise ValueError('Only v2 and v3 router status entries are available (not version %s)' % version)
for f in self.files(desc_type, start, end): - for desc in f.read(cache_to, desc_type, document_handler, timeout = timeout, retries = retries): + for desc in f.read(cache_to, desc_type, start, end, document_handler, timeout = timeout, retries = retries): yield desc
def get_key_certificates(self, start = None, end = None, cache_to = None, timeout = None, retries = 3): @@ -536,8 +550,8 @@ class CollecTor(object): Directory authority key certificates for the given time range, sorted oldest to newest.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param int timeout: timeout for downloading each individual archive when @@ -552,7 +566,7 @@ class CollecTor(object): """
for f in self.files('dir-key-certificate-3', start, end): - for desc in f.read(cache_to, 'dir-key-certificate-3', timeout = timeout, retries = retries): + for desc in f.read(cache_to, 'dir-key-certificate-3', start, end, timeout = timeout, retries = retries): yield desc
def get_bandwidth_files(self, start = None, end = None, cache_to = None, timeout = None, retries = 3): @@ -560,8 +574,8 @@ class CollecTor(object): Bandwidth authority heuristics for the given time range, sorted oldest to newest.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param int timeout: timeout for downloading each individual archive when @@ -576,7 +590,7 @@ class CollecTor(object): """
for f in self.files('bandwidth-file', start, end): - for desc in f.read(cache_to, 'bandwidth-file', timeout = timeout, retries = retries): + for desc in f.read(cache_to, 'bandwidth-file', start, end, timeout = timeout, retries = retries): yield desc
def get_exit_lists(self, start = None, end = None, cache_to = None, timeout = None, retries = 3): @@ -584,8 +598,8 @@ class CollecTor(object): `TorDNSEL exit lists https://www.torproject.org/projects/tordnsel.html.en`_ for the given time range, sorted oldest to newest.
- :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with :param str cache_to: directory to cache archives into, if an archive is available here it is not downloaded :param int timeout: timeout for downloading each individual archive when @@ -600,7 +614,7 @@ class CollecTor(object): """
for f in self.files('tordnsel', start, end): - for desc in f.read(cache_to, 'tordnsel', timeout = timeout, retries = retries): + for desc in f.read(cache_to, 'tordnsel', start, end, timeout = timeout, retries = retries): yield desc
def index(self, compression = 'best'): @@ -643,8 +657,8 @@ class CollecTor(object): Provides files CollecTor presently has, sorted oldest to newest.
:param str descriptor_type: descriptor type or prefix to retrieve - :param datetime.datetime start: time range to begin with - :param datetime.datetime end: time range to end with + :param datetime.datetime start: publication time to begin with + :param datetime.datetime end: publication time to end with
:returns: **list** of :class:`~stem.descriptor.collector.File`
@@ -662,10 +676,10 @@ class CollecTor(object): matches = []
for f in self._cached_files: - if start and (f.start is None or f.start < start): - continue - elif end and (f.end is None or f.end > end): - continue + if start and (f.end is None or f.end < start): + continue # only contains descriptors before time range + elif end and (f.start is None or f.start > end): + continue # only contains descriptors after time range
if descriptor_type is None or any([desc_type.startswith(descriptor_type) for desc_type in f.types]): matches.append(f) diff --git a/test/integ/descriptor/collector.py b/test/integ/descriptor/collector.py index ac33490d..3af25c29 100644 --- a/test/integ/descriptor/collector.py +++ b/test/integ/descriptor/collector.py @@ -12,7 +12,11 @@ import stem.descriptor.collector
from stem.descriptor import Compression
-RECENT = datetime.datetime.utcnow() - datetime.timedelta(minutes = 60) +# The latest hour may or may not be published, so testing against a time range +# a little back. + +START = datetime.datetime.utcnow() - datetime.timedelta(minutes = 180) +END = datetime.datetime.utcnow() - datetime.timedelta(minutes = 120)
class TestCollector(unittest.TestCase): @@ -39,23 +43,23 @@ class TestCollector(unittest.TestCase): @test.require.only_run_once @test.require.online def test_downloading_server_descriptors(self): - recent_descriptors = list(stem.descriptor.collector.get_server_descriptors(start = RECENT)) + recent_descriptors = list(stem.descriptor.collector.get_server_descriptors(start = START, end = END))
- if not (300 < len(recent_descriptors) < 800): - self.fail('Downloaded %i descriptors, expected 300-800' % len(recent_descriptors)) # 584 on 8/5/19 + if not (400 < len(recent_descriptors) < 1200): + self.fail('Downloaded %i descriptors, expected 400-1200' % len(recent_descriptors)) # 803 on 12/27/19
@test.require.only_run_once @test.require.online def test_downloading_extrainfo_descriptors(self): - recent_descriptors = list(stem.descriptor.collector.get_extrainfo_descriptors(start = RECENT)) + recent_descriptors = list(stem.descriptor.collector.get_extrainfo_descriptors(start = START, end = END))
- if not (300 < len(recent_descriptors) < 800): - self.fail('Downloaded %i descriptors, expected 300-800' % len(recent_descriptors)) # 583 on 8/7/19 + if not (400 < len(recent_descriptors) < 1200): + self.fail('Downloaded %i descriptors, expected 400-1200' % len(recent_descriptors)) # 803 on 12/27/19
@test.require.only_run_once @test.require.online def test_downloading_microdescriptors(self): - recent_descriptors = list(stem.descriptor.collector.get_microdescriptors(start = RECENT)) + recent_descriptors = list(stem.descriptor.collector.get_microdescriptors(start = START, end = END))
if not (10 < len(recent_descriptors) < 100): self.fail('Downloaded %i descriptors, expected 10-100' % len(recent_descriptors)) # 23 on 8/7/19 @@ -63,18 +67,18 @@ class TestCollector(unittest.TestCase): @test.require.only_run_once @test.require.online def test_downloading_consensus_v3(self): - recent_descriptors = list(stem.descriptor.collector.get_consensus(start = RECENT)) + recent_descriptors = list(stem.descriptor.collector.get_consensus(start = START, end = END))
- if not (3000 < len(recent_descriptors) < 10000): - self.fail('Downloaded %i descriptors, expected 3000-10000' % len(recent_descriptors)) # 6554 on 8/10/19 + if not (100 < len(recent_descriptors) < 500): + self.fail('Downloaded %i descriptors, expected 100-500' % len(recent_descriptors)) # 316 on 12/27/19
@test.require.only_run_once @test.require.online def test_downloading_consensus_micro(self): - recent_descriptors = list(stem.descriptor.collector.get_consensus(start = RECENT, microdescriptor = True)) + recent_descriptors = list(stem.descriptor.collector.get_consensus(start = START, end = END, microdescriptor = True))
- if not (3000 < len(recent_descriptors) < 10000): - self.fail('Downloaded %i descriptors, expected 3000-10000' % len(recent_descriptors)) + if not (100 < len(recent_descriptors) < 500): + self.fail('Downloaded %i descriptors, expected 100-500' % len(recent_descriptors)) # 316 on 12/27/19
def test_downloading_consensus_invalid_type(self): test_values = ( diff --git a/test/unit/descriptor/collector.py b/test/unit/descriptor/collector.py index 7d80d572..b4ff7636 100644 --- a/test/unit/descriptor/collector.py +++ b/test/unit/descriptor/collector.py @@ -181,6 +181,7 @@ class TestCollector(unittest.TestCase): ], [f.path for f in collector.files(descriptor_type = 'server-descriptor', end = datetime.datetime(2007, 1, 1))])
self.assertEqual([ + 'archive/relay-descriptors/server-descriptors/server-descriptors-2006-02.tar.xz', 'archive/relay-descriptors/server-descriptors/server-descriptors-2006-03.tar.xz', ], [f.path for f in collector.files(descriptor_type = 'server-descriptor', start = datetime.datetime(2006, 2, 10), end = datetime.datetime(2007, 1, 1))])