commit 149e236e64bf469221a3874bcfc82e89fe408785 Merge: c523619 c24b235 Author: juga0 juga@riseup.net Date: Mon Feb 4 14:56:10 2019 +0000
Merge branch 'bug28869_squashed'
Solved merge conflicts in sbws/__init__.py and sbws/core/scanner.py after merging bug28741
sbws/__init__.py | 18 +++ sbws/core/scanner.py | 201 ++++++++++++++++++------- sbws/lib/circuitbuilder.py | 16 +- sbws/lib/resultdump.py | 10 +- sbws/util/stem.py | 9 +- tests/integration/lib/test_relayprioritizer.py | 8 +- 6 files changed, 185 insertions(+), 77 deletions(-)
diff --cc sbws/__init__.py index eea7006,3871edb..84b2787 --- a/sbws/__init__.py +++ b/sbws/__init__.py @@@ -1,18 -1,25 +1,36 @@@ __version__ = '1.0.3-dev0'
- + import threading # noqa + +from . import globals # noqa +
class Settings: + """Singleton settings for all the packages. + This way change settings can be seen by all the packages that import it. + + It lives in ``__init__.py`` to leave open the possibility of having a + ``settings.py`` module for user settings. + + .. note:: After refactoring, globals should only have constants. + Any other variable that needs to be modified when initializing + should be initialized here. + + """ def __init__(self): + # update this dict from globals (but only for ALL_CAPS settings) + for setting in dir(globals): + if setting.isupper(): + setattr(self, setting, getattr(globals, setting)) + self.end_event = threading.Event()
+ def init_http_headers(self, nickname, uuid, tor_version): + self.HTTP_HEADERS['Tor-Bandwidth-Scanner-Nickname'] = nickname + self.HTTP_HEADERS['Tor-Bandwidth-Scanner-UUID'] = uuid + self.HTTP_HEADERS['User-Agent'] += tor_version + + def set_end_event(self): + self.end_event.set() + ++ settings = Settings() # noqa diff --cc sbws/core/scanner.py index 8fe8a5b,bb7a2ca..013d4ee --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@@ -1,8 -1,7 +1,9 @@@ ''' Measure the relays. '''
+ import signal import sys +import threading +import uuid
from ..lib.circuitbuilder import GapsCircuitBuilder as CB from ..lib.resultdump import ResultDump @@@ -13,7 -12,7 +14,7 @@@ from ..lib.relayprioritizer import Rela from ..lib.destination import DestinationList from ..util.timestamp import now_isodt_str from ..util.state import State - from sbws.globals import fail_hard, TIMEOUT_MEASUREMENTS, HTTP_GET_HEADERS -from sbws.globals import fail_hard ++from sbws.globals import fail_hard, HTTP_GET_HEADERS import sbws.util.stem as stem_utils import sbws.util.requests as requests_utils from argparse import ArgumentDefaultsHelpFormatter @@@ -25,54 -23,52 +25,78 @@@ import loggin import requests import random
- from sbws import settings - + from .. import settings
rng = random.SystemRandom() - end_event = Event() log = logging.getLogger(__name__) + # Declare the objects that manage the threads global so that sbws can exit + # gracefully at any time. + pool = None + rd = None + controller = None + + + def stop_threads(signal, frame): + global rd, pool + log.debug('Stopping sbws.') + # Avoid new threads to start. + settings.set_end_event() + # Stop Pool threads + pool.close() + pool.join() + # Stop ResultDump thread + rd.thread.join() + # Stop Tor thread + controller.close() + sys.exit(0) + + + signal.signal(signal.SIGTERM, stop_threads)
+def dumpstacks(): + import traceback + log.critical("sbws stop measuring relays, probably because of a bug." + "Please, open a ticket in trac.torproject.org with this" + "backtrace.") + thread_id2name = dict([(t.ident, t.name) for t in threading.enumerate()]) + for thread_id, stack in sys._current_frames().items(): + log.critical("Thread: %s(%d)", + thread_id2name.get(thread_id, ""), thread_id) + log.critical(traceback.print_stack(stack)) + # If logging level is less than DEBUG (more verbose), start pdb so that + # developers can debug the issue. + if log.getEffectiveLevel() < logging.DEBUG: + import pdb + pdb.set_trace() + # Otherwise exit. + else: + # Change to stop threads when #28869 is merged + sys.exit(1) + + def timed_recv_from_server(session, dest, byte_range): ''' Request the **byte_range** from the URL at **dest**. If successful, return True and the time it took to download. Otherwise return False and an exception. ''' - headers = {'Range': byte_range, 'Accept-Encoding': 'identity'} + start_time = time.time() + HTTP_GET_HEADERS['Range'] = byte_range # TODO: # - What other exceptions can this throw? - # - Do we have to read the content, or did requests already do so? + # - response.elapsed "measures the time taken between sending the first + # byte of the request and finishing parsing the headers. + # It is therefore unaffected by consuming the response content" + # If this mean that the content has arrived, elapsed could be used to + # know the time it took. try: - requests_utils.get( - session, dest.url, headers=headers, verify=dest.verify) + # headers are merged with the session ones, not overwritten. + session.get(dest.url, headers=HTTP_GET_HEADERS, verify=dest.verify) + # NewConnectionError will be raised when shutting down. except (requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout) as e: + requests.exceptions.ReadTimeout, + requests.exceptions.NewConnectionError) as e: + log.debug(e) return False, e end_time = time.time() return True, end_time - start_time @@@ -379,17 -472,9 +501,17 @@@ def run_speedtest(args, conf) 'even lead to messed up results.', conf.getpath('tor', 'control_socket')) time.sleep(15) + + # When there will be a refactor where conf is global, this can be removed + # from here. + state = State(conf.getpath('paths', 'state_fname')) + # Call only once to initialize http_headers + settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'], + str(controller.get_version())) + rl = RelayList(args, conf, controller) cb = CB(args, conf, controller, rl) - rd = ResultDump(args, conf, end_event) + rd = ResultDump(args, conf) rp = RelayPrioritizer(args, conf, rl, rd) destinations, error_msg = DestinationList.from_config( conf, cb, rl, controller) @@@ -454,13 -515,5 +552,8 @@@ def main(args, conf)
state = State(conf.getpath('paths', 'state_fname')) state['scanner_started'] = now_isodt_str() + # Generate an unique identifier for each scanner + if 'uuid' not in state: + state['uuid'] = str(uuid.uuid4())
- try: - run_speedtest(args, conf) - except KeyboardInterrupt as e: - raise e - finally: - end_event.set() + run_speedtest(args, conf)