[tor-commits] [stem/master] CollecTor publication time filtering

atagar at torproject.org atagar at torproject.org
Fri Dec 27 23:50:23 UTC 2019


commit 6c07fe554ef6a7d40b196d314101153eacfb41af
Author: Damian Johnson <atagar at torproject.org>
Date:   Fri Dec 27 15:13:44 2019 -0800

    CollecTor publication time filtering
    
    When I first wrote this module I played fast and loose with the 'start' and
    'end' parameters, guessing relevance purely based on filenames.
    
    When Karsten added timestamps to the index he better defined the relevant
    timestamp to be a descriptor's publication, which everything except
    microdescriptors contains.
    
    Interestingly, archives can contain publications both before and after their
    filename dates. For example...
    
      recent/relay-descriptors/server-descriptors/2019-12-27-22-04-59-server-descriptors
    
      Old filename derived timestamps:
        start: 2019-12-27 22:04:59
        end: 2019-12-27 23:04:59
    
      Index's publication timestamps:
        start: 2019-12-27 20:30:00
        end: 2019-12-27 22:45:00
    
    If the file was created at 22:04 how does it contain something published at
    22:45?
    
    Regardless, now that the index contains publication times, filename dates
    are moot for our purposes. Our 'start' and 'end' arguments provide the
    subset of archives that reside within the given publication range.
    
    For example, the following downloads descriptors that were published up to two
    hours ago...
    
      recent = datetime.datetime.utcnow() - datetime.timedelta(minutes = 120)
      descriptors = stem.descriptor.collector.get_server_descriptors(start = recent)
    
    If we make this more sophisticated we can demonstrate how many
    descriptors we pull from each archive...
    
      import datetime
      import stem.descriptor.collector
    
      collector = stem.descriptor.collector.get_instance()
      recent = datetime.datetime.utcnow() - datetime.timedelta(minutes = 120)
    
      # This effectively does the same thing as get_server_descriptors(),
      # but in a way we can also determine the full counts.
    
      for f in collector.files('server-descriptor', start = recent):
        all_desc = list(f.read())
        recent_desc = list(f.read(start = recent))
    
        print('%s (%s => %s)' % (f.path, f.start, f.end))
        print('  %i of %i descriptors were published recently' % (len(recent_desc), len(all_desc)))
    
      # Download them again, but through our more prevalently used
      # get_server_descriptors() method.
    
      print('\nIn total there are %i server descriptors published recently' % len(list(collector.get_server_descriptors(start = recent))))
    
      ----------------------------------------------------------------------
    
      % python demo.py
    
      recent/relay-descriptors/server-descriptors/2019-12-27-21-04-59-server-descriptors (2019-12-27 17:59:00 => 2019-12-27 22:13:00)
        3 of 817 descriptors were published recently
      recent/relay-descriptors/server-descriptors/2019-12-27-22-04-59-server-descriptors (2019-12-27 20:30:00 => 2019-12-27 22:45:00)
        297 of 776 descriptors were published recently
      recent/relay-descriptors/server-descriptors/2019-12-27-23-04-59-server-descriptors (2019-12-27 21:49:00 => 2019-12-27 23:01:00)
        800 of 800 descriptors were published recently
    
      In total there are 1100 server descriptors published recently
---
 stem/descriptor/collector.py       | 86 ++++++++++++++++++++++----------------
 test/integ/descriptor/collector.py | 32 +++++++-------
 test/unit/descriptor/collector.py  |  1 +
 3 files changed, 69 insertions(+), 50 deletions(-)

diff --git a/stem/descriptor/collector.py b/stem/descriptor/collector.py
index 05826c49..3ee0e1aa 100644
--- a/stem/descriptor/collector.py
+++ b/stem/descriptor/collector.py
@@ -181,10 +181,10 @@ class File(object):
   :var int size: size of the file
   :var str sha256: file's sha256 checksum
 
-  :var datetime start: beginning of the time range descriptors are for,
-    **None** if this cannot be determined
-  :var datetime end: ending of the time range descriptors are for,
-    **None** if this cannot be determined
+  :var datetime start: first publication within the file, **None** if this
+    cannot be determined
+  :var datetime end: last publication within the file, **None** if this cannot
+    be determined
   :var datetime last_modified: when the file was last modified
   """
 
@@ -206,7 +206,7 @@ class File(object):
     else:
       self.start, self.end = File._guess_time_range(path)
 
-  def read(self, directory = None, descriptor_type = None, document_handler = DocumentHandler.ENTRIES, timeout = None, retries = 3):
+  def read(self, directory = None, descriptor_type = None, start = None, end = None, document_handler = DocumentHandler.ENTRIES, timeout = None, retries = 3):
     """
     Provides descriptors from this archive. Descriptors are downloaded or read
     from disk as follows...
