commit dbf50208a1a76e5a17f927d92fce43e2e15778fb
Author: juga0 <juga(a)riseup.net>
Date: Tue Jun 29 09:20:45 2021 +0000
fix: scanner: Rename functions
to more appropriate names, after switching to concurrent.futures.
---
sbws/core/scanner.py | 27 ++++++++++++++-------------
tests/unit/core/test_scanner.py | 29 +++++++++++++++++------------
2 files changed, 31 insertions(+), 25 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index bb723bf..25ad01a 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -599,7 +599,7 @@ def _next_expected_amount(
return expected_amount
-def result_putter(result_dump, measurement):
+def measurement_writer(result_dump, measurement):
# Since result_dump thread is calling queue.get() every second,
# the queue should be full for only 1 second.
# This call blocks at maximum timeout seconds.
@@ -615,7 +615,7 @@ def result_putter(result_dump, measurement):
)
-def result_putter_error(target, exception):
+def log_measurement_exception(target, exception):
print("in result putter error")
if settings.end_event.is_set():
return
@@ -666,8 +666,8 @@ def main_loop(
After that, it will reuse a thread that has finished for every relay to
measure.
- Then ``wait_for_results`` is call, to obtain the results in the completed
- ``future``\s.
+ Then ``process_completed_futures`` is call, to obtain the results in the
+ completed ``future``\s.
"""
log.info("Started the main loop to measure the relays.")
@@ -710,18 +710,19 @@ def main_loop(
# `Future`s.
# Each target relay_recent_measurement_attempt is incremented in
- # `wait_for_results` as well as hbeat measured fingerprints.
+ # `process_completed_futures` as well as hbeat measured
+ # fingerprints.
num_relays = len(pending_results)
# Without a callback, it's needed to pass `result_dump` here to
# call the function that writes the measurement when it's
# finished.
- wait_for_results(
+ process_completed_futures(
executor,
hbeat,
result_dump,
pending_results,
)
- force_get_results(pending_results)
+ wait_futures_completed(pending_results)
# Print the heartbeat message
hbeat.print_heartbeat_message()
@@ -742,11 +743,11 @@ def main_loop(
stop_threads(signal.SIGTERM, None)
-def wait_for_results(executor, hbeat, result_dump, pending_results):
+def process_completed_futures(executor, hbeat, result_dump, pending_results):
"""Obtain the relays' measurements as they finish.
For every ``Future`` measurements that gets completed, obtain the
- ``result`` and call ``result_putter``, which put the ``Result`` in
+ ``result`` and call ``measurement_writer``, which put the ``Result`` in
``ResultDump`` queue and complete immediately.
``ResultDump`` thread (started before and out of this function) will get
@@ -754,7 +755,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
the measurement threads.
If there was an exception not caught by ``measure_relay``, it will call
- instead ``result_putter_error``, which logs the error and complete
+ instead ``log_measurement_exception``, which logs the error and complete
immediately.
"""
@@ -779,7 +780,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
try:
measurement = future_measurement.result()
except Exception as e:
- result_putter_error(target, e)
+ log_measurement_exception(target, e)
import psutil
log.warning(psutil.Process(os.getpid()).memory_full_info())
@@ -791,7 +792,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
dumpstacks()
else:
log.info("Measurement ready: %s" % (measurement))
- result_putter(result_dump, measurement)
+ measurement_writer(result_dump, measurement)
# `pending_results` has all the initial queued `Future`s,
# they don't decrease as they get completed, but we know 1 has be
# completed in each loop,
@@ -803,7 +804,7 @@ def wait_for_results(executor, hbeat, result_dump, pending_results):
)
-def force_get_results(pending_results):
+def wait_futures_completed(pending_results):
"""Wait for last futures to finish, before starting new loop."""
log.info("Wait for any remaining measurements.")
done, not_done = concurrent.futures.wait(
diff --git a/tests/unit/core/test_scanner.py b/tests/unit/core/test_scanner.py
index f7ec69d..f805d50 100644
--- a/tests/unit/core/test_scanner.py
+++ b/tests/unit/core/test_scanner.py
@@ -14,24 +14,26 @@ from sbws.lib.relayprioritizer import RelayPrioritizer
log = logging.getLogger(__name__)
-def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event):
+def test_measurement_writer(
+ sbwshome_only_datadir, result_success, rd, end_event
+):
if rd is None:
pytest.skip("ResultDump is None")
# Put one item in the queue
- scanner.result_putter(rd, result_success)
+ scanner.measurement_writer(rd, result_success)
assert rd.queue.qsize() == 1
# Make queue maxsize 1, so that it'll be full after the first callback.
# The second callback will wait 1 second, then the queue will be empty
# again.
rd.queue.maxsize = 1
- scanner.result_putter(rd, result_success)
+ scanner.measurement_writer(rd, result_success)
# after putting 1 result, the queue will be full
assert rd.queue.qsize() == 1
assert rd.queue.full()
# it's still possible to put another results, because the callback will
# wait 1 second and the queue will be empty again.
- scanner.result_putter(rd, result_success)
+ scanner.measurement_writer(rd, result_success)
assert rd.queue.qsize() == 1
assert rd.queue.full()
end_event.set()
@@ -49,9 +51,10 @@ def test_complete_measurements(
):
"""
Test that the ``ThreadPoolExecutor``` creates the epexted number of
- futures, ``wait_for_results``process all of them and ``force_get_results``
- completes them if they were not already completed by the time
- ``wait_for_results`` has already processed them.
+ futures, ``process_completed_futures``process all of them and
+ ``wait_futures_completed`` completes them if they were not already
+ completed by the time ``process_completed_futures`` has already processed
+ them.
There are not real measurements done and the ``results`` are None objects.
Running the scanner with the test network, test the real measurements.
@@ -90,12 +93,14 @@ def test_complete_measurements(
assert len(pending_results) == 321
assert len(hbeat.measured_fp_set) == 0
- log.debug("Before wait_for_results.")
- scanner.wait_for_results(executor, hbeat, rd, pending_results)
- log.debug("After wait_for_results")
+ log.debug("Before process_completed_futures.")
+ scanner.process_completed_futures(
+ executor, hbeat, rd, pending_results
+ )
+ log.debug("After process_completed_futures")
for pending_result in pending_results:
assert pending_result.done() is True
assert len(hbeat.measured_fp_set) == 321
- scanner.force_get_results(pending_results)
- log.debug("After force_get_results.")
+ scanner.wait_futures_completed(pending_results)
+ log.debug("After wait_futures_completed.")
assert concurrent.futures.ALL_COMPLETED