commit 3653396c9daa9170033fd2b706cd9d290eac6f6b Author: juga0 juga@riseup.net Date: Thu May 13 12:10:35 2021 +0000
fix: scanner: Increase time getting measurements
- Increase the time waiting for the last measurements queued, to avoid canceling unfinished measurements and gc maybe not releasing thread variables - Use the already declared global pool instead of passing it by args - Log more information when the last measuremetns timeout
Closes: #40087 --- sbws/core/scanner.py | 30 +++++++++++++++++++++++++----- setup.cfg | 1 + 2 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index dbcceb9..7637028 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -15,7 +15,12 @@ from multiprocessing.dummy import Pool
import sbws.util.requests as requests_utils import sbws.util.stem as stem_utils -from sbws.globals import HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS, fail_hard +from sbws.globals import ( + HTTP_GET_HEADERS, + SOCKET_TIMEOUT, + TIMEOUT_MEASUREMENTS, + fail_hard, +)
from .. import settings from ..lib.circuitbuilder import GapsCircuitBuilder as CB @@ -77,7 +82,7 @@ def dumpstacks(): log.critical( "Thread: %s(%d)", thread_id2name.get(thread_id, ""), thread_id ) - log.critical(traceback.format_stack("".join(stack))) + log.critical("Traceback: %s", "".join(traceback.format_stack(stack))) # If logging level is less than DEBUG (more verbose), start pdb so that # developers can debug the issue. if log.getEffectiveLevel() < logging.DEBUG: @@ -660,7 +665,6 @@ def main_loop( result_dump, relay_prioritizer, destinations, - pool, ): """Starts and reuse the threads that measure the relays forever.
@@ -696,6 +700,7 @@ def main_loop( measured.
""" + global pool log.info("Started the main loop to measure the relays.") hbeat = Heartbeat(conf.getpath("paths", "state_fname"))
@@ -743,6 +748,7 @@ def main_loop( # Register this measurement to the heartbeat module hbeat.register_measured_fpr(target.fingerprint)
+ log.debug("Measurements queued.") # After the for has finished, the pool has queued all the relays # and pending_results has the list of all the AsyncResults. # It could also be obtained with pool._cache, which contains @@ -815,6 +821,7 @@ def wait_for_results(num_relays_to_measure, pending_results): len(pending_results), num_relays_to_measure, ) + log.info("Last measured: %s", num_last_measured) time.sleep(TIMEOUT_MEASUREMENTS) old_pending_results = pending_results pending_results = [r for r in pending_results if not r.ready()] @@ -836,15 +843,28 @@ def force_get_results(pending_results): ``get`` is not call before, because it blocks and the callbacks are not call. """ + global pool log.debug("Forcing get") + # In case there are no finished AsyncResults, print the cache here + # at level info so that is visible even if debug is not enabled. + log.info("Pool cache %s", pool._cache) for r in pending_results: try: - result = r.get(timeout=0.1) + # HTTP timeout is 10 + result = r.get(timeout=SOCKET_TIMEOUT + 10) log.warning("Result %s was not stored, it took too long.", result) # TimeoutError is raised when the result is not ready, ie. has not # been processed yet except TimeoutError: log.warning("A result was not stored, it was not ready.") + # This is the only place where using psutil so far. + import psutil + + log.warning(psutil.Process(os.getpid()).memory_full_info()) + virtualMemoryInfo = psutil.virtual_memory() + availableMemory = virtualMemoryInfo.available + log.warning("Memory available %s MB.", availableMemory / 1024 ** 2) + dumpstacks() # If the result raised an exception, `get` returns it, # then log any exception so that it can be fixed. # This should not happen, since `callback_err` would have been call @@ -910,7 +930,7 @@ def run_speedtest(args, conf): max_pending_results = conf.getint("scanner", "measurement_threads") pool = Pool(max_pending_results) try: - main_loop(args, conf, controller, rl, cb, rd, rp, destinations, pool) + main_loop(args, conf, controller, rl, cb, rd, rp, destinations) except KeyboardInterrupt: log.info("Interrupted by the user.") stop_threads(signal.SIGINT, None) diff --git a/setup.cfg b/setup.cfg index 0074a1d..88d4613 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ include_package_data = True # See stable releases at https://www.python.org/downloads/ python_requires = >= 3.6 install_requires = + psutil >= 5.5 stem >= 1.7.0 ; # Now versioneer is also needed as dependency versioneer