@@ -229,6 +229,8 @@ class File(object):
     :param str descriptor_type: `descriptor type
       <https://metrics.torproject.org/collector.html#data-formats>`_, this is
       guessed if not provided
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param stem.descriptor.__init__.DocumentHandler document_handler: method in
       which to parse a :class:`~stem.descriptor.networkstatus.NetworkStatusDocument`
     :param int timeout: timeout when connection becomes idle, no timeout
@@ -267,7 +269,7 @@ class File(object):
 
         tmp_directory = tempfile.mkdtemp()
 
-        for desc in self.read(tmp_directory, descriptor_type, document_handler, timeout, retries):
+        for desc in self.read(tmp_directory, descriptor_type, start, end, document_handler, timeout, retries):
           yield desc
 
         shutil.rmtree(tmp_directory)
@@ -281,6 +283,17 @@ class File(object):
 
     for desc in stem.descriptor.parse_file(path, document_handler = document_handler):
       if descriptor_type is None or descriptor_type.startswith(desc.type_annotation().name):
+        # TODO: This can filter server and extrainfo times, but other
+        # descriptor types may use other attribute names.
+
+        published = getattr(desc, 'published', None)
+
+        if published:
+          if start and published < start:
+            continue
+          elif end and published > end:
+            continue
+
         yield desc
 
   def download(self, directory, decompress = True, timeout = None, retries = 3, overwrite = False):
@@ -405,8 +418,8 @@ class CollecTor(object):
     Provides server descriptors published during the given time range, sorted
     oldest to newest.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param bool bridge: standard descriptors if **False**, bridge if **True**
@@ -424,7 +437,7 @@ class CollecTor(object):
     desc_type = 'server-descriptor' if not bridge else 'bridge-server-descriptor'
 
     for f in self.files(desc_type, start, end):
-      for desc in f.read(cache_to, desc_type, timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, desc_type, start, end, timeout = timeout, retries = retries):
         yield desc
 
   def get_extrainfo_descriptors(self, start = None, end = None, cache_to = None, bridge = False, timeout = None, retries = 3):
@@ -432,8 +445,8 @@ class CollecTor(object):
     Provides extrainfo descriptors published during the given time range,
     sorted oldest to newest.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param bool bridge: standard descriptors if **False**, bridge if **True**
@@ -451,13 +464,13 @@ class CollecTor(object):
     desc_type = 'extra-info' if not bridge else 'bridge-extra-info'
 
     for f in self.files(desc_type, start, end):
-      for desc in f.read(cache_to, desc_type, timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, desc_type, start, end, timeout = timeout, retries = retries):
         yield desc
 
   def get_microdescriptors(self, start = None, end = None, cache_to = None, timeout = None, retries = 3):
     """
-    Provides microdescriptors published during the given time range,
-    sorted oldest to newest. Unlike server/extrainfo descriptors,
+    Provides microdescriptors estimated to be published during the given time
+    range, sorted oldest to newest. Unlike server/extrainfo descriptors,
     microdescriptors change very infrequently...
 
     ::
@@ -466,10 +479,11 @@ class CollecTor(object):
       about once per week." -dir-spec section 3.3
 
     CollecTor archives only contain microdescriptors that *change*, so hourly
-    tarballs often contain very few.
+    tarballs often contain very few. Microdescriptors also do not contain
+    their publication timestamp, so this is estimated.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param int timeout: timeout for downloading each individual archive when
@@ -484,7 +498,7 @@ class CollecTor(object):
     """
 
     for f in self.files('microdescriptor', start, end):
-      for desc in f.read(cache_to, 'microdescriptor', timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, 'microdescriptor', start, end, timeout = timeout, retries = retries):
         yield desc
 
   def get_consensus(self, start = None, end = None, cache_to = None, document_handler = DocumentHandler.ENTRIES, version = 3, microdescriptor = False, bridge = False, timeout = None, retries = 3):
