[tor-commits] [sbws/master] new: destination: Recover destination when it failed

juga at torproject.org juga at torproject.org
Thu Mar 21 18:30:42 UTC 2019


commit 083e7c702313ffd3e4bf2be31ec19cb90bc046a1
Author: juga0 <juga at riseup.net>
Date:   Sat Mar 16 10:21:22 2019 +0000

    new: destination: Recover destination when it failed
    
    Closes: #29589.
---
 sbws/core/scanner.py                      |  24 ++---
 sbws/lib/destination.py                   | 171 +++++++++++++++++++++++-------
 tests/integration/lib/test_destination.py |  52 ++++-----
 tests/unit/lib/test_destination.py        |  54 ++++++++++
 4 files changed, 222 insertions(+), 79 deletions(-)

diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 24a975b..4b06225 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -267,16 +267,13 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
         log.critical("There are not any functional destinations.\n"
                      "It is recommended to set several destinations so that "
                      "the scanner can continue if one fails.")
-        # Exit the scanner with error stopping threads first.
-        stop_threads(signal.SIGTERM, None, 1)
-        # When the destinations can recover would be implemented;
-        # reason = 'Unable to get destination'
-        # log.debug(reason + ' to measure %s %s',
-        #           relay.nickname, relay.fingerprint)
-        # return [
-        #     ResultErrorDestination(relay, [], dest.url, our_nick,
-        #                            msg=reason),
-        #     ]
+        # NOTE: Because this is executed in a thread, stop_threads cannot
+        # be called from here; it has to be called from the main thread.
+        # Instead set the singleton end event, that will call stop_threads
+        # from the main process.
+        # Errors with only one destination are set in ResultErrorStream.
+        settings.end_event.set()
+        return None
     # Pick a relay to help us measure the given relay. If the given relay is an
     # exit, then pick a non-exit. Otherwise pick an exit.
     helper = None
@@ -321,10 +318,9 @@ def measure_relay(args, conf, destinations, cb, rl, relay):
         log.debug('Destination %s unusable via circuit %s (%s), %s',
                   dest.url, circ_fps, nicknames, usable_data)
         cb.close_circuit(circ_id)
-        # TODO: Return a different/new type of ResultError?
-        msg = 'The destination seemed to have stopped being usable'
         return [
-            ResultErrorStream(relay, circ_fps, dest.url, our_nick, msg=msg),
+            ResultErrorStream(relay, circ_fps, dest.url, our_nick,
+                              msg=usable_data),
         ]
     assert is_usable
     assert 'content_length' in usable_data
