commit 65e7e29de0746b69265bd24451d9f59bbec1921c Merge: 09ea037 adcaf04 Author: juga0 juga@riseup.net Date: Sat Feb 23 11:32:38 2019 +0000
Merge branch 'bug28933_01'
Solved conflicts: sbws/core/scanner.py tox.ini
sbws/core/scanner.py | 6 +++++- tests/integration/sbws_testnet.ini | 26 ++++++++++++++++++++++++++ tox.ini | 8 +++++++- 3 files changed, 38 insertions(+), 2 deletions(-)
diff --cc sbws/core/scanner.py index 72df4c0,791d719..b486dd9 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@@ -391,111 -330,7 +391,115 @@@ def result_putter_error(target) return closure
+def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump, + relay_prioritizer, destinations, max_pending_results, pool): + """Starts and reuses the threads that measure the relays forever. + + It starts a loop that will be run while there is not an event signaling + that sbws is stopping (because of SIGTERM or SIGINT). + + Then, it starts a second loop with an ordered list (generator) of relays + to measure that might be a subset of all the current relays in the Network. + + For every relay, it starts a new thread which runs ``measure_relay`` to + measure the relay until there are ``max_pending_results`` threads. + After that, it will reuse a thread that has finished for every relay to + measure. + It is the pool method ``apply_async`` which starts or reuses a thread. + This method returns an ``ApplyResult`` immediately, which has a ``ready`` + method that tells whether the thread has finished or not. + + When the thread finishes, i.e. ``ApplyResult`` is ``ready``, it triggers + ``result_putter`` callback, which puts the ``Result`` in ``ResultDump`` + queue and completes immediately. + + ``ResultDump`` thread (started before and out of this function) will get + the ``Result`` from the queue and write it to disk, so this doesn't block + the measurement threads. + + If there was an exception not caught by ``measure_relay``, it will call + instead ``result_putter_error``, which logs the error and completes + immediately. + + Before iterating over the next relay, it waits (non blocking, since it + happens in the main thread) until one of the ``max_pending_results`` + threads has finished. + + This is not needed, since otherwise ``apply_async`` will queue the relays to + measure in order and won't start reusing a thread to measure a relay until + another thread has finished. But it makes the logic a bit more sequential. + + Before the outer loop iterates, it also waits (again non blocking) until all + the ``Results`` are ready. 
+ This avoids starting to measure the same relay while it might still be + being measured. + + """ + pending_results = [] + # Set the time to wait for a thread to finish as half of an HTTP + # request timeout. + time_to_sleep = conf.getfloat('general', 'http_timeout') / 2 + # Do not start a new loop if sbws is stopping. + while not settings.end_event.is_set(): + log.debug("Starting a new measurement loop.") + num_relays = 0 + loop_tstart = time.time() + for target in relay_prioritizer.best_priority(): + # Don't start measuring a relay if sbws is stopping. + if settings.end_event.is_set(): + break + num_relays += 1 + # callback and callback_err must be non-blocking + callback = result_putter(result_dump) + callback_err = result_putter_error(target) + async_result = pool.apply_async( + dispatch_worker_thread, + [args, conf, destinations, circuit_builder, relay_list, + target], {}, callback, callback_err) + pending_results.append(async_result) + # Instead of letting apply_async queue the relays in order until + # a thread has finished, wait here until a thread has finished. + while len(pending_results) >= max_pending_results: + # sleep is non-blocking since it happens in the main process. 
+ time.sleep(time_to_sleep) + pending_results = [r for r in pending_results if not r.ready()] + time_waiting = 0 + while (len(pending_results) > 0 + and time_waiting <= TIMEOUT_MEASUREMENTS): + log.debug("Number of pending measurement threads %s after " + "a prioritization loop.", len(pending_results)) + time.sleep(time_to_sleep) + time_waiting += time_to_sleep + pending_results = [r for r in pending_results if not r.ready()] + if time_waiting > TIMEOUT_MEASUREMENTS: + dumpstacks() + loop_tstop = time.time() + loop_tdelta = (loop_tstop - loop_tstart) / 60 + log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta) - ++ # In a testing network, exit after first loop ++ if controller.get_conf('TestingTorNetwork') == '1': ++ log.info("In a testing network, exiting after the first loop.") ++ # Threads should be closed nicely in some refactor ++ exit(0) + def run_speedtest(args, conf): + """Initializes all the data and threads needed to measure the relays. + + It launches or connect to Tor in a thread. + It initializes the list of relays seen in the Tor network. + It starts a thread to read the previous measurements and wait for new + measurements to write them to the disk. + It initializes a class that will be used to order the relays depending + on their measurements age. + It initializes the list of destinations that will be used for the + measurements. + It initializes the thread pool that will launch the measurement threads. + The pool starts 3 other threads that are not the measurement (worker) + threads. + Finally, it calls the function that will manage the measurement threads. 
+ + """ + global rd, pool, controller controller, _ = stem_utils.init_controller( path=conf.getpath('tor', 'control_socket')) if not controller: diff --cc tox.ini index 4aca0ca,d8fd1c4..5094b78 --- a/tox.ini +++ b/tox.ini @@@ -43,16 -43,21 +43,22 @@@ whitelist_externals bash sleep wget + mkdir commands = - tar -C {envtmpdir} -vxf {toxinidir}/tests/integration/net.tar + cp -af {toxinidir}/tests/integration/net {envtmpdir} bash {envtmpdir}/net/start.sh bash -c "time python3 {envtmpdir}/net/wait.py {envtmpdir}/net/{auth,relay,exit}*" bash -c "python3 {toxinidir}/scripts/tools/sbws-http-server.py --port 28888 &>/dev/null &" - sleep 15 + sleep 1 wget -O/dev/null http://127.0.0.1:28888/sbws.bin + ; Run actually the scanner + mkdir -p /tmp/.sbws + ; This add around 3min more to the tests + sbws -c {toxinidir}/tests/integration/sbws_testnet.ini scanner coverage run -a --rcfile={toxinidir}/.coveragerc --source=sbws -m pytest -s {toxinidir}/tests/integration -vv bash {envtmpdir}/net/stop.sh + # no need to remove .tox/net directory. + rm -rf /tmp/.sbws
[testenv:lint] skip_install = True