[tor-commits] [sbws/master] chg: scanner: Remove unneeded thread loop

juga at torproject.org
Thu Mar 21 18:30:42 UTC 2019


commit 6fc0c16fc756f9cf4820b94d60b26c59b350a125
Author: juga0 <juga at riseup.net>
Date:   Mon Mar 4 19:00:32 2019 +0000

    chg: scanner: Remove unneeded thread loop
    
    The pool can queue all the relays to be measured,
    but this inner loop was not adding more worker threads to the pool
    until the current threads finished.
    Instead, wait for all the worker threads in the pool to finish,
    or try to get their status after waiting long enough.
    
    Closes: #28864, #28865. Bugfix v1.0.0
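    
    A minimal sketch of the change in pattern (the name ``measure`` and the
    pool size are made up for illustration; this is not the sbws code):
    
        import time
        from multiprocessing.dummy import Pool  # thread-based Pool
    
        def measure(relay):
            # Stand-in for the real measurement function.
            time.sleep(0.1)
            return relay
    
        pool = Pool(3)  # the pool itself caps how many jobs run at once
        relays = range(10)
    
        # New pattern: queue every relay up front; apply_async never blocks.
        pending_results = [pool.apply_async(measure, (r,)) for r in relays]
    
        # Removed pattern: sleeping between submissions to keep the number of
        # pending results under a cap.  Instead, wait for all of them here.
        while pending_results:
            time.sleep(0.5)
            pending_results = [r for r in pending_results if not r.ready()]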
---
 sbws/core/scanner.py | 116 ++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 77 insertions(+), 39 deletions(-)

diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 2e88b53..3d0924d 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -443,28 +443,21 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
    instead ``result_putter_error``, which logs the error and completes
     immediately.
 
-    Before iterating over the next relay, it waits (non blocking, since it
-    happens in the main thread) until one of the ``max_pending_results``
-    threads has finished.
-
-    This is not needed, since otherwise async_result will queue the relays to
-    measure in order and won't start reusing a thread to measure a relay until
-    other thread has finished. But it makes the logic a bit more sequential.
-
-    Before the outer loop iterates, it also waits (again non blocking) that all
-    the ``Results`` are ready.
+    Before the outer loop iterates, it waits (non blocking) until all
+    the ``Results`` are ready, by calling ``wait_for_results``.
    This avoids starting to measure the same relay while it might still be
    being measured.
 
     """
-    pending_results = []
     # Set the time to wait for a thread to finish as the half of an HTTP
     # request timeout.
-    time_to_sleep = conf.getfloat('general', 'http_timeout') / 2
     # Do not start a new loop if sbws is stopping.
     while not settings.end_event.is_set():
         log.debug("Starting a new measurement loop.")
         num_relays = 0
+        # Since the loop might finish before pending_results is empty (due to
+        # waiting too long), set it here and not outside the loop.
+        pending_results = []
         loop_tstart = time.time()
         for target in relay_prioritizer.best_priority():
             # Don't start measuring a relay if sbws is stopping.
@@ -479,14 +472,14 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
                 [args, conf, destinations, circuit_builder, relay_list,
                  target], {}, callback, callback_err)
             pending_results.append(async_result)
-            # Instead of letting apply_async to queue the relays in order until
-            # a thread has finished, wait here until a thread has finished.
-            while len(pending_results) >= max_pending_results:
-                # sleep is non-blocking since happens in the main process.
-                time.sleep(time_to_sleep)
-                pending_results = [r for r in pending_results if not r.ready()]
-
-        wait_for_results(time_to_sleep, pending_results)
+
+        # After the for loop has finished, the pool has queued all the relays
+        # and pending_results holds the list of all the AsyncResults.
+        # It could also be obtained from pool._cache, which is a dictionary
+        # with the AsyncResults as values.
+        num_relays_to_measure = len(pending_results)
+        wait_for_results(num_relays_to_measure, pending_results)
+
         loop_tstop = time.time()
         loop_tdelta = (loop_tstop - loop_tstart) / 60
         log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
@@ -497,27 +490,71 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
             stop_threads(signal.SIGTERM, None)
 
 
-def wait_for_results(time_to_sleep, pending_results):
-    time_waiting = 0
-    while (len(pending_results) > 0
-           and time_waiting <= TIMEOUT_MEASUREMENTS):
-        log.debug("Number of pending measurement threads %s after "
-                  "a prioritization loop.", len(pending_results))
-        time.sleep(time_to_sleep)
-        time_waiting += time_to_sleep
+def wait_for_results(num_relays_to_measure, pending_results):
+    """Wait for the pool to finish and log progress.
+
+    While there are relays being measured, just log the progress
+    and sleep :const:`~sbws.globals.TIMEOUT_MEASUREMENTS` (3 minutes),
+    which is approximately the time it can take to measure a relay in
+    the worst case.
+
+    When no relay has been measured in ``TIMEOUT_MEASUREMENTS`` and there
+    are still relays pending to be measured, there is no progress, so
+    :func:`~sbws.core.scanner.force_get_results` is called.
+
+    This can happen in the case of a bug that makes either
+    :func:`~sbws.core.scanner.measure_relay`,
+    :func:`~sbws.core.scanner.result_putter` (callback) and/or
+    :func:`~sbws.core.scanner.result_putter_error` (callback error) stall.
+
+    .. note:: in a future refactor, this could be made simpler by:
+
+      1. Initializing the pool at the beginning of each loop
+      2. Calling :meth:`~Pool.close`; :meth:`~Pool.join` after
+         :meth:`~Pool.apply_async`,
+         to ensure no new jobs are added until the pool has finished with all
+         the ones in the queue.
+
+      Even then, as currently, there would still be two cases in which the
+      pool could stall:
+
+      1. There's an exception in ``measure_relay`` and another one in
+         ``callback_err``.
+      2. There's an exception in ``callback``.
+
+      This could also be simpler by not passing the callback and the error
+      callback to ``apply_async`` and instead just calling the callback with
+      the ``pending_results``.
+
+      (The callback could also be simpler by not having a thread and a queue
+      and just storing to disk, since the time to write to disk is much
+      smaller than the time to request over the network.)
+    """
+    num_last_measured = 1
+    while num_last_measured > 0:
+        log.info("Pending measurements: %s out of %s: ",
+                 len(pending_results), num_relays_to_measure)
+        time.sleep(TIMEOUT_MEASUREMENTS)
+        old_pending_results = pending_results
         pending_results = [r for r in pending_results if not r.ready()]
-    if time_waiting > TIMEOUT_MEASUREMENTS:
-        dumpstacks()
+        num_last_measured = len(old_pending_results) - len(pending_results)
+    if len(pending_results) > 0:
+        force_get_results(pending_results)
 
 
 def force_get_results(pending_results):
-    """Try to get either the result or an exception, which is logged.
+    """Try to get either the result or an exception, which gets logged.
 
-    Get call by ``wait_for_results`` when the time waiting was already long.
-    To get either the Result or an exception, call `get` with timeout.
+    It is called by :func:`~sbws.core.scanner.wait_for_results` when
+    the time spent waiting for the results has been long.
+
+    To get either the :class:`~sbws.lib.resultdump.Result` or an exception,
+    call :meth:`~AsyncResult.get` with timeout.
     Timeout is low since we already waited.
-    `get` is not call before, because it blocks and the callbacks are not call.
+
+    ``get`` is not called earlier, because it blocks and then the callbacks
+    are not called.
     """
+    log.debug("Forcing get")
     for r in pending_results:
         try:
             result = r.get(timeout=0.1)
@@ -527,13 +564,14 @@ def force_get_results(pending_results):
         # been processed yet
         except TimeoutError:
             log.warning("A result was not stored, it was not ready.")
-        # If the result raised an exception, get returns it,
-        # so, log any exception so that it can be fixed.
-        # This should not happen, since callback_err would have been call first
+        # If the job raised an exception, `get` re-raises it here,
+        # so log it so that it can be fixed.
+        # This should not happen, since `callback_err` would have been called
+        # first.
         except Exception as e:
             log.critical(FILLUP_TICKET_MSG)
-            log.exception("exception %s", e)
-            # if the exception happened in the threads:
+            # If the exception happened in the threads, `log.exception` does
+            # not have the traceback.
             log.warning("traceback %s",
                         traceback.print_exception(type(e), e, e.__traceback__))
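
The ``.. note::`` in the ``wait_for_results`` docstring above mentions a possible
refactor based on ``Pool.close()``/``Pool.join()``. A rough sketch of that idea,
with made-up names and pool size, and without the progress logging or the stall
handling, so an illustration of the idea rather than a planned implementation:

    import time
    from multiprocessing.dummy import Pool  # thread-based Pool

    def measure(relay):
        # Stand-in for the real measurement function.
        time.sleep(0.1)
        return relay

    def one_measurement_loop(relays):
        # A fresh pool for each prioritization loop.
        pool = Pool(3)
        results = [pool.apply_async(measure, (r,)) for r in relays]
        pool.close()  # no further jobs can be submitted
        pool.join()   # block until every queued job has finished
        return [r.get() for r in results]

    print(one_measurement_loop(range(10)))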
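
``force_get_results`` relies on the standard ``AsyncResult.get(timeout=...)``
behaviour: it raises ``multiprocessing.TimeoutError`` when the job is not ready in
time, and re-raises the job's own exception when the job failed. A small
self-contained illustration of both branches (not sbws code; the job functions are
made up):

    import time
    from multiprocessing import TimeoutError
    from multiprocessing.dummy import Pool  # thread-based Pool

    def slow():
        time.sleep(2)

    def broken():
        raise ValueError("boom")

    pool = Pool(2)
    not_ready = pool.apply_async(slow)
    failed = pool.apply_async(broken)
    time.sleep(0.2)

    try:
        not_ready.get(timeout=0.1)
    except TimeoutError:
        print("result not ready")    # the TimeoutError branch above

    try:
        failed.get(timeout=0.1)
    except Exception as e:
        print("job raised:", e)      # the worker's exception is re-raised by get()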
 




