commit 612807e3155d4a85d004995af0dc06b45b22905d Author: juga0 juga@riseup.net Date: Mon Mar 25 14:28:12 2019 +0000
fix: destination: Multiply the maximum number of failures by the number of threads
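When a destination fails, all the threads using it at that moment also fail, so the maximum number of consecutive failures allowed before disabling a destination is now multiplied by the number of measurement threads. For now, this does not check which threads are actually using the destination. Also lower the time to wait before retrying a non-functional destination (from 3 hours to 5 minutes).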
Closes: #29891. --- sbws/core/scanner.py | 19 +++++++++++++++++-- sbws/globals.py | 4 +++- sbws/lib/destination.py | 35 ++++++++++++++++++++++++----------- tests/unit/lib/test_destination.py | 12 ++++++------ 4 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 32752ca..fdb44eb 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -29,7 +29,6 @@ from multiprocessing.dummy import Pool import time import os import logging -import requests import random
from .. import settings @@ -367,6 +366,19 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
def dispatch_worker_thread(*a, **kw): + # If at the point where the relay is actually going to be measured there + # are not any functional destinations or the `end_event` is set, do not + # try to start measuring the relay, since it will fail anyway. + try: + # a[2] is the argument `destinations` + functional_destinations = a[2].functional_destinations + # In case the arguments or the method change, catch the possible exceptions + # but ignore here that there are not destinations. + except (IndexError, TypeError): + log.debug("Wrong argument or attribute.") + functional_destinations = True + if not functional_destinations or settings.end_event.is_set(): + return None return measure_relay(*a, **kw)
@@ -530,7 +542,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
loop_tstop = time.time() loop_tdelta = (loop_tstop - loop_tstart) / 60 - log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta) + # At this point, we know the relays that were queued to be measured. + # That does not mean they were actually measured. + log.debug("Attempted to measure %s relays in %s minutes", + num_relays, loop_tdelta) # In a testing network, exit after first loop if controller.get_conf('TestingTorNetwork') == '1': log.info("In a testing network, exiting after the first loop.") diff --git a/sbws/globals.py b/sbws/globals.py index 5003973..723625d 100644 --- a/sbws/globals.py +++ b/sbws/globals.py @@ -127,7 +127,9 @@ DESTINATION_VERIFY_CERTIFICATE = True # whether the destination is functional or not. NUM_DESTINATION_ATTEMPTS_STORED = 10 # Time to wait before trying again a destination that wasn't functional. -DELTA_SECONDS_RETRY_DESTINATION = 60 * 60 * 3 +# Because intermitent failures with CDN destinations, start trying again +# after 5 min. +DELTA_SECONDS_RETRY_DESTINATION = 60 * 5 # Number of consecutive times a destination can fail before considering it # not functional. MAX_NUM_DESTINATION_FAILURES = 3 diff --git a/sbws/lib/destination.py b/sbws/lib/destination.py index 21a907c..3ccd94f 100644 --- a/sbws/lib/destination.py +++ b/sbws/lib/destination.py @@ -229,12 +229,19 @@ class Destination: the time to try again is incremented and resetted as soon as the destination does not fail. """ + # NOTE: does a destination fail because several threads are using + # it at the same time? + # If a destination fails for 1 minute and there're 3 threads, the + # 3 threads will fail. + # Failed the last X consecutive times if self._are_last_attempts_failures(): + # The log here will appear in all the the queued + # relays and threads. log.warning("The last %s times the destination %s failed." 
- "It will not be used again in %s hours.\n", + "Disabled for %s minutes.", self._max_num_failures, self.url, - self._delta_seconds_retry / 60 / 60) + self._delta_seconds_retry / 60) log.warning("Please, add more destinations or increment the " "number of maximum number of consecutive failures " "in the configuration.") @@ -285,19 +292,22 @@ class Destination: return p
@staticmethod - def from_config(conf_section, max_dl): + def from_config(conf_section, max_dl, number_threads): assert 'url' in conf_section url = conf_section['url'] verify = _parse_verify_option(conf_section) try: - max_num_failures = conf_section.getint('max_num_failures') + # Because one a destination fails, all the threads that are using + # it at that moment will fail too, multiply by the number of + # threads. + max_num_failures = (conf_section.getint('max_num_failures') + or MAX_NUM_DESTINATION_FAILURES) except ValueError: - log.warning("Configuration max_num_failures is wrong, ignoring.") - max_num_failures = None - if max_num_failures: - return Destination(url, max_dl, verify, max_num_failures) - else: - return Destination(url, max_dl, verify) + # If the operator did not setup the number, set to the default. + max_num_failures = MAX_NUM_DESTINATION_FAILURES + + max_num_failures *= number_threads + return Destination(url, max_dl, verify, max_num_failures)
class DestinationList: @@ -331,7 +341,10 @@ class DestinationList: log.debug('Loading info for destination %s', key) dests.append(Destination.from_config( conf[dest_sec], - conf.getint('scanner', 'max_download_size'))) + # Multiply by the number of threads since all the threads will + # fail at the same time. + conf.getint('scanner', 'max_download_size'), + conf.getint('scanner', 'measurement_threads'))) if len(dests) < 1: msg = 'No enabled destinations in config. Please see '\ 'docs/source/man_sbws.ini.rst" or "man 5 sbws.ini" ' \ diff --git a/tests/unit/lib/test_destination.py b/tests/unit/lib/test_destination.py index fc8b489..ca2ff21 100644 --- a/tests/unit/lib/test_destination.py +++ b/tests/unit/lib/test_destination.py @@ -5,9 +5,9 @@ from sbws.lib import destination
def test_destination_is_functional(): - eight_hours_ago = datetime.utcnow() - timedelta(hours=8) - four_hours_ago = datetime.utcnow() - timedelta(hours=4) - two_hours_ago = datetime.utcnow() - timedelta(hours=2) + eleven_mins_ago = datetime.utcnow() - timedelta(minutes=11) + six_mins_ago = datetime.utcnow() - timedelta(minutes=6) + four_mins_ago = datetime.utcnow() - timedelta(minutes=4)
d = destination.Destination('unexistenturl', 0, False) assert d.is_functional() @@ -29,19 +29,19 @@ def test_destination_is_functional(): d.add_failure() d.add_failure() # And last failure was 2h ago - d.add_failure(two_hours_ago) + d.add_failure(four_mins_ago) assert d._are_last_attempts_failures() assert not d._is_last_try_old_enough() assert not d.is_functional()
# But if the last failure was 4h ago, try to use it again # And last failure was 4h ago - d.add_failure(four_hours_ago) + d.add_failure(six_mins_ago) assert d._is_last_try_old_enough() assert d.is_functional()
# If last failure was 8h ago, try to use it again again - d.add_failure(eight_hours_ago) + d.add_failure(eleven_mins_ago) assert d._is_last_try_old_enough() assert d.is_functional()
tor-commits@lists.torproject.org