commit fe16d6d8a82e53cd62b9bdda577e79b2f6e666d9 Author: juga0 juga@riseup.net Date: Tue Jan 8 15:59:00 2019 +0000
scanner: catch SIGINT in the main loop
Also split the main function into an extra main_loop function, to be able to stop the threads after they have started. Also check the end event in the main loop and before starting to measure a new relay.
Fixes bug #28869. Bugfix v0.1.0. --- sbws/core/scanner.py | 115 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 90 insertions(+), 25 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index 7c2a937..166ce5d 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -354,6 +354,88 @@ def result_putter_error(target): return closure
+def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, + relay_prioritizer, destinations, max_pending_results, pool): + """Start and reuse the threads that measure the relays forever. + + It starts a loop that runs while there is no event signaling + that sbws is stopping (because of SIGTERM or SIGINT). + + Then, it starts a second loop over an ordered list (generator) of relays + to measure that might be a subset of all current relays in the network. + + For every relay, it starts a new thread which runs ``measure_relay`` to + measure the relay until there are ``max_pending_results`` threads. + After that, it will reuse a thread that has finished for every relay to + measure. + It is the pool method ``apply_async`` which starts or reuses a thread. + This method returns an ``ApplyResult`` immediately, which has a ``ready`` + method that tells whether the thread has finished or not. + + When the thread finishes (``ApplyResult`` is ``ready``), it triggers the + ``result_putter`` callback, which puts the ``Result`` in the + ``ResultDump`` queue and completes immediately. + + The ``ResultDump`` thread (started before and outside this function) gets + the ``Result`` from the queue and writes it to disk, so this doesn't block + the measurement threads. + + If there was an exception not caught by ``measure_relay``, it calls + ``result_putter_error`` instead, which logs the error and completes + immediately. + + Before iterating over the next relay, it waits (blocking only the main + thread) until one of the ``max_pending_results`` + threads has finished. + + This is not strictly needed, since otherwise ``apply_async`` would + queue the relays in order and wouldn't start reusing a thread until + another thread has finished. But it makes the logic more sequential. + + Before the outer loop iterates, it also waits (again blocking only the + main thread) until all the ``Results`` are ready. 
+ This avoids starting to measure again a relay that might still be + being measured. + + """ + pending_results = [] + # Do not start a new loop if sbws is stopping. + while not settings.end_event.is_set(): + log.debug("Starting a new measurement loop.") + num_relays = 0 + loop_tstart = time.time() + for target in relay_prioritizer.best_priority(): + # Don't start measuring a relay if sbws is stopping. + if settings.end_event.is_set(): + break + num_relays += 1 + log.debug('Measuring %s %s', target.nickname, + target.fingerprint[0:8]) + # callback and callback_err must be non-blocking + callback = result_putter(result_dump) + callback_err = result_putter_error(target) + async_result = pool.apply_async( + dispatch_worker_thread, + [args, conf, destinations, circuit_builder, relay_list, + target], {}, callback, callback_err) + pending_results.append(async_result) + # Instead of letting apply_async queue the relays in order until + # a thread has finished, wait here until a thread has finished. 
+ while len(pending_results) >= max_pending_results: + # sleep only pauses the main process, not the worker threads + time.sleep(5) + pending_results = [r for r in pending_results if not r.ready()] + while len(pending_results) > 0: + log.debug("There are %s pending measurements.", + len(pending_results)) + # sleep only pauses the main process, not the worker threads + time.sleep(5) + pending_results = [r for r in pending_results if not r.ready()] + loop_tstop = time.time() + loop_tdelta = (loop_tstop - loop_tstart) / 60 + log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta) + + def run_speedtest(args, conf): global rd, pool, controller controller, _ = stem_utils.init_controller( @@ -382,31 +464,14 @@ def run_speedtest(args, conf): fail_hard(error_msg) max_pending_results = conf.getint('scanner', 'measurement_threads') pool = Pool(max_pending_results) - pending_results = [] - while True: - num_relays = 0 - loop_tstart = time.time() - log.info("Starting a new loop to measure relays.") - for target in rp.best_priority(): - num_relays += 1 - log.debug('Measuring %s %s', target.nickname, - target.fingerprint[0:8]) - callback = result_putter(rd) - callback_err = result_putter_error(target) - async_result = pool.apply_async( - dispatch_worker_thread, - [args, conf, destinations, cb, rl, target], - {}, callback, callback_err) - pending_results.append(async_result) - while len(pending_results) >= max_pending_results: - time.sleep(5) - pending_results = [r for r in pending_results if not r.ready()] - while len(pending_results) > 0: - time.sleep(5) - pending_results = [r for r in pending_results if not r.ready()] - loop_tstop = time.time() - loop_tdelta = (loop_tstop - loop_tstart) / 60 - log.info("Measured %s relays in %s minutes", num_relays, loop_tdelta) + + try: + main_loop(args, conf, controller, rl, cb, rd, rp, destinations, + max_pending_results, pool) + except KeyboardInterrupt: + log.info("Interrupted by the user.") + finally: + stop_threads(signal.SIGINT, 
None)
def gen_parser(sub):