commit 612807e3155d4a85d004995af0dc06b45b22905d
Author: juga0 <juga(a)riseup.net>
Date: Mon Mar 25 14:28:12 2019 +0000
fix: destination: Multiply errors by the threads
Since when a destination fails, all the threads using it will also
fail that moment.
For now not checking which threads are actually using it.
Also lower the time to retry.
Closes: #29891.
---
sbws/core/scanner.py | 19 +++++++++++++++++--
sbws/globals.py | 4 +++-
sbws/lib/destination.py | 35 ++++++++++++++++++++++++-----------
tests/unit/lib/test_destination.py | 12 ++++++------
4 files changed, 50 insertions(+), 20 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 32752ca..fdb44eb 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -29,7 +29,6 @@ from multiprocessing.dummy import Pool
import time
import os
import logging
-import requests
import random
from .. import settings
@@ -367,6 +366,19 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
def dispatch_worker_thread(*a, **kw):
+ # If at the point where the relay is actually going to be measured there
+ # are not any functional destinations or the `end_event` is set, do not
+ # try to start measuring the relay, since it will fail anyway.
+ try:
+ # a[2] is the argument `destinations`
+ functional_destinations = a[2].functional_destinations
+ # In case the arguments or the method change, catch the possible exceptions
+ # but ignore here that there are not destinations.
+ except (IndexError, TypeError):
+ log.debug("Wrong argument or attribute.")
+ functional_destinations = True
+ if not functional_destinations or settings.end_event.is_set():
+ return None
return measure_relay(*a, **kw)
@@ -530,7 +542,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
- log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
+ # At this point, we know the relays that were queued to be measured.
+ # That does not mean they were actually measured.
+ log.debug("Attempted to measure %s relays in %s minutes",
+ num_relays, loop_tdelta)
# In a testing network, exit after first loop
if controller.get_conf('TestingTorNetwork') == '1':
log.info("In a testing network, exiting after the first loop.")
diff --git a/sbws/globals.py b/sbws/globals.py
index 5003973..723625d 100644
--- a/sbws/globals.py
+++ b/sbws/globals.py
@@ -127,7 +127,9 @@ DESTINATION_VERIFY_CERTIFICATE = True
# whether the destination is functional or not.
NUM_DESTINATION_ATTEMPTS_STORED = 10
# Time to wait before trying again a destination that wasn't functional.
-DELTA_SECONDS_RETRY_DESTINATION = 60 * 60 * 3
+# Because intermitent failures with CDN destinations, start trying again
+# after 5 min.
+DELTA_SECONDS_RETRY_DESTINATION = 60 * 5
# Number of consecutive times a destination can fail before considering it
# not functional.
MAX_NUM_DESTINATION_FAILURES = 3
diff --git a/sbws/lib/destination.py b/sbws/lib/destination.py
index 21a907c..3ccd94f 100644
--- a/sbws/lib/destination.py
+++ b/sbws/lib/destination.py
@@ -229,12 +229,19 @@ class Destination:
the time to try again is incremented and resetted as soon as the
destination does not fail.
"""
+ # NOTE: does a destination fail because several threads are using
+ # it at the same time?
+ # If a destination fails for 1 minute and there're 3 threads, the
+ # 3 threads will fail.
+
# Failed the last X consecutive times
if self._are_last_attempts_failures():
+ # The log here will appear in all the the queued
+ # relays and threads.
log.warning("The last %s times the destination %s failed."
- "It will not be used again in %s hours.\n",
+ "Disabled for %s minutes.",
self._max_num_failures, self.url,
- self._delta_seconds_retry / 60 / 60)
+ self._delta_seconds_retry / 60)
log.warning("Please, add more destinations or increment the "
"number of maximum number of consecutive failures "
"in the configuration.")
@@ -285,19 +292,22 @@ class Destination:
return p
@staticmethod
- def from_config(conf_section, max_dl):
+ def from_config(conf_section, max_dl, number_threads):
assert 'url' in conf_section
url = conf_section['url']
verify = _parse_verify_option(conf_section)
try:
- max_num_failures = conf_section.getint('max_num_failures')
+ # Because one a destination fails, all the threads that are using
+ # it at that moment will fail too, multiply by the number of
+ # threads.
+ max_num_failures = (conf_section.getint('max_num_failures')
+ or MAX_NUM_DESTINATION_FAILURES)
except ValueError:
- log.warning("Configuration max_num_failures is wrong, ignoring.")
- max_num_failures = None
- if max_num_failures:
- return Destination(url, max_dl, verify, max_num_failures)
- else:
- return Destination(url, max_dl, verify)
+ # If the operator did not setup the number, set to the default.
+ max_num_failures = MAX_NUM_DESTINATION_FAILURES
+
+ max_num_failures *= number_threads
+ return Destination(url, max_dl, verify, max_num_failures)
class DestinationList:
@@ -331,7 +341,10 @@ class DestinationList:
log.debug('Loading info for destination %s', key)
dests.append(Destination.from_config(
conf[dest_sec],
- conf.getint('scanner', 'max_download_size')))
+ # Multiply by the number of threads since all the threads will
+ # fail at the same time.
+ conf.getint('scanner', 'max_download_size'),
+ conf.getint('scanner', 'measurement_threads')))
if len(dests) < 1:
msg = 'No enabled destinations in config. Please see '\
'docs/source/man_sbws.ini.rst" or "man 5 sbws.ini" ' \
diff --git a/tests/unit/lib/test_destination.py b/tests/unit/lib/test_destination.py
index fc8b489..ca2ff21 100644
--- a/tests/unit/lib/test_destination.py
+++ b/tests/unit/lib/test_destination.py
@@ -5,9 +5,9 @@ from sbws.lib import destination
def test_destination_is_functional():
- eight_hours_ago = datetime.utcnow() - timedelta(hours=8)
- four_hours_ago = datetime.utcnow() - timedelta(hours=4)
- two_hours_ago = datetime.utcnow() - timedelta(hours=2)
+ eleven_mins_ago = datetime.utcnow() - timedelta(minutes=11)
+ six_mins_ago = datetime.utcnow() - timedelta(minutes=6)
+ four_mins_ago = datetime.utcnow() - timedelta(minutes=4)
d = destination.Destination('unexistenturl', 0, False)
assert d.is_functional()
@@ -29,19 +29,19 @@ def test_destination_is_functional():
d.add_failure()
d.add_failure()
# And last failure was 2h ago
- d.add_failure(two_hours_ago)
+ d.add_failure(four_mins_ago)
assert d._are_last_attempts_failures()
assert not d._is_last_try_old_enough()
assert not d.is_functional()
# But if the last failure was 4h ago, try to use it again
# And last failure was 4h ago
- d.add_failure(four_hours_ago)
+ d.add_failure(six_mins_ago)
assert d._is_last_try_old_enough()
assert d.is_functional()
# If last failure was 8h ago, try to use it again again
- d.add_failure(eight_hours_ago)
+ d.add_failure(eleven_mins_ago)
assert d._is_last_try_old_enough()
assert d.is_functional()