@@ -492,8 +506,8 @@ class CollecTor(object):
     Provides consensus router status entries published during the given time
     range, sorted oldest to newest.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param stem.descriptor.__init__.DocumentHandler document_handler: method in
@@ -528,7 +542,7 @@ class CollecTor(object):
         raise ValueError('Only v2 and v3 router status entries are available (not version %s)' % version)
 
     for f in self.files(desc_type, start, end):
-      for desc in f.read(cache_to, desc_type, document_handler, timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, desc_type, start, end, document_handler, timeout = timeout, retries = retries):
         yield desc
 
   def get_key_certificates(self, start = None, end = None, cache_to = None, timeout = None, retries = 3):
@@ -536,8 +550,8 @@ class CollecTor(object):
     Directory authority key certificates for the given time range,
     sorted oldest to newest.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param int timeout: timeout for downloading each individual archive when
@@ -552,7 +566,7 @@ class CollecTor(object):
     """
 
     for f in self.files('dir-key-certificate-3', start, end):
-      for desc in f.read(cache_to, 'dir-key-certificate-3', timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, 'dir-key-certificate-3', start, end, timeout = timeout, retries = retries):
         yield desc
 
   def get_bandwidth_files(self, start = None, end = None, cache_to = None, timeout = None, retries = 3):
@@ -560,8 +574,8 @@ class CollecTor(object):
     Bandwidth authority heuristics for the given time range, sorted oldest to
     newest.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param int timeout: timeout for downloading each individual archive when
@@ -576,7 +590,7 @@ class CollecTor(object):
     """
 
     for f in self.files('bandwidth-file', start, end):
-      for desc in f.read(cache_to, 'bandwidth-file', timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, 'bandwidth-file', start, end, timeout = timeout, retries = retries):
         yield desc
 
   def get_exit_lists(self, start = None, end = None, cache_to = None, timeout = None, retries = 3):
@@ -584,8 +598,8 @@ class CollecTor(object):
     `TorDNSEL exit lists <https://www.torproject.org/projects/tordnsel.html.en>`_
     for the given time range, sorted oldest to newest.
 
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
     :param str cache_to: directory to cache archives into, if an archive is
       available here it is not downloaded
     :param int timeout: timeout for downloading each individual archive when