@@ -561,7 +557,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
       than the time to request over the network.)
     """
     num_last_measured = 1
-    while num_last_measured > 0:
+    while num_last_measured > 0 and not settings.end_event.is_set():
         log.info("Pending measurements: %s out of %s: ",
                  len(pending_results), num_relays_to_measure)
         time.sleep(TIMEOUT_MEASUREMENTS)
diff --git a/sbws/lib/destination.py b/sbws/lib/destination.py
index a92df61..59474a6 100644
--- a/sbws/lib/destination.py
+++ b/sbws/lib/destination.py
@@ -1,3 +1,5 @@
+import collections
+import datetime
 import logging
 import random
 import requests
@@ -6,8 +8,13 @@ from stem.control import EventType
 
 from sbws.globals import DESTINATION_VERIFY_CERTIFICATE
 import sbws.util.stem as stem_utils
+from ..globals import (
+    MAX_NUM_DESTINATION_FAILURES,
+    DELTA_SECONDS_RETRY_DESTINATION,
+    NUM_DESTINATION_ATTEMPTS_STORED,
+    FACTOR_INCREMENT_DESTINATION_RETRY
+    )
 
-from ..globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
 
 log = logging.getLogger(__name__)
 
@@ -98,73 +105,155 @@ def connect_to_destination_over_circuit(dest, circ_id, session, cont, max_dl):
         try:
             head = session.head(dest.url, verify=dest.verify)
         except requests.exceptions.RequestException as e:
-            dest.set_failure()
+            dest.add_failure()
             return False, 'Could not connect to {} over circ {} {}: {}'.format(
                 dest.url, circ_id, stem_utils.circuit_str(cont, circ_id), e)
         finally:
             stem_utils.remove_event_listener(cont, listener)
     if head.status_code != requests.codes.ok:
-        dest.set_failure()
+        dest.add_failure()
         return False, error_prefix + 'we expected HTTP code '\
             '{} not {}'.format(requests.codes.ok, head.status_code)
     if 'content-length' not in head.headers:
-        dest.set_failure()
+        dest.add_failure()
         return False, error_prefix + 'we except the header Content-Length '\
             'to exist in the response'
     content_length = int(head.headers['content-length'])
     if max_dl > content_length:
-        dest.set_failure()
+        dest.add_failure()
         return False, error_prefix + 'our maximum configured download size '\
             'is {} but the content is only {}'.format(max_dl, content_length)
     log.debug('Connected to %s over circuit %s', dest.url, circ_id)
-    # Any failure connecting to the destination will call set_failure,
-    # which will set `failed` to True and count consecutives failures.
-    # It can not be set at the start, to be able to know if it failed a
-    # a previous time, which is checked by set_failure.
-    # Future improvement: use a list to count consecutive failures
-    # or calculate it from the results.
-    dest.failed = False
+    # Any failure connecting to the destination will call add_failure.
+    # It cannot be set at the start, so that it is possible to know
+    # whether it is failing consecutive times.
+    dest.add_success()
     return True, {'content_length': content_length}
 
 
 class Destination:
-    def __init__(self, url, max_dl, verify):
+    """Web server from which data is downloaded to measure bandwidth.
+    """
+    # NOTE: max_dl and verify should be optional and have defaults
+    def __init__(self, url, max_dl, verify,
+                 max_num_failures=MAX_NUM_DESTINATION_FAILURES,
+                 delta_seconds_retry=DELTA_SECONDS_RETRY_DESTINATION,
+                 num_attempts_stored=NUM_DESTINATION_ATTEMPTS_STORED,
+                 factor_increment_retry=FACTOR_INCREMENT_DESTINATION_RETRY):
+        """Initializes the Web server from which the data is downloaded.
+
+        :param str url: Web server data URL to download.
+        :param int max_dl: Maximum size of the data to download.
+        :param bool verify: Whether to verify or not the TLS certificate.
+        :param int max_num_failures: Number of consecutive failures when the
+            destination is not considered functional.
+        :param int delta_seconds_retry: Delta time to try a destination
+            that was not functional.
+        :param int num_attempts_stored: Number of attempts to store.
+        :param int factor_increment_retry: Factor to increment delta by
+            before trying to use a destination again.
+        """
         self._max_dl = max_dl
         u = urlparse(url)
         self._url = u
         self._verify = verify
-        # Flag to record whether this destination failed in the last
-        # measurement.
-        # Failures can happen if:
-        # - an HTTPS request can not be made over Tor
-        # (which might be the relays fault, not the destination being
-        # unreachable)
-        # - the destination does not support HTTP Range requests.
-        self.failed = False
-        self.consecutive_failures = 0
 
-    @property
-    def is_functional(self):
+        # Attributes to decide whether a destination is functional or not.
+        self._max_num_failures = max_num_failures
+        self._num_attempts_stored = num_attempts_stored
+        # Default delta time to try a destination that was not functional.
+        self._default_delta_seconds_retry = delta_seconds_retry
+        self._delta_seconds_retry = delta_seconds_retry
+        # Using a deque (FIFO) so that it does not grow forever and
+        # old attempts do not have to be removed manually.
+        # Store tuples of timestamp and whether the destination succeeded
+        # (1) or failed (0).
+        # Initialize it as if it never failed.
+        self._attempts = collections.deque([(datetime.datetime.utcnow(), 1), ],
+                                           maxlen=self._num_attempts_stored)
+        self._factor = factor_increment_retry
+
+    def _last_attempts(self, n=None):
+        """Return the last ``n`` attempts the destination was used."""
+        # deque does not accept slices,
+        # a new deque is returned with the last n items
+        # (or less if there were less).
+        return collections.deque(self._attempts,
+                                 maxlen=(n or self._max_num_failures))
+
+    def _are_last_attempts_failures(self, n=None):
+        """
+        Return True if the last ``n`` times the destination was used,
+        it failed.
+        """
+        # Count the number of times there was a failure when used.
+        n = n if n else self._max_num_failures
+        return ([i[1] for i in self._last_attempts(n)].count(0)
+                >= self._max_num_failures)
+
+    def _increment_time_to_retry(self, factor=None):
+        """
+        Increment the time a destination will be tried again by a ``factor``.
         """
-        Returns True if there has not been a number consecutive measurements.
-        Otherwise warn about it and return False.
+        self._delta_seconds_retry *= factor or self._factor
+        log.info("Incremented the time to try destination %s to %s hours.",
+                 self.url, self._delta_seconds_retry / 60 / 60)
 
+    def _is_last_try_old_enough(self, n=None):
+        """
+        Return True if the last attempt is older than the current retry delta.
         """
-        if self.consecutive_failures > MAXIMUM_NUMBER_DESTINATION_FAILURES:
-            log.warning("Destination %s is not functional. Please check that "
-                        "it is correct.", self._url)
+        # Timestamp of the last attempt.
+        last_time = self._attempts[-1][0]
+        # If the last attempt is older than _delta_seconds_retry,
+        if (datetime.datetime.utcnow()
+                - datetime.timedelta(seconds=self._delta_seconds_retry)
+                > last_time):
+            # And try again.
+            return True
+        return False
+
+    def is_functional(self):
+        """Whether connections to a destination are failing or not.
+
+        Return True if:
+            - It did not fail more than n (by default 3) consecutive times.
+            - The last time the destination was tried
+              was x (by default 3h) seconds ago.
+        And False otherwise.
+
+        When the destination is tried again after the consecutive failures,
+        the time to try again is incremented, and it is reset as soon as
+        the destination does not fail.
+        """
+        # Failed the last X consecutive times
+        if self._are_last_attempts_failures():
+            log.warning("The last %s times the destination %s failed. "
+                        "It will not be used again in %s hours.\n",
+                        self._max_num_failures, self.url,
+                        self._delta_seconds_retry / 60 / 60)
+            log.warning("Please, add more destinations or increment the "
+                        "number of maximum number of consecutive failures "
+                        "in the configuration.")
+            # It was not used for a while and the last time it was used
+            # was long ago, then try again
+            if self._is_last_try_old_enough():
+                log.info("The destination %s was not tried for %s hours, "
+                         "it is going to be tried again.")
+                # Set the next time to retry higher, in case this attempt fails
+                self._increment_time_to_retry()
+                return True
             return False
+        # Reset the time to retry to the initial value,
+        # in case it was incremented.
+        self._delta_seconds_retry = self._default_delta_seconds_retry
         return True
 
-    def set_failure(self):
-        """Set failed to True and increase the number of consecutive failures.
-        Only if it also failed in the previous measuremnt.
+    def add_failure(self, dt=None):
+        self._attempts.append((dt or datetime.datetime.utcnow(), 0))
 
