commit 083e7c702313ffd3e4bf2be31ec19cb90bc046a1
Author: juga0 <juga@riseup.net>
Date:   Sat Mar 16 10:21:22 2019 +0000
    new: destination: Recover destination when it failed

    Closes: #29589.
---
 sbws/core/scanner.py                      |  24 ++--
 sbws/lib/destination.py                   | 171 +++++++++++++++++-------
 tests/integration/lib/test_destination.py |  52 ++++-----
 tests/unit/lib/test_destination.py        |  54 ++++++++++
 4 files changed, 222 insertions(+), 79 deletions(-)
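In short, a destination now records each success and failure with a timestamp and decides on its own whether it is usable, instead of being abandoned after a fixed number of failures. A minimal sketch of the intended behaviour, based on the diff below (not part of the commit; the URL is a placeholder, and the defaults of 3 consecutive failures and a 3 h retry delta come from the sbws.globals constants referenced in the diff):

    from sbws.lib.destination import Destination

    d = Destination('https://example.example/file.bin', 1024, True)
    assert d.is_functional()        # initialized as if it never failed
    for _ in range(3):
        d.add_failure()             # three consecutive failures...
    assert not d.is_functional()    # ...disable it until the retry delta passes
    d.add_success()                 # a single success clears the failure streak
    assert d.is_functional()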
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 24a975b..4b06225 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -267,16 +267,13 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
         log.critical("There are not any functional destinations.\n"
                      "It is recommended to set several destinations so that "
                      "the scanner can continue if one fails.")
-        # Exit the scanner with error stopping threads first.
-        stop_threads(signal.SIGTERM, None, 1)
-        # When the destinations can recover would be implemented;
-        # reason = 'Unable to get destination'
-        # log.debug(reason + ' to measure %s %s',
-        #           relay.nickname, relay.fingerprint)
-        # return [
-        #     ResultErrorDestination(relay, [], dest.url, our_nick,
-        #                            msg=reason),
-        #     ]
+        # NOTE: Because this is executed in a thread, stop_threads cannot
+        # be called from here; it has to be called from the main thread.
+        # Instead, set the singleton end event, which will call stop_threads
+        # from the main process.
+        # Errors with only one destination are set in ResultErrorStream.
+        settings.end_event.set()
+        return None
     # Pick a relay to help us measure the given relay. If the given relay is an
     # exit, then pick a non-exit. Otherwise pick an exit.
     helper = None
@@ -321,10 +318,9 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
         log.debug('Destination %s unusable via circuit %s (%s), %s',
                   dest.url, circ_fps, nicknames, usable_data)
         cb.close_circuit(circ_id)
-        # TODO: Return a different/new type of ResultError?
-        msg = 'The destination seemed to have stopped being usable'
         return [
-            ResultErrorStream(relay, circ_fps, dest.url, our_nick, msg=msg),
+            ResultErrorStream(relay, circ_fps, dest.url, our_nick,
+                              msg=usable_data),
             ]
     assert is_usable
     assert 'content_length' in usable_data
@@ -561,7 +557,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
     than the time to request over the network.)
     """
     num_last_measured = 1
-    while num_last_measured > 0:
+    while num_last_measured > 0 and not settings.end_event.is_set():
         log.info("Pending measurements: %s out of %s: ",
                  len(pending_results), num_relays_to_measure)
         time.sleep(TIMEOUT_MEASUREMENTS)
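The scanner change relies on a shared event: a worker thread that finds no functional destination only signals the main thread, and the main thread is the one that actually stops the other threads. A generic sketch of that pattern (not sbws code; the real stop_threads and settings module are not shown):

    import threading
    import time

    end_event = threading.Event()

    def worker():
        # ... the measurement fails because no destination is functional ...
        end_event.set()      # signal the main thread; do not stop threads here
        return None

    t = threading.Thread(target=worker)
    t.start()
    while not end_event.is_set():
        time.sleep(0.1)      # the main loop keeps waiting for results
    t.join()                 # the main thread performs the actual shutdown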
diff --git a/sbws/lib/destination.py b/sbws/lib/destination.py
index a92df61..59474a6 100644
--- a/sbws/lib/destination.py
+++ b/sbws/lib/destination.py
@@ -1,3 +1,5 @@
+import collections
+import datetime
 import logging
 import random
 import requests
@@ -6,8 +8,13 @@ from stem.control import EventType
 from sbws.globals import DESTINATION_VERIFY_CERTIFICATE
 import sbws.util.stem as stem_utils
+from ..globals import (
+    MAX_NUM_DESTINATION_FAILURES,
+    DELTA_SECONDS_RETRY_DESTINATION,
+    NUM_DESTINATION_ATTEMPTS_STORED,
+    FACTOR_INCREMENT_DESTINATION_RETRY
+    )
-from ..globals import MAXIMUM_NUMBER_DESTINATION_FAILURES

 log = logging.getLogger(__name__)
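The constants imported above drive the retry schedule: after MAX_NUM_DESTINATION_FAILURES consecutive failures the destination is put aside for DELTA_SECONDS_RETRY_DESTINATION seconds, and every further failed retry multiplies that delay by FACTOR_INCREMENT_DESTINATION_RETRY. A back-of-the-envelope example (not sbws code), assuming the 3 h delta mentioned in the docstrings below and an assumed factor of 2, since the real factor value is not shown in this diff:

    delta = 3 * 60 * 60      # seconds until the destination is retried
    factor = 2               # assumed FACTOR_INCREMENT_DESTINATION_RETRY
    for attempt in range(1, 4):
        delta *= factor
        print("after failed retry %d: wait %.0f hours" % (attempt, delta / 3600))
    # after failed retry 1: wait 6 hours
    # after failed retry 2: wait 12 hours
    # after failed retry 3: wait 24 hours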
@@ -98,73 +105,155 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
     try:
         head = session.head(dest.url, verify=dest.verify)
     except requests.exceptions.RequestException as e:
-        dest.set_failure()
+        dest.add_failure()
         return False, 'Could not connect to {} over circ {} {}: {}'.format(
             dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
     finally:
         stem_utils.remove_event_listener(cont, listener)
     if head.status_code != requests.codes.ok:
-        dest.set_failure()
+        dest.add_failure()
         return False, error_prefix + 'we expected HTTP code '\
             '{} not {}'.format(requests.codes.ok, head.status_code)
     if 'content-length' not in head.headers:
-        dest.set_failure()
+        dest.add_failure()
         return False, error_prefix + 'we except the header Content-Length '\
             'to exist in the response'
     content_length = int(head.headers['content-length'])
     if max_dl > content_length:
-        dest.set_failure()
+        dest.add_failure()
         return False, error_prefix + 'our maximum configured download size '\
             'is {} but the content is only {}'.format(max_dl, content_length)
     log.debug('Connected to %s over circuit %s', dest.url, circ_id)
-    # Any failure connecting to the destination will call set_failure,
-    # which will set `failed` to True and count consecutives failures.
-    # It can not be set at the start, to be able to know if it failed a
-    # a previous time, which is checked by set_failure.
-    # Future improvement: use a list to count consecutive failures
-    # or calculate it from the results.
-    dest.failed = False
+    # Any failure connecting to the destination will call add_failure.
+    # The success can not be recorded at the start, so that it is possible
+    # to know whether the destination is failing consecutive times.
+    dest.add_success()
     return True, {'content_length': content_length}
 class Destination:
-    def __init__(self, url, max_dl, verify):
+    """Web server from which data is downloaded to measure bandwidth.
+    """
+    # NOTE: max_dl and verify should be optional and have defaults
+    def __init__(self, url, max_dl, verify,
+                 max_num_failures=MAX_NUM_DESTINATION_FAILURES,
+                 delta_seconds_retry=DELTA_SECONDS_RETRY_DESTINATION,
+                 num_attempts_stored=NUM_DESTINATION_ATTEMPTS_STORED,
+                 factor_increment_retry=FACTOR_INCREMENT_DESTINATION_RETRY):
+        """Initializes the Web server from which the data is downloaded.
+
+        :param str url: Web server data URL to download.
+        :param int max_dl: Maximum size of the data to download.
+        :param bool verify: Whether or not to verify the TLS certificate.
+        :param int max_num_failures: Number of consecutive failures after
+            which the destination is no longer considered functional.
+        :param int delta_seconds_retry: Delta time to wait before trying a
+            destination that was not functional.
+        :param int num_attempts_stored: Number of attempts to store.
+        :param int factor_increment_retry: Factor to increment the delta by
+            before trying to use a destination again.
+        """
         self._max_dl = max_dl
         u = urlparse(url)
         self._url = u
         self._verify = verify
-        # Flag to record whether this destination failed in the last
-        # measurement.
-        # Failures can happen if:
-        # - an HTTPS request can not be made over Tor
-        #   (which might be the relays fault, not the destination being
-        #   unreachable)
-        # - the destination does not support HTTP Range requests.
-        self.failed = False
-        self.consecutive_failures = 0
-    @property
-    def is_functional(self):
+        # Attributes to decide whether a destination is functional or not.
+        self._max_num_failures = max_num_failures
+        self._num_attempts_stored = num_attempts_stored
+        # Default delta time to try a destination that was not functional.
+        self._default_delta_seconds_retry = delta_seconds_retry
+        self._delta_seconds_retry = delta_seconds_retry
+        # Use a deque (FIFO) so that it does not grow forever and old
+        # attempts do not have to be removed by hand.
+        # Store tuples of timestamp and whether the destination succeeded
+        # or not (succeeded, 1; failed, 0).
+        # Initialize it as if it never failed.
+        self._attempts = collections.deque([(datetime.datetime.utcnow(), 1), ],
+                                           maxlen=self._num_attempts_stored)
+        self._factor = factor_increment_retry
+
+    def _last_attempts(self, n=None):
+        """Return the last ``n`` attempts the destination was used."""
+        # deque does not accept slices;
+        # a new deque is returned with the last n items
+        # (or fewer if there were fewer).
+        return collections.deque(self._attempts,
+                                 maxlen=(n or self._max_num_failures))
+
+    def _are_last_attempts_failures(self, n=None):
+        """
+        Return True if the last ``n`` times the destination was used
+        it failed.
+        """
+        # Count the number of times there was a failure when used
+        n = n if n else self._max_num_failures
+        return ([i[1] for i in self._last_attempts(n)].count(0)
+                >= self._max_num_failures)
+
+    def _increment_time_to_retry(self, factor=None):
+        """
+        Increment the time a destination will be tried again by a ``factor``.
         """
-        Returns True if there has not been a number consecutive measurements.
-        Otherwise warn about it and return False.
+        self._delta_seconds_retry *= factor or self._factor
+        log.info("Incremented the time to try destination %s to %s hours.",
+                 self.url, self._delta_seconds_retry / 60 / 60)
+    def _is_last_try_old_enough(self, n=None):
+        """
+        Return True if the last time it was used was more than ``n`` seconds ago.
         """
-        if self.consecutive_failures > MAXIMUM_NUMBER_DESTINATION_FAILURES:
-            log.warning("Destination %s is not functional. Please check that "
-                        "it is correct.", self._url)
+        # Timestamp of the last attempt.
+        last_time = self._attempts[-1][0]
+        # If the last attempt is older than _delta_seconds_retry,
+        if (datetime.datetime.utcnow()
+                - datetime.timedelta(seconds=self._delta_seconds_retry)
+                > last_time):
+            # the destination can be tried again.
+            return True
+        return False
+
+    def is_functional(self):
+        """Whether connections to a destination are failing or not.
+
+        Return True if:
+        - It did not fail more than n (by default 3) consecutive times.
+        - The last time the destination was tried
+          was more than x (by default 3h) seconds ago.
+        And False otherwise.
+
+        When the destination is tried again after the consecutive failures,
+        the time to try again is incremented and reset as soon as the
+        destination does not fail.
+        """
+        # Failed the last X consecutive times
+        if self._are_last_attempts_failures():
+            log.warning("The last %s times the destination %s failed. "
+                        "It will not be used again for %s hours.\n",
+                        self._max_num_failures, self.url,
+                        self._delta_seconds_retry / 60 / 60)
+            log.warning("Please add more destinations or increase the "
+                        "maximum number of consecutive failures "
+                        "in the configuration.")
+            # It was not used for a while and the last time it was used
+            # was long ago, so try it again.
+            if self._is_last_try_old_enough():
+                log.info("The destination %s was not tried for %s hours, "
+                         "it is going to be tried again.", self.url,
+                         self._delta_seconds_retry / 60 / 60)
+                # Set the next time to retry higher, in case this attempt fails
+                self._increment_time_to_retry()
+                return True
             return False
+        # Reset the time to retry to the initial value
+        # in case it was incremented.
+        self._delta_seconds_retry = self._default_delta_seconds_retry
         return True
-    def set_failure(self):
-        """Set failed to True and increase the number of consecutive failures.
-        Only if it also failed in the previous measuremnt.
+    def add_failure(self, dt=None):
+        self._attempts.append((dt or datetime.datetime.utcnow(), 0))

-        """
-        # if it failed in the last measurement
-        if self.failed:
-            self.consecutive_failures += 1
-        self.failed = True
+    def add_success(self, dt=None):
+        self._attempts.append((dt or datetime.datetime.utcnow(), 1))
     @property
     def url(self):
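add_failure() and add_success() accept an optional datetime so that tests can back-date attempts. A self-contained sketch (not part of the commit) mirroring the new unit test at the end of this patch; the URL is a placeholder and the 3 h default delta comes from the docstrings above:

    from datetime import datetime, timedelta
    from sbws.lib.destination import Destination

    d = Destination('https://example.example', 0, False)
    d.add_failure()
    d.add_failure()
    # Back-date the third failure so the retry delta (3 h by default) has
    # already elapsed and the destination is offered for a new try.
    d.add_failure(datetime.utcnow() - timedelta(hours=4))
    assert d.is_functional()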
@@ -213,7 +302,7 @@ class DestinationList:
     @property
     def functional_destinations(self):
-        return [d for d in self._all_dests if d.is_functional]
+        return [d for d in self._all_dests if d.is_functional()]
     @staticmethod
     def from_config(conf, circuit_builder, relay_list, controller):
@@ -250,4 +339,8 @@ class DestinationList:
         # This removes the need for an extra lock for every measurement.
         # Do not change the order of the destinations, just return a
         # destination.
-        return self._rng.choice(self.functional_destinations)
+        # random.choice raises IndexError with an empty list.
+        if self.functional_destinations:
+            return self._rng.choice(self.functional_destinations)
+        else:
+            return None
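The per-destination bookkeeping above rests on a bounded deque: with maxlen set, appending beyond the limit silently drops the oldest entries, so old attempts never need explicit cleanup. A quick illustration (not sbws code) of that behaviour and of the "last n attempts were failures" check:

    import collections
    import datetime

    attempts = collections.deque(maxlen=3)
    for outcome in (1, 0, 0, 0, 1):
        attempts.append((datetime.datetime.utcnow(), outcome))
    print(list(attempts))    # only the 3 most recent (timestamp, outcome) pairs
    # False: the trailing success broke the streak of failures
    print([a[1] for a in attempts].count(0) >= 3)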
diff --git a/tests/integration/lib/test_destination.py b/tests/integration/lib/test_destination.py
index 54cbacc..98ed89f 100644
--- a/tests/integration/lib/test_destination.py
+++ b/tests/integration/lib/test_destination.py
@@ -1,5 +1,4 @@
 """Integration tests for destination.py"""
-from sbws.globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
 import sbws.util.requests as requests_utils
 from sbws.lib.destination import (DestinationList, Destination,
                                   connect_to_destination_over_circuit)
@@ -36,16 +35,12 @@ def test_connect_to_destination_over_circuit_success(persistent_launch_tor,
         destination, circuit_id, session, persistent_launch_tor, 1024)
     assert is_usable is True
     assert 'content_length' in response
-    assert not destination.failed
-    assert destination.consecutive_failures == 0
-    assert destination.is_functional
+    assert destination.is_functional()


 def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
                                                   dests, cb, rl):
     bad_destination = Destination('https://example.example', 1024, False)
-    # dests._all_dests.append(bad_destination)
-    # dests._usable_dests.append(bad_destination)
     session = requests_utils.make_session(persistent_launch_tor, 10)
     # Choose a relay that is not an exit
     relay = [r for r in rl.relays
             if r.nickname == 'relay1mbyteMAB'][0]
@@ -61,35 +56,40 @@ def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
     assert is_usable is False
     # because it is the first time it fails, failures aren't count
-    assert bad_destination.failed
-    assert bad_destination.consecutive_failures == 0
-    assert bad_destination.is_functional
+    assert bad_destination.is_functional()
-    # fail twice in a row
+    # fail three times in a row
     is_usable, response = connect_to_destination_over_circuit(
         bad_destination, circuit_id, session, persistent_launch_tor, 1024)
-    assert bad_destination.failed
-    assert bad_destination.consecutive_failures == 1
-    assert bad_destination.is_functional
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+    assert not bad_destination.is_functional()

 def test_functional_destinations(conf, cb, rl, persistent_launch_tor):
     good_destination = Destination('https://127.0.0.1:28888', 1024, False)
-    # Mock that it failed before and just now, but it's still considered
-    # functional.
-    good_destination.consecutive_failures = 3
-    good_destination.failed = True
     bad_destination = Destination('https://example.example', 1024, False)
-    # Mock that it didn't fail now, but it already failed 11 consecutive
-    # times.
-    bad_destination.consecutive_failures = \
-        MAXIMUM_NUMBER_DESTINATION_FAILURES + 1
-    bad_destination.failed = False
-    # None of the arguments are used, move to unit tests when this get
-    # refactored
+
+    session = requests_utils.make_session(persistent_launch_tor, 10)
+    # Choose a relay that is not an exit
+    relay = [r for r in rl.relays
+             if r.nickname == 'relay1mbyteMAB'][0]
+    # Choose an exit; for this test the bandwidth does not matter
+    helper = rl.exits_not_bad_allowing_port(bad_destination.port)[0]
+    circuit_path = [relay.fingerprint, helper.fingerprint]
+    # Build a circuit.
+    circuit_id, _ = cb.build_circuit(circuit_path)
+
+    # fail three times in a row
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+
     destination_list = DestinationList(
         conf, [good_destination, bad_destination], cb, rl,
         persistent_launch_tor)
-    expected_functional_destinations = [good_destination]
     functional_destinations = destination_list.functional_destinations
-    assert expected_functional_destinations == functional_destinations
+    assert [good_destination] == functional_destinations
diff --git a/tests/unit/lib/test_destination.py b/tests/unit/lib/test_destination.py
new file mode 100644
index 0000000..fc8b489
--- /dev/null
+++ b/tests/unit/lib/test_destination.py
@@ -0,0 +1,54 @@
+"""Unit tests for sbws.lib.destination."""
+from datetime import datetime, timedelta
+
+from sbws.lib import destination
+
+
+def test_destination_is_functional():
+    eight_hours_ago = datetime.utcnow() - timedelta(hours=8)
+    four_hours_ago = datetime.utcnow() - timedelta(hours=4)
+    two_hours_ago = datetime.utcnow() - timedelta(hours=2)
+
+    d = destination.Destination('unexistenturl', 0, False)
+    assert d.is_functional()
+
+    # Fail 3 consecutive times
+    d.add_failure()
+    d.add_failure()
+    d.add_failure()
+    assert d._are_last_attempts_failures()
+    assert not d._is_last_try_old_enough()
+    assert not d.is_functional()
+
+    # Then it does not fail and it is functional again
+    d.add_success()
+    assert not d._are_last_attempts_failures()
+    assert d.is_functional()
+
+    # Fail again 3 times
+    d.add_failure()
+    d.add_failure()
+    # And the last failure was 2h ago
+    d.add_failure(two_hours_ago)
+    assert d._are_last_attempts_failures()
+    assert not d._is_last_try_old_enough()
+    assert not d.is_functional()
+
+    # But if the last failure was 4h ago, try to use it again
+    d.add_failure(four_hours_ago)
+    assert d._is_last_try_old_enough()
+    assert d.is_functional()
+
+    # If the last failure was 8h ago, try to use it again
+    d.add_failure(eight_hours_ago)
+    assert d._is_last_try_old_enough()
+    assert d.is_functional()
+
+    # Whenever it does not fail again, the time to retry after 3
+    # consecutive failures is reset
+    d.add_success()
+    assert not d._are_last_attempts_failures()
+    assert d.is_functional()
+    # And the delta to try is reset
+    assert not d._is_last_try_old_enough()
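The new unit test only exercises datetime arithmetic and the deque bookkeeping, so it needs no running Tor network; assuming the project's usual pytest setup, it can presumably be run on its own with:

    pytest tests/unit/lib/test_destination.py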