[tor-commits] [sbws/m12] fix: scanner: Increase time getting measurements

juga at torproject.org juga at torproject.org
Tue May 25 13:15:05 UTC 2021


commit 3653396c9daa9170033fd2b706cd9d290eac6f6b
Author: juga0 <juga at riseup.net>
Date:   Thu May 13 12:10:35 2021 +0000

    fix: scanner: Increase time getting measurements
    
    - Increase the time spent waiting for the last queued measurements, to
      avoid canceling unfinished measurements and the garbage collector
      possibly not releasing thread variables
    - Use the already declared global pool instead of passing it by args
    - Log more information when the last measurements time out
    
    Closes: #40087
---
 sbws/core/scanner.py | 30 +++++++++++++++++++++++++-----
 setup.cfg            |  1 +
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index dbcceb9..7637028 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -15,7 +15,12 @@ from multiprocessing.dummy import Pool
 
 import sbws.util.requests as requests_utils
 import sbws.util.stem as stem_utils
-from sbws.globals import HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS, fail_hard
+from sbws.globals import (
+    HTTP_GET_HEADERS,
+    SOCKET_TIMEOUT,
+    TIMEOUT_MEASUREMENTS,
+    fail_hard,
+)
 
 from .. import settings
 from ..lib.circuitbuilder import GapsCircuitBuilder as CB
@@ -77,7 +82,7 @@ def dumpstacks():
         log.critical(
             "Thread: %s(%d)", thread_id2name.get(thread_id, ""), thread_id
         )
-        log.critical(traceback.format_stack("".join(stack)))
+        log.critical("Traceback: %s", "".join(traceback.format_stack(stack)))
     # If logging level is less than DEBUG (more verbose), start pdb so that
     # developers can debug the issue.
     if log.getEffectiveLevel() < logging.DEBUG:
@@ -660,7 +665,6 @@ def main_loop(
     result_dump,
     relay_prioritizer,
     destinations,
-    pool,
 ):
     """Starts and reuse the threads that measure the relays forever.
 
@@ -696,6 +700,7 @@ def main_loop(
     measured.
 
     """
+    global pool
     log.info("Started the main loop to measure the relays.")
     hbeat = Heartbeat(conf.getpath("paths", "state_fname"))
 
@@ -743,6 +748,7 @@ def main_loop(
             # Register this measurement to the heartbeat module
             hbeat.register_measured_fpr(target.fingerprint)
 
+        log.debug("Measurements queued.")
         # After the for has finished, the pool has queued all the relays
         # and pending_results has the list of all the AsyncResults.
         # It could also be obtained with pool._cache, which contains
@@ -815,6 +821,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
             len(pending_results),
             num_relays_to_measure,
         )
+        log.info("Last measured: %s", num_last_measured)
         time.sleep(TIMEOUT_MEASUREMENTS)
         old_pending_results = pending_results
         pending_results = [r for r in pending_results if not r.ready()]
@@ -836,15 +843,28 @@ def force_get_results(pending_results):
     ``get`` is not call before, because it blocks and the callbacks
     are not call.
     """
+    global pool
     log.debug("Forcing get")
+    # In case there are no finished AsyncResults, print the cache here
+    # at level info so that is visible even if debug is not enabled.
+    log.info("Pool cache %s", pool._cache)
     for r in pending_results:
         try:
-            result = r.get(timeout=0.1)
+            # HTTP timeout is 10
+            result = r.get(timeout=SOCKET_TIMEOUT + 10)
             log.warning("Result %s was not stored, it took too long.", result)
         # TimeoutError is raised when the result is not ready, ie. has not
         # been processed yet
         except TimeoutError:
             log.warning("A result was not stored, it was not ready.")
+            # This is the only place where using psutil so far.
+            import psutil
+
+            log.warning(psutil.Process(os.getpid()).memory_full_info())
+            virtualMemoryInfo = psutil.virtual_memory()
+            availableMemory = virtualMemoryInfo.available
+            log.warning("Memory available %s MB.", availableMemory / 1024 ** 2)
+            dumpstacks()
         # If the result raised an exception, `get` returns it,
         # then log any exception so that it can be fixed.
         # This should not happen, since `callback_err` would have been call
@@ -910,7 +930,7 @@ def run_speedtest(args, conf):
     max_pending_results = conf.getint("scanner", "measurement_threads")
     pool = Pool(max_pending_results)
     try:
-        main_loop(args, conf, controller, rl, cb, rd, rp, destinations, pool)
+        main_loop(args, conf, controller, rl, cb, rd, rp, destinations)
     except KeyboardInterrupt:
         log.info("Interrupted by the user.")
         stop_threads(signal.SIGINT, None)
diff --git a/setup.cfg b/setup.cfg
index 0074a1d..88d4613 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -32,6 +32,7 @@ include_package_data = True
 # See stable releases at https://www.python.org/downloads/
 python_requires = >= 3.6
 install_requires =
+    psutil >= 5.5
     stem >= 1.7.0
     ; # Now versioneer is also needed as dependency
     versioneer



More information about the tor-commits mailing list