commit 969bfbbfc32af87b424dbbff168f2f4992033326
Author: George Kadianakis <desnacked(a)riseup.net>
Date: Thu Mar 21 19:48:44 2019 +0200
Move more details out of the main loop and into the heartbeat module.
---
sbws/core/scanner.py | 24 ++++++------
sbws/lib/heartbeat.py | 85 +++++++++++++++++++++++++---------------
tests/unit/lib/test_heartbeat.py | 22 ++++++-----
3 files changed, 78 insertions(+), 53 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index f6443e5..b05f6e2 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -33,7 +33,7 @@ import requests
import random
from .. import settings
-from ..lib import heartbeat
+from ..lib.heartbeat import Heartbeat
rng = random.SystemRandom()
log = logging.getLogger(__name__)
@@ -479,12 +479,7 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
measured.
"""
- # Variable to count total progress in the last days:
- # In case it is needed to see which relays are not being measured,
- # store their fingerprint, not only their number.
- measured_fp_set = set()
- measured_percent = 0
- main_loop_tstart = time.monotonic()
+ hbeat = Heartbeat(conf.getpath('paths', 'state_fname'))
# Set the time to wait for a thread to finish as the half of an HTTP
# request timeout.
@@ -496,6 +491,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
# long, set it here and not outside the loop.
pending_results = []
loop_tstart = time.time()
+
+ # Register relay fingerprints to the heartbeat module
+ hbeat.register_consensus_fprs(relay_list.relays_fingerprints)
+
for target in relay_prioritizer.best_priority():
# Don't start measuring a relay if sbws is stopping.
if settings.end_event.is_set():
@@ -511,7 +510,10 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
[args, conf, destinations, circuit_builder, relay_list,
target], {}, callback, callback_err)
pending_results.append(async_result)
- measured_fp_set.add(async_result)
+
+ # Register this measurement to the heartbeat module
+ hbeat.register_measured_fpr(target.fingerprint)
+
# After the for has finished, the pool has queued all the relays
# and pending_results has the list of all the AsyncResults.
# It could also be obtained with pool._cache, which contains
@@ -519,10 +521,8 @@ def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
num_relays_to_measure = len(pending_results)
wait_for_results(num_relays_to_measure, pending_results)
- measured_percent = heartbeat.total_measured_percent(
- measured_percent, relay_list.relays_fingerprints, measured_fp_set,
- main_loop_tstart, conf.getpath('paths', 'state_fname')
- )
+ # Print the heartbeat message
+ hbeat.print_heartbeat_message()
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
diff --git a/sbws/lib/heartbeat.py b/sbws/lib/heartbeat.py
index 7dfa716..ad7fd88 100644
--- a/sbws/lib/heartbeat.py
+++ b/sbws/lib/heartbeat.py
@@ -9,38 +9,61 @@ from ..util.state import State
log = logging.getLogger(__name__)
-# NOTE tech-debt: this could go be tracked globally as a singleton
-consensus_fp_set = set()
+class Heartbeat(object):
+ """
+ Tracks current status of sbws and is capable of printing periodic
+ information about the current state
+ """
-def total_measured_percent(measured_percent, relays_fingerprints,
- measured_fp_set, main_loop_tstart, state_path):
- """Returns the new percentage of the different relays that were measured.
+ def __init__(self, state_path):
+ # Variable to count total progress in the last days:
+ # In case it is needed to see which relays are not being measured,
+ # store their fingerprint, not only their number.
+ self.measured_fp_set = set()
+ self.consensus_fp_set = set()
+ self.measured_percent = 0
+ self.main_loop_tstart = time.monotonic()
- This way it can be known whether the scanner is making progress measuring
- all the Network.
+ self.state_dict = State(state_path)
- Log the percentage, the number of relays measured and not measured,
- the number of loops and the time elapsed since it started measuring.
- """
- global consensus_fp_set
- # NOTE: in a future refactor make State a singleton in __init__.py
- state_dict = State(state_path)
- loops_count = state_dict.get('recent_priority_list_count', 0)
-
- # Store all the relays seen in all the consensuses.
- [consensus_fp_set.add(r) for r in relays_fingerprints]
-
- not_measured_fp_set = consensus_fp_set.difference(measured_fp_set)
- main_loop_tdelta = (time.monotonic() - main_loop_tstart) / 60
- new_measured_percent = round(
- len(measured_fp_set) / len(consensus_fp_set) * 100)
- log.info("Run %s main loops.", loops_count)
- log.info("Measured in total %s (%s%%) unique relays in %s minutes",
- len(measured_fp_set), new_measured_percent, main_loop_tdelta)
- log.info("%s relays still not measured.", len(not_measured_fp_set))
- # The case when it is equal will only happen when all the relays have been
- # measured.
- if (new_measured_percent <= measured_percent):
- log.warning("There is no progress measuring relays!.")
- return new_measured_percent
+ self.previous_measurement_percent = 0
+
+ def register_measured_fpr(self, async_result):
+ self.measured_fp_set.add(async_result)
+
+ def register_consensus_fprs(self, relay_fprs):
+ for r in relay_fprs:
+ self.consensus_fp_set.add(r)
+
+ def print_heartbeat_message(self):
+ """Print the new percentage of the different relays that were measured.
+
+ This way it can be known whether the scanner is making progress
+ measuring all the Network.
+
+ Log the percentage, the number of relays measured and not measured,
+ the number of loops and the time elapsed since it started measuring.
+ """
+ loops_count = self.state_dict.get('recent_priority_list_count', 0)
+
+ not_measured_fp_set = self.consensus_fp_set.difference(
+ self.measured_fp_set
+ )
+ main_loop_tdelta = (time.monotonic() - self.main_loop_tstart) / 60
+ new_measured_percent = round(
+ len(self.measured_fp_set) / len(self.consensus_fp_set) * 100
+ )
+
+ log.info("Run %s main loops.", loops_count)
+ log.info("Measured in total %s (%s%%) unique relays in %s minutes",
+ len(self.measured_fp_set), new_measured_percent,
+ main_loop_tdelta)
+ log.info("%s relays still not measured.", len(not_measured_fp_set))
+
+ # The case when it is equal will only happen when all the relays
+ # have been measured.
+ if (new_measured_percent <= self.previous_measurement_percent):
+ log.warning("There is no progress measuring relays!.")
+
+ self.previous_measurement_percent = new_measured_percent
diff --git a/tests/unit/lib/test_heartbeat.py b/tests/unit/lib/test_heartbeat.py
index 55573a8..5c82a4b 100644
--- a/tests/unit/lib/test_heartbeat.py
+++ b/tests/unit/lib/test_heartbeat.py
@@ -1,21 +1,23 @@
"""Unit tests for heartbeat"""
import logging
-import time
from sbws.lib import heartbeat
def test_total_measured_percent(conf, caplog):
- measured_percent = 0
- measured_fp_set = set(['A', 'B'])
- main_loop_tstart = time.monotonic()
- relays_fingerprints = set(['A', 'B', 'C'])
+ hbeat = heartbeat.Heartbeat(conf.getpath('paths', 'state_fname'))
+
+ hbeat.register_consensus_fprs(['A', 'B', 'C'])
+
+ hbeat.register_measured_fpr('A')
+ hbeat.register_measured_fpr('B')
caplog.set_level(logging.INFO)
- new_measured_percent = heartbeat.total_measured_percent(
- measured_percent, relays_fingerprints, measured_fp_set,
- main_loop_tstart, conf.getpath('paths', 'state_fname')
- )
- assert new_measured_percent == 67
+
+ assert hbeat.previous_measurement_percent == 0
+
+ hbeat.print_heartbeat_message()
+
+ assert hbeat.previous_measurement_percent == 67
caplog.records[1].getMessage().find("Measured in total 2 (67%)")
caplog.records[2].getMessage().find("1 relays still not measured")