@@ -600,7 +614,7 @@ class CollecTor(object):
     """
 
     for f in self.files('tordnsel', start, end):
-      for desc in f.read(cache_to, 'tordnsel', timeout = timeout, retries = retries):
+      for desc in f.read(cache_to, 'tordnsel', start, end, timeout = timeout, retries = retries):
         yield desc
 
   def index(self, compression = 'best'):
@@ -643,8 +657,8 @@ class CollecTor(object):
     Provides files CollecTor presently has, sorted oldest to newest.
 
     :param str descriptor_type: descriptor type or prefix to retrieve
-    :param datetime.datetime start: time range to begin with
-    :param datetime.datetime end: time range to end with
+    :param datetime.datetime start: publication time to begin with
+    :param datetime.datetime end: publication time to end with
 
     :returns: **list** of :class:`~stem.descriptor.collector.File`
 
@@ -662,10 +676,10 @@ class CollecTor(object):
     matches = []
 
     for f in self._cached_files:
-      if start and (f.start is None or f.start < start):
-        continue
-      elif end and (f.end is None or f.end > end):
-        continue
+      if start and (f.end is None or f.end < start):
+        continue  # only contains descriptors before time range
+      elif end and (f.start is None or f.start > end):
+        continue  # only contains descriptors after time range
 
       if descriptor_type is None or any([desc_type.startswith(descriptor_type) for desc_type in f.types]):
         matches.append(f)
diff --git a/test/integ/descriptor/collector.py b/test/integ/descriptor/collector.py
index ac33490d..3af25c29 100644
--- a/test/integ/descriptor/collector.py
+++ b/test/integ/descriptor/collector.py
@@ -12,7 +12,11 @@ import stem.descriptor.collector
 
 from stem.descriptor import Compression
 
-RECENT = datetime.datetime.utcnow() - datetime.timedelta(minutes = 60)
+# The latest hour may or may not be published, so testing against a time range
+# a little back.
+
+START = datetime.datetime.utcnow() - datetime.timedelta(minutes = 180)
+END = datetime.datetime.utcnow() - datetime.timedelta(minutes = 120)
 
 
 class TestCollector(unittest.TestCase):
@@ -39,23 +43,23 @@ class TestCollector(unittest.TestCase):
   @test.require.only_run_once
   @test.require.online
   def test_downloading_server_descriptors(self):
-    recent_descriptors = list(stem.descriptor.collector.get_server_descriptors(start = RECENT))
+    recent_descriptors = list(stem.descriptor.collector.get_server_descriptors(start = START, end = END))
 
-    if not (300 < len(recent_descriptors) < 800):
-      self.fail('Downloaded %i descriptors, expected 300-800' % len(recent_descriptors))  # 584 on 8/5/19
+    if not (400 < len(recent_descriptors) < 1200):
+      self.fail('Downloaded %i descriptors, expected 400-1200' % len(recent_descriptors))  # 803 on 12/27/19
 
   @test.require.only_run_once
   @test.require.online
   def test_downloading_extrainfo_descriptors(self):
-    recent_descriptors = list(stem.descriptor.collector.get_extrainfo_descriptors(start = RECENT))
+    recent_descriptors = list(stem.descriptor.collector.get_extrainfo_descriptors(start = START, end = END))
 
-    if not (300 < len(recent_descriptors) < 800):
-      self.fail('Downloaded %i descriptors, expected 300-800' % len(recent_descriptors))  # 583 on 8/7/19
+    if not (400 < len(recent_descriptors) < 1200):
+      self.fail('Downloaded %i descriptors, expected 400-1200' % len(recent_descriptors))  # 803 on 12/27/19
 
   @test.require.only_run_once
   @test.require.online
   def test_downloading_microdescriptors(self):
-    recent_descriptors = list(stem.descriptor.collector.get_microdescriptors(start = RECENT))
+    recent_descriptors = list(stem.descriptor.collector.get_microdescriptors(start = START, end = END))
 
     if not (10 < len(recent_descriptors) < 100):
       self.fail('Downloaded %i descriptors, expected 10-100' % len(recent_descriptors))  # 23 on 8/7/19
@@ -63,18 +67,18 @@ class TestCollector(unittest.TestCase):
   @test.require.only_run_once
   @test.require.online
   def test_downloading_consensus_v3(self):
-    recent_descriptors = list(stem.descriptor.collector.get_consensus(start = RECENT))
+    recent_descriptors = list(stem.descriptor.collector.get_consensus(start = START, end = END))
 
-    if not (3000 < len(recent_descriptors) < 10000):
-      self.fail('Downloaded %i descriptors, expected 3000-10000' % len(recent_descriptors))  # 6554 on 8/10/19
+    if not (100 < len(recent_descriptors) < 500):
+      self.fail('Downloaded %i descriptors, expected 100-500' % len(recent_descriptors))  # 316 on 12/27/19
 
   @test.require.only_run_once
   @test.require.online
   def test_downloading_consensus_micro(self):
-    recent_descriptors = list(stem.descriptor.collector.get_consensus(start = RECENT, microdescriptor = True))
+    recent_descriptors = list(stem.descriptor.collector.get_consensus(start = START, end = END, microdescriptor = True))
 
-    if not (3000 < len(recent_descriptors) < 10000):
-      self.fail('Downloaded %i descriptors, expected 3000-10000' % len(recent_descriptors))
+    if not (100 < len(recent_descriptors) < 500):
+      self.fail('Downloaded %i descriptors, expected 100-500' % len(recent_descriptors))  # 316 on 12/27/19
 
   def test_downloading_consensus_invalid_type(self):
     test_values = (
diff --git a/test/unit/descriptor/collector.py b/test/unit/descriptor/collector.py
index 7d80d572..b4ff7636 100644
--- a/test/unit/descriptor/collector.py
+++ b/test/unit/descriptor/collector.py
@@ -181,6 +181,7 @@ class TestCollector(unittest.TestCase):
     ], [f.path for f in collector.files(descriptor_type = 'server-descriptor', end = datetime.datetime(2007, 1, 1))])
 
     self.assertEqual([
+      'archive/relay-descriptors/server-descriptors/server-descriptors-2006-02.tar.xz',
       'archive/relay-descriptors/server-descriptors/server-descriptors-2006-03.tar.xz',
     ], [f.path for f in collector.files(descriptor_type = 'server-descriptor', start = datetime.datetime(2006, 2, 10), end = datetime.datetime(2007, 1, 1))])
 



More information about the tor-commits mailing list