commit 969bfbbfc32af87b424dbbff168f2f4992033326 Author: George Kadianakis <desnacked@riseup.net> Date: Thu Mar 21 19:48:44 2019 +0200
Bake more details into the heartbeat module and out of the main loop. --- sbws/core/scanner.py | 24 ++++++------ sbws/lib/heartbeat.py | 85 +++++++++++++++++++++++++--------------- tests/unit/lib/test_heartbeat.py | 22 ++++++----- 3 files changed, 78 insertions(+), 53 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index f6443e5..b05f6e2 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -33,7 +33,7 @@ import requests import random
from .. import settings -from ..lib import heartbeat +from ..lib.heartbeat import Heartbeat
rng = random.SystemRandom() log = logging.getLogger(__name__) @@ -479,12 +479,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, measured.
""" - # Variable to count total progress in the last days: - # In case it is needed to see which relays are not being measured, - # store their fingerprint, not only their number. - measured_fp_set = set() - measured_percent = 0 - main_loop_tstart = time.monotonic() + hbeat = Heartbeat(conf.getpath('paths', 'state_fname'))
# Set the time to wait for a thread to finish as the half of an HTTP # request timeout. @@ -496,6 +491,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, # long, set it here and not outside the loop. pending_results = [] loop_tstart = time.time() + + # Register relay fingerprints to the heartbeat module + hbeat.register_consensus_fprs(relay_list.relays_fingerprints) + for target in relay_prioritizer.best_priority(): # Don't start measuring a relay if sbws is stopping. if settings.end_event.is_set(): @@ -511,7 +510,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, [args, conf, destinations, circuit_builder, relay_list, target], {}, callback, callback_err) pending_results.append(async_result) - measured_fp_set.add(async_result) + + # Register this measurement to the heartbeat module + hbeat.register_measured_fpr(target.fingerprint) + # After the for has finished, the pool has queued all the relays # and pending_results has the list of all the AsyncResults. # It could also be obtained with pool._cache, which contains @@ -519,10 +521,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, num_relays_to_measure = len(pending_results) wait_for_results(num_relays_to_measure, pending_results)
- measured_percent = heartbeat.total_measured_percent( - measured_percent, relay_list.relays_fingerprints, measured_fp_set, - main_loop_tstart, conf.getpath('paths', 'state_fname') - ) + # Print the heartbeat message + hbeat.print_heartbeat_message()
loop_tstop = time.time() loop_tdelta = (loop_tstop - loop_tstart) / 60 diff --git a/sbws/lib/heartbeat.py b/sbws/lib/heartbeat.py index 7dfa716..ad7fd88 100644 --- a/sbws/lib/heartbeat.py +++ b/sbws/lib/heartbeat.py @@ -9,38 +9,61 @@ from ..util.state import State
log = logging.getLogger(__name__)
-# NOTE tech-debt: this could go be tracked globally as a singleton -consensus_fp_set = set()
+class Heartbeat(object): + """ + Tracks current status of sbws and is capable of printing periodic + information about the current state + """
-def total_measured_percent(measured_percent, relays_fingerprints, - measured_fp_set, main_loop_tstart, state_path): - """Returns the new percentage of the different relays that were measured. + def __init__(self, state_path): + # Variable to count total progress in the last days: + # In case it is needed to see which relays are not being measured, + # store their fingerprint, not only their number. + self.measured_fp_set = set() + self.consensus_fp_set = set() + self.measured_percent = 0 + self.main_loop_tstart = time.monotonic()
- This way it can be known whether the scanner is making progress measuring - all the Network. + self.state_dict = State(state_path)
- Log the percentage, the number of relays measured and not measured, - the number of loops and the time elapsed since it started measuring. - """ - global consensus_fp_set - # NOTE: in a future refactor make State a singleton in __init__.py - state_dict = State(state_path) - loops_count = state_dict.get('recent_priority_list_count', 0) - - # Store all the relays seen in all the consensuses. - [consensus_fp_set.add(r) for r in relays_fingerprints] - - not_measured_fp_set = consensus_fp_set.difference(measured_fp_set) - main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60 - new_measured_percent = round( - len(measured_fp_set) / len(consensus_fp_set) * 100) - log.info("Run %s main loops.", loops_count) - log.info("Measured in total %s (%s%%) unique relays in %s minutes", - len(measured_fp_set), new_measured_percent, main_loop_tdelta) - log.info("%s relays still not measured.", len(not_measured_fp_set)) - # The case when it is equal will only happen when all the relays have been - # measured. - if (new_measured_percent <= measured_percent): - log.warning("There is no progress measuring relays!.") - return new_measured_percent + self.previous_measurement_percent = 0 + + def register_measured_fpr(self, async_result): + self.measured_fp_set.add(async_result) + + def register_consensus_fprs(self, relay_fprs): + for r in relay_fprs: + self.consensus_fp_set.add(r) + + def print_heartbeat_message(self): + """Print the new percentage of the different relays that were measured. + + This way it can be known whether the scanner is making progress + measuring all the Network. + + Log the percentage, the number of relays measured and not measured, + the number of loops and the time elapsed since it started measuring. 
+ """ + loops_count = self.state_dict.get('recent_priority_list_count', 0) + + not_measured_fp_set = self.consensus_fp_set.difference( + self.measured_fp_set + ) + main_loop_tdelta = (time.monotonic() - self.main_loop_tstart) / 60 + new_measured_percent = round( + len(self.measured_fp_set) / len(self.consensus_fp_set) * 100 + ) + + log.info("Run %s main loops.", loops_count) + log.info("Measured in total %s (%s%%) unique relays in %s minutes", + len(self.measured_fp_set), new_measured_percent, + main_loop_tdelta) + log.info("%s relays still not measured.", len(not_measured_fp_set)) + + # The case when it is equal will only happen when all the relays + # have been measured. + if (new_measured_percent <= self.previous_measurement_percent): + log.warning("There is no progress measuring relays!.") + + self.previous_measurement_percent = new_measured_percent diff --git a/tests/unit/lib/test_heartbeat.py b/tests/unit/lib/test_heartbeat.py index 55573a8..5c82a4b 100644 --- a/tests/unit/lib/test_heartbeat.py +++ b/tests/unit/lib/test_heartbeat.py @@ -1,21 +1,23 @@ """Unit tests for heartbeat""" import logging -import time
from sbws.lib import heartbeat
def test_total_measured_percent(conf, caplog): - measured_percent = 0 - measured_fp_set = set(['A', 'B']) - main_loop_tstart = time.monotonic() - relays_fingerprints = set(['A', 'B', 'C']) + hbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname')) + + hbeat.register_consensus_fprs(['A', 'B', 'C']) + + hbeat.register_measured_fpr('A') + hbeat.register_measured_fpr('B')
caplog.set_level(logging.INFO) - new_measured_percent = heartbeat.total_measured_percent( - measured_percent, relays_fingerprints, measured_fp_set, - main_loop_tstart, conf.getpath('paths', 'state_fname') - ) - assert new_measured_percent == 67 + + assert hbeat.previous_measurement_percent == 0 + + hbeat.print_heartbeat_message() + + assert hbeat.previous_measurement_percent == 67 caplog.records[1].getMessage().find("Measured in total 2 (67%)") caplog.records[2].getMessage().find("1 relays still not measured")
tor-commits@lists.torproject.org