[tor-commits] [sbws/master] Merge branch 'bug28869_squashed'

juga at torproject.org juga at torproject.org
Mon Feb 4 14:57:14 UTC 2019


commit 149e236e64bf469221a3874bcfc82e89fe408785
Merge: c523619 c24b235
Author: juga0 <juga at riseup.net>
Date:   Mon Feb 4 14:56:10 2019 +0000

    Merge branch 'bug28869_squashed'
    
    Solved merge conflicts in sbws/__init__.py and sbws/core/scanner.py
    after merging bug28741

 sbws/__init__.py                               |  18 +++
 sbws/core/scanner.py                           | 201 ++++++++++++++++++-------
 sbws/lib/circuitbuilder.py                     |  16 +-
 sbws/lib/resultdump.py                         |  10 +-
 sbws/util/stem.py                              |   9 +-
 tests/integration/lib/test_relayprioritizer.py |   8 +-
 6 files changed, 185 insertions(+), 77 deletions(-)

diff --cc sbws/__init__.py
index eea7006,3871edb..84b2787
--- a/sbws/__init__.py
+++ b/sbws/__init__.py
@@@ -1,18 -1,25 +1,36 @@@
  __version__ = '1.0.3-dev0'
  
 -
+ import threading  # noqa
+ 
 +from . import globals  # noqa
 +
  
  class Settings:
+     """Singleton settings for all the packages.
+     This way changed settings can be seen by all the packages that import it.
+ 
+     It lives in ``__init__.py`` to leave open the possibility of having a
+     ``settings.py`` module for user settings.
+ 
+     .. note:: After refactoring, globals should only have constants.
+       Any other variable that needs to be modified when initializing
+       should be initialized here.
+ 
+     """
      def __init__(self):
 +        # update this dict from globals (but only for ALL_CAPS settings)
 +        for setting in dir(globals):
 +            if setting.isupper():
 +                setattr(self, setting, getattr(globals, setting))
+         self.end_event = threading.Event()
  
 +    def init_http_headers(self, nickname, uuid, tor_version):
 +        self.HTTP_HEADERS['Tor-Bandwidth-Scanner-Nickname'] = nickname
 +        self.HTTP_HEADERS['Tor-Bandwidth-Scanner-UUID'] = uuid
 +        self.HTTP_HEADERS['User-Agent'] += tor_version
 +
+     def set_end_event(self):
+         self.end_event.set()
+ 
++
  settings = Settings()  # noqa
diff --cc sbws/core/scanner.py
index 8fe8a5b,bb7a2ca..013d4ee
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@@ -1,8 -1,7 +1,9 @@@
  ''' Measure the relays. '''
  
+ import signal
  import sys
 +import threading
 +import uuid
  
  from ..lib.circuitbuilder import GapsCircuitBuilder as CB
  from ..lib.resultdump import ResultDump
@@@ -13,7 -12,7 +14,7 @@@ from ..lib.relayprioritizer import Rela
  from ..lib.destination import DestinationList
  from ..util.timestamp import now_isodt_str
  from ..util.state import State
- from sbws.globals import fail_hard, TIMEOUT_MEASUREMENTS, HTTP_GET_HEADERS
 -from sbws.globals import fail_hard
++from sbws.globals import fail_hard, HTTP_GET_HEADERS
  import sbws.util.stem as stem_utils
  import sbws.util.requests as requests_utils
  from argparse import ArgumentDefaultsHelpFormatter
@@@ -25,54 -23,52 +25,78 @@@ import loggin
  import requests
  import random
  
- from sbws import settings
- 
+ from .. import settings
  
  rng = random.SystemRandom()
- end_event = Event()
  log = logging.getLogger(__name__)
