commit dbf50208a1a76e5a17f927d92fce43e2e15778fb Author: juga0 juga@riseup.net Date: Tue Jun 29 09:20:45 2021 +0000
fix: scanner: Rename functions
to more appropriate names, after switching to concurrent.futures. --- sbws/core/scanner.py | 27 ++++++++++++++------------- tests/unit/core/test_scanner.py | 29 +++++++++++++++++------------ 2 files changed, 31 insertions(+), 25 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index bb723bf..25ad01a 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -599,7 +599,7 @@ def _next_expected_amount( return expected_amount
-def result_putter(result_dump, measurement): +def measurement_writer(result_dump, measurement): # Since result_dump thread is calling queue.get() every second, # the queue should be full for only 1 second. # This call blocks at maximum timeout seconds. @@ -615,7 +615,7 @@ def result_putter(result_dump, measurement): )
-def result_putter_error(target, exception): +def log_measurement_exception(target, exception): print("in result putter error") if settings.end_event.is_set(): return @@ -666,8 +666,8 @@ def main_loop( After that, it will reuse a thread that has finished for every relay to measure.
- Then ``wait_for_results`` is call, to obtain the results in the completed - ``future``\s. + Then ``process_completed_futures`` is called, to obtain the results in the + completed ``future``\s.
""" log.info("Started the main loop to measure the relays.") @@ -710,18 +710,19 @@ def main_loop( # `Future`s.
# Each target relay_recent_measurement_attempt is incremented in - # `wait_for_results` as well as hbeat measured fingerprints. + # `process_completed_futures` as well as hbeat measured + # fingerprints. num_relays = len(pending_results) # Without a callback, it's needed to pass `result_dump` here to # call the function that writes the measurement when it's # finished. - wait_for_results( + process_completed_futures( executor, hbeat, result_dump, pending_results, ) - force_get_results(pending_results) + wait_futures_completed(pending_results)
# Print the heartbeat message hbeat.print_heartbeat_message() @@ -742,11 +743,11 @@ def main_loop( stop_threads(signal.SIGTERM, None)
-def wait_for_results(executor, hbeat, result_dump, pending_results): +def process_completed_futures(executor, hbeat, result_dump, pending_results): """Obtain the relays' measurements as they finish.
For every ``Future`` measurements that gets completed, obtain the - ``result`` and call ``result_putter``, which put the ``Result`` in + ``result`` and call ``measurement_writer``, which put the ``Result`` in ``ResultDump`` queue and complete immediately.
``ResultDump`` thread (started before and out of this function) will get @@ -754,7 +755,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results): the measurement threads.
If there was an exception not caught by ``measure_relay``, it will call - instead ``result_putter_error``, which logs the error and complete + instead ``log_measurement_exception``, which logs the error and complete immediately.
""" @@ -779,7 +780,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results): try: measurement = future_measurement.result() except Exception as e: - result_putter_error(target, e) + log_measurement_exception(target, e) import psutil
log.warning(psutil.Process(os.getpid()).memory_full_info()) @@ -791,7 +792,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results): dumpstacks() else: log.info("Measurement ready: %s" % (measurement)) - result_putter(result_dump, measurement) + measurement_writer(result_dump, measurement) # `pending_results` has all the initial queued `Future`s, # they don't decrease as they get completed, but we know 1 has be # completed in each loop, @@ -803,7 +804,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results): )
-def force_get_results(pending_results): +def wait_futures_completed(pending_results): """Wait for last futures to finish, before starting new loop.""" log.info("Wait for any remaining measurements.") done, not_done = concurrent.futures.wait( diff --git a/tests/unit/core/test_scanner.py b/tests/unit/core/test_scanner.py index f7ec69d..f805d50 100644 --- a/tests/unit/core/test_scanner.py +++ b/tests/unit/core/test_scanner.py @@ -14,24 +14,26 @@ from sbws.lib.relayprioritizer import RelayPrioritizer log = logging.getLogger(__name__)
-def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event): +def test_measurement_writer( + sbwshome_only_datadir, result_success, rd, end_event +): if rd is None: pytest.skip("ResultDump is None") # Put one item in the queue - scanner.result_putter(rd, result_success) + scanner.measurement_writer(rd, result_success) assert rd.queue.qsize() == 1
# Make queue maxsize 1, so that it'll be full after the first callback. # The second callback will wait 1 second, then the queue will be empty # again. rd.queue.maxsize = 1 - scanner.result_putter(rd, result_success) + scanner.measurement_writer(rd, result_success) # after putting 1 result, the queue will be full assert rd.queue.qsize() == 1 assert rd.queue.full() # it's still possible to put another results, because the callback will # wait 1 second and the queue will be empty again. - scanner.result_putter(rd, result_success) + scanner.measurement_writer(rd, result_success) assert rd.queue.qsize() == 1 assert rd.queue.full() end_event.set() @@ -49,9 +51,10 @@ def test_complete_measurements( ): """ Test that the ``ThreadPoolExecutor``` creates the epexted number of - futures, ``wait_for_results``process all of them and ``force_get_results`` - completes them if they were not already completed by the time - ``wait_for_results`` has already processed them. + futures, ``process_completed_futures``process all of them and + ``wait_futures_completed`` completes them if they were not already + completed by the time ``process_completed_futures`` has already processed + them. There are not real measurements done and the ``results`` are None objects. Running the scanner with the test network, test the real measurements.
@@ -90,12 +93,14 @@ def test_complete_measurements(
assert len(pending_results) == 321 assert len(hbeat.measured_fp_set) == 0 - log.debug("Before wait_for_results.") - scanner.wait_for_results(executor, hbeat, rd, pending_results) - log.debug("After wait_for_results") + log.debug("Before process_completed_futures.") + scanner.process_completed_futures( + executor, hbeat, rd, pending_results + ) + log.debug("After process_completed_futures") for pending_result in pending_results: assert pending_result.done() is True assert len(hbeat.measured_fp_set) == 321 - scanner.force_get_results(pending_results) - log.debug("After force_get_results.") + scanner.wait_futures_completed(pending_results) + log.debug("After wait_futures_completed.") assert concurrent.futures.ALL_COMPLETED