-        """
-        # if it failed in the last measurement
-        if self.failed:
-            self.consecutive_failures += 1
-        self.failed = True
+    def add_success(self, dt=None):
+        self._attempts.append((dt or datetime.datetime.utcnow(), 1))
 
     @property
     def url(self):
@@ -213,7 +302,7 @@ class DestinationList:
 
     @property
     def functional_destinations(self):
-        return [d for d in self._all_dests if d.is_functional]
+        return [d for d in self._all_dests if d.is_functional()]
 
     @staticmethod
     def from_config(conf, circuit_builder, relay_list, controller):
@@ -250,4 +339,8 @@ class DestinationList:
         # This removes the need for an extra lock for every measurement.
         # Do not change the order of the destinations, just return a
         # destination.
-        return self._rng.choice(self.functional_destinations)
+        # random.choice raises IndexError with an empty list.
+        if self.functional_destinations:
+            return self._rng.choice(self.functional_destinations)
+        else:
+            return None
diff --git a/tests/integration/lib/test_destination.py b/tests/integration/lib/test_destination.py
index 54cbacc..98ed89f 100644
--- a/tests/integration/lib/test_destination.py
+++ b/tests/integration/lib/test_destination.py
@@ -1,5 +1,4 @@
 """Integration tests for destination.py"""
-from sbws.globals import MAXIMUM_NUMBER_DESTINATION_FAILURES
 import sbws.util.requests as requests_utils
 from sbws.lib.destination import (DestinationList, Destination,
                                   connect_to_destination_over_circuit)
@@ -36,16 +35,12 @@ def test_connect_to_destination_over_circuit_success(persistent_launch_tor,
         destination, circuit_id, session, persistent_launch_tor, 1024)
     assert is_usable is True
     assert 'content_length' in response
-    assert not destination.failed
-    assert destination.consecutive_failures == 0
-    assert destination.is_functional
+    assert destination.is_functional()
 
 
 def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
                                                   dests, cb, rl):
     bad_destination = Destination('https://example.example', 1024, False)
-    # dests._all_dests.append(bad_destination)
-    # dests._usable_dests.append(bad_destination)
     session = requests_utils.make_session(persistent_launch_tor, 10)
     # Choose a relay that is not an exit
     relay = [r for r in rl.relays
@@ -61,35 +56,40 @@ def test_connect_to_destination_over_circuit_fail(persistent_launch_tor,
     assert is_usable is False
 
     # because it is the first time it fails, failures aren't count
-    assert bad_destination.failed
-    assert bad_destination.consecutive_failures == 0
-    assert bad_destination.is_functional
+    assert bad_destination.is_functional()
 
-    # fail twice in a row
+    # fail three times in a row
     is_usable, response = connect_to_destination_over_circuit(
         bad_destination, circuit_id, session, persistent_launch_tor, 1024)
-    assert bad_destination.failed
-    assert bad_destination.consecutive_failures == 1
-    assert bad_destination.is_functional
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+    assert not bad_destination.is_functional()
 
 
 def test_functional_destinations(conf, cb, rl, persistent_launch_tor):
     good_destination = Destination('https://127.0.0.1:28888', 1024, False)
-    # Mock that it failed before and just now, but it's still considered
-    # functional.
-    good_destination.consecutive_failures = 3
-    good_destination.failed = True
     bad_destination = Destination('https://example.example', 1024, False)
-    # Mock that it didn't fail now, but it already failed 11 consecutive
-    # times.
-    bad_destination.consecutive_failures = \
-        MAXIMUM_NUMBER_DESTINATION_FAILURES + 1
-    bad_destination.failed = False
-    # None of the arguments are used, move to unit tests when this get
-    # refactored
+
+    session = requests_utils.make_session(persistent_launch_tor, 10)
+    # Choose a relay that is not an exit
+    relay = [r for r in rl.relays
+             if r.nickname == 'relay1mbyteMAB'][0]
+    # Choose an exit, for this test it does not matter the bandwidth
+    helper = rl.exits_not_bad_allowing_port(bad_destination.port)[0]
+    circuit_path = [relay.fingerprint, helper.fingerprint]
+    # Build a circuit.
+    circuit_id, _ = cb.build_circuit(circuit_path)
+
+    # fail three times in a row
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+    is_usable, response = connect_to_destination_over_circuit(
+        bad_destination, circuit_id, session, persistent_launch_tor, 1024)
+
     destination_list = DestinationList(
         conf, [good_destination, bad_destination], cb, rl,
         persistent_launch_tor)
-    expected_functional_destinations = [good_destination]
     functional_destinations = destination_list.functional_destinations
-    assert expected_functional_destinations == functional_destinations
+    assert [good_destination] == functional_destinations
diff --git a/tests/unit/lib/test_destination.py b/tests/unit/lib/test_destination.py
new file mode 100644
index 0000000..fc8b489
--- /dev/null
+++ b/tests/unit/lib/test_destination.py
@@ -0,0 +1,54 @@
+"""Unit tests for sbws.lib.destination."""
+from datetime import datetime, timedelta
+
+from sbws.lib import destination
+
+
+def test_destination_is_functional():
+    eight_hours_ago = datetime.utcnow() - timedelta(hours=8)
+    four_hours_ago = datetime.utcnow() - timedelta(hours=4)
+    two_hours_ago = datetime.utcnow() - timedelta(hours=2)
+
+    d = destination.Destination('unexistenturl', 0, False)
+    assert d.is_functional()
+
+    # Fail 3 consecutive times
+    d.add_failure()
+    d.add_failure()
+    d.add_failure()
+    assert d._are_last_attempts_failures()
+    assert not d._is_last_try_old_enough()
+    assert not d.is_functional()
+
+    # Then doesn't fail and it's functional again
+    d.add_success()
+    assert not d._are_last_attempts_failures()
+    assert d.is_functional()
+
+    # Fail again 3 times
+    d.add_failure()
+    d.add_failure()
+    # And last failure was 2h ago
+    d.add_failure(two_hours_ago)
+    assert d._are_last_attempts_failures()
+    assert not d._is_last_try_old_enough()
+    assert not d.is_functional()
+
+    # But if the last failure was 4h ago, try to use it again
+    # And last failure was 4h ago
+    d.add_failure(four_hours_ago)
+    assert d._is_last_try_old_enough()
+    assert d.is_functional()
+
+    # If last failure was 8h ago, try to use it again again
+    d.add_failure(eight_hours_ago)
+    assert d._is_last_try_old_enough()
+    assert d.is_functional()
+
+    # Whenever it does not fail again, reset the time to try again
+    # on 3 consecutive failures
+    d.add_success()
+    assert not d._are_last_attempts_failures()
+    assert d.is_functional()
+    # And the delta to try is reset.
+    assert not d._is_last_try_old_enough()





More information about the tor-commits mailing list