+ # Declare the objects that manage the threads global so that sbws can exit
+ # gracefully at any time.
+ pool = None
+ rd = None
+ controller = None
+ 
+ 
+ def stop_threads(signal, frame):
+     global rd, pool
+     log.debug('Stopping sbws.')
+     # Prevent new threads from starting.
+     settings.set_end_event()
+     # Stop Pool threads
+     pool.close()
+     pool.join()
+     # Stop ResultDump thread
+     rd.thread.join()
+     # Stop Tor thread
+     controller.close()
+     sys.exit(0)
+ 
+ 
+ signal.signal(signal.SIGTERM, stop_threads)
  
  
 +def dumpstacks():
 +    import traceback
 +    log.critical("sbws stop measuring relays, probably because of a bug."
 +                 "Please, open a ticket in trac.torproject.org with this"
 +                 "backtrace.")
 +    thread_id2name = dict([(t.ident, t.name) for t in threading.enumerate()])
 +    for thread_id, stack in sys._current_frames().items():
 +        log.critical("Thread: %s(%d)",
 +                     thread_id2name.get(thread_id, ""), thread_id)
 +        log.critical(traceback.print_stack(stack))
 +    # If logging level is less than DEBUG (more verbose), start pdb so that
 +    # developers can debug the issue.
 +    if log.getEffectiveLevel() < logging.DEBUG:
 +        import pdb
 +        pdb.set_trace()
 +    # Otherwise exit.
 +    else:
 +        # Change to stop threads when #28869 is merged
 +        sys.exit(1)
 +
 +
  def timed_recv_from_server(session, dest, byte_range):
      ''' Request the **byte_range** from the URL at **dest**. If successful,
      return True and the time it took to download. Otherwise return False and an
      exception. '''
 -    headers = {'Range': byte_range, 'Accept-Encoding': 'identity'}
 +
      start_time = time.time()
 +    HTTP_GET_HEADERS['Range'] = byte_range
      # TODO:
      # - What other exceptions can this throw?
 -    # - Do we have to read the content, or did requests already do so?
 +    # - response.elapsed "measures the time taken between sending the first
 +    #   byte of the request and finishing parsing the headers.
 +    #   It is therefore unaffected by consuming the response content"
 +    #   If this means that the content has arrived, elapsed could be used to
 +    #   know the time it took.
      try:
 -        requests_utils.get(
 -            session, dest.url, headers=headers, verify=dest.verify)
 +        # headers are merged with the session ones, not overwritten.
 +        session.get(dest.url, headers=HTTP_GET_HEADERS, verify=dest.verify)
+     # NewConnectionError will be raised when shutting down.
      except (requests.exceptions.ConnectionError,
-             requests.exceptions.ReadTimeout) as e:
+             requests.exceptions.ReadTimeout,
+             requests.exceptions.NewConnectionError) as e:
+         log.debug(e)
          return False, e
      end_time = time.time()
      return True, end_time - start_time
@@@ -379,17 -472,9 +501,17 @@@ def run_speedtest(args, conf)
              'even lead to messed up results.',
              conf.getpath('tor', 'control_socket'))
          time.sleep(15)
 +
 +    # Once a refactor makes conf global, this can be removed
 +    # from here.
 +    state = State(conf.getpath('paths', 'state_fname'))
 +    # Call only once to initialize http_headers
 +    settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'],
 +                               str(controller.get_version()))
 +
      rl = RelayList(args, conf, controller)
      cb = CB(args, conf, controller, rl)
-     rd = ResultDump(args, conf, end_event)
+     rd = ResultDump(args, conf)
      rp = RelayPrioritizer(args, conf, rl, rd)
      destinations, error_msg = DestinationList.from_config(
          conf, cb, rl, controller)
@@@ -454,13 -515,5 +552,8 @@@ def main(args, conf)
  
      state = State(conf.getpath('paths', 'state_fname'))
      state['scanner_started'] = now_isodt_str()
 +    # Generate a unique identifier for each scanner
 +    if 'uuid' not in state:
 +        state['uuid'] = str(uuid.uuid4())
  
-     try:
-         run_speedtest(args, conf)
-     except KeyboardInterrupt as e:
-         raise e
-     finally:
-         end_event.set()
+     run_speedtest(args, conf)



More information about the tor-commits mailing list