[tor-commits] [bridgedb/develop] Reparse descriptors in background thread on signal

isis at torproject.org isis at torproject.org
Tue Apr 1 22:16:43 UTC 2014


commit bd652aa560cfaf15b9b37979aba73e9f8f548b4e
Author: Matthew Finkel <Matthew.Finkel at gmail.com>
Date:   Fri Feb 7 01:16:34 2014 +0000

    Reparse descriptors in background thread on signal
    
    Create temporary distributors and fill them with bridges, then
    overwrite the old distributors' rings with the rings from the
    temporary distributors. Factor out the distributor creation code.
    
    Fixes bug #5232.
---
 CHANGELOG            |    7 ++
 lib/bridgedb/Main.py |  203 +++++++++++++++++++++++++++++++-------------------
 2 files changed, 134 insertions(+), 76 deletions(-)

diff --git a/CHANGELOG b/CHANGELOG
index 092ce8c..0498e4f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,10 @@
+Changes in version 0.1.7 -
+        * FIXES #5232 Perform long-running and blocking transactions
+                in background threads. Primarily, this moves bridge
+                descriptor reparsing into another thread, which
+                significantly increases the availability of
+                BridgeDB.
+
 Changes in version 0.1.6 - 2014-03-26
 BridgeDB 0.1.6 includes fixes for the following bugs:
 	* FIXES #11196 BridgeDB should use leekspin
diff --git a/lib/bridgedb/Main.py b/lib/bridgedb/Main.py
index 7fc4c6c..6c099b8 100644
--- a/lib/bridgedb/Main.py
+++ b/lib/bridgedb/Main.py
@@ -92,6 +92,7 @@ def load(state, splitter, clear=False):
 
     logging.info("Loading bridges...")
 
+    bridges = {}
     status = {}
     addresses = {}
     timestamps = {}
@@ -118,8 +119,6 @@ def load(state, splitter, clear=False):
     logging.debug("Closing network status file")
     f.close()
 
-    db = bridgedb.Storage.getDB()
-
     for fname in state.BRIDGE_FILES:
         logging.info("Opening bridge-server-descriptor file: '%s'" % fname)
         f = open(fname, 'r')
@@ -191,6 +190,27 @@ def load(state, splitter, clear=False):
         logging.debug("Closing blocking-countries document")
         f.close()
 
+    def updateBridgeHistory(bridges, timestamps):
+        """Add each bridge's collected timestamps, sorted, to its history."""
+        if not hasattr(state, 'config'):
+            logging.info("updateBridgeHistory(): Config file not set "
+                "in State file.")
+            return
+        if state.COLLECT_TIMESTAMPS:
+            logging.debug("Beginning bridge stability calculations")
+            for bridge in bridges.values():
+                # sorted() works on a copy, so the caller's lists are not
+                # reordered; bridges with no timestamps are simply skipped.
+                for timestamp in sorted(timestamps.get(bridge.getID(), [])):
+                    logging.debug(
+                        "Updating BridgeHistory timestamps for %s: %s"
+                        % (bridge.fingerprint, timestamp))
+                    bridgedb.Stability.addOrUpdateBridgeHistory(
+                        bridge, timestamp)
+            logging.debug("Stability calculations complete")
+
+    reactor.callInThread(updateBridgeHistory, bridges, timestamps)
+
     bridges = None
     state.save()
     return
@@ -327,7 +347,7 @@ def _reloadFn(*args):
 
 def _handleSIGHUP(*args):
     """Called when we receive a SIGHUP; invokes _reloadFn."""
-    reactor.callLater(0, _reloadFn, *args)
+    reactor.callFromThread(reactor.callInThread, _reloadFn)  # signal-safe
 
 def _handleSIGUSR1(*args):
     """Handler for SIGUSR1. Calls :func:`~bridgedb.runner.doDumpBridges`."""
@@ -340,7 +360,7 @@ def _handleSIGUSR1(*args):
     cfg = loadConfig(state.CONFIG_FILE, state.config)
 
     logging.info("Dumping bridge assignments to files...")
-    reactor.callLater(0, runner.doDumpBridges, cfg)
+    reactor.callInThread(runner.doDumpBridges, cfg)
 
 
 class ProxyCategory:
@@ -351,6 +371,81 @@ class ProxyCategory:
     def replaceProxyList(self, ipset):
         self.ipset = ipset
 
+def replaceBridgeRings(current, replacement):
+    """Swap ``replacement``'s splitter (its bridge rings) into ``current``.
+
+    Either argument is None when that distributor is disabled in the
+    config; there is then nothing to swap, so this is a no-op.
+    """
+    if current is None or replacement is None:
+        return
+    current.splitter = replacement.splitter
+
+def createBridgeRings(cfg, proxyList, key):
+    """Create the splitter and the bridge distributors defined by the config.
+
+    :type cfg:  :class:`Conf`
+    :param cfg: The current configuration, including any in-memory
+                settings (i.e. settings whose values were not obtained from the
+                config file, but were set via a function somewhere)
+    :type proxyList: :class:`ProxyCategory`
+    :param proxyList: The container for the IP addresses of any currently
+                      known open proxies.
+    :param bytes key: Master key; every per-purpose HMAC key is derived from it.
+    :rtype: tuple
+    :returns: A BridgeSplitter, an EmailBasedDistributor or None, and an
+              IPBasedDistributor or None -- in that order.
+    """
+
+    # Create a BridgeSplitter to assign the bridges to the different
+    # distributors.
+    splitter = Bridges.BridgeSplitter(crypto.getHMAC(key, "Splitter-Key"))
+    logging.debug("Created splitter: %r" % splitter)
+
+    # Create ring parameters.
+    ringParams = Bridges.BridgeRingParameters(needPorts=cfg.FORCE_PORTS,
+                                              needFlags=cfg.FORCE_FLAGS)
+
+    emailDistributor = ipDistributor = None  # stay None unless enabled below
+    # As appropriate, create an IP-based distributor.
+    if cfg.HTTPS_DIST and cfg.HTTPS_SHARE:
+        logging.debug("Setting up HTTPS Distributor...")
+        categories = []
+        if proxyList.ipset:
+            logging.debug("Adding proxyList to HTTPS Distributor categories.")
+            categories.append(proxyList)
+        logging.debug("HTTPS Distributor categories: '%s'" % categories)
+
+        ipDistributor = Dist.IPBasedDistributor(
+            Dist.uniformMap,
+            cfg.N_IP_CLUSTERS,
+            crypto.getHMAC(key, "HTTPS-IP-Dist-Key"),
+            categories,
+            answerParameters=ringParams)
+        splitter.addRing(ipDistributor, "https", cfg.HTTPS_SHARE)
+
+    # As appropriate, create an email-based distributor.
+    if cfg.EMAIL_DIST and cfg.EMAIL_SHARE:
+        logging.debug("Setting up Email Distributor...")
+        emailDistributor = Dist.EmailBasedDistributor(
+            crypto.getHMAC(key, "Email-Dist-Key"),
+            cfg.EMAIL_DOMAIN_MAP.copy(),
+            cfg.EMAIL_DOMAIN_RULES.copy(),
+            answerParameters=ringParams)
+        splitter.addRing(emailDistributor, "email", cfg.EMAIL_SHARE)
+
+    # As appropriate, tell the splitter to leave some bridges unallocated.
+    if cfg.RESERVED_SHARE:
+        splitter.addRing(Bridges.UnallocatedHolder(),
+                         "unallocated",
+                         cfg.RESERVED_SHARE)
+
+    # Add pseudo distributors to splitter
+    for pseudoRing in cfg.FILE_BUCKETS:
+        splitter.addPseudoRing(pseudoRing)
+
+    return splitter, emailDistributor, ipDistributor
+
 def startup(options):
     """Parse bridges,
 
@@ -402,73 +491,18 @@ def startup(options):
     # Load the master key, or create a new one.
     key = crypto.getKey(config.MASTER_KEY_FILE)
 
-    # Initialize our DB file.
-    db = bridgedb.Storage.Database(config.DB_FILE + ".sqlite", config.DB_FILE)
-    # TODO: move setGlobalDB to bridgedb.persistent.State class
-    bridgedb.Storage.setGlobalDB(db)
-
     # Get a proxy list.
     proxyList = ProxyCategory()
     proxyList.replaceProxyList(loadProxyList(config))
 
-    # Create a BridgeSplitter to assign the bridges to the different
-    # distributors.
-    splitter = Bridges.BridgeSplitter(crypto.getHMAC(key, "Splitter-Key"))
-    logging.debug("Created splitter: %r" % splitter)
-
-    # Create ring parameters.
-    ringParams = Bridges.BridgeRingParameters(needPorts=config.FORCE_PORTS,
-                                              needFlags=config.FORCE_FLAGS)
-
     emailDistributor = ipDistributor = None
 
-    # As appropriate, create an IP-based distributor.
-    if config.HTTPS_DIST and config.HTTPS_SHARE:
-        logging.debug("Setting up HTTPS Distributor...")
-        categories = []
-        if proxyList.ipset:
-            logging.debug("Adding proxyList to HTTPS Distributor categories.")
-            categories.append(proxyList)
-        logging.debug("HTTPS Distributor categories: '%s'" % categories)
-
-        ipDistributor = Dist.IPBasedDistributor(
-            Dist.uniformMap,
-            config.N_IP_CLUSTERS,
-            crypto.getHMAC(key, "HTTPS-IP-Dist-Key"),
-            categories,
-            answerParameters=ringParams)
-        splitter.addRing(ipDistributor, "https", config.HTTPS_SHARE)
-        #webSchedule = Time.IntervalSchedule("day", 2)
-        webSchedule = Time.NoSchedule()
-
-    # As appropriate, create an email-based distributor.
-    if config.EMAIL_DIST and config.EMAIL_SHARE:
-        logging.debug("Setting up Email Distributor...")
-        emailDistributor = Dist.EmailBasedDistributor(
-            crypto.getHMAC(key, "Email-Dist-Key"),
-            config.EMAIL_DOMAIN_MAP.copy(),
-            config.EMAIL_DOMAIN_RULES.copy(),
-            answerParameters=ringParams)
-        splitter.addRing(emailDistributor, "email", config.EMAIL_SHARE)
-        #emailSchedule = Time.IntervalSchedule("day", 1)
-        emailSchedule = Time.NoSchedule()
-
-    # As appropriate, tell the splitter to leave some bridges unallocated.
-    if config.RESERVED_SHARE:
-        splitter.addRing(Bridges.UnallocatedHolder(),
-                         "unallocated",
-                         config.RESERVED_SHARE)
-
-    # Add pseudo distributors to splitter
-    for pseudoRing in config.FILE_BUCKETS.keys():
-        splitter.addPseudoRing(pseudoRing)
-
     # Save our state
     state.proxyList = proxyList
     state.key = key
     state.save()
 
-    def reload(*args):
+    def reload(inThread=True):
         """Reload settings, proxy lists, and bridges.
 
         State should be saved before calling this method, and will be saved
@@ -514,6 +548,14 @@ def startup(options):
         logging.debug("Saving state again before reparsing descriptors...")
         state.save()
         logging.info("Reparsing bridge descriptors...")
+
+        (splitter,
+         emailDistributorTmp,
+         ipDistributorTmp) = createBridgeRings(cfg, proxyList, key)
+
+        # Initialize our DB file.
+        db = bridgedb.Storage.Database(cfg.DB_FILE + ".sqlite", cfg.DB_FILE)
+        bridgedb.Storage.setGlobalDB(db)
         load(state, splitter, clear=False)
 
         state = persistent.load()
@@ -521,32 +563,32 @@ def startup(options):
         logging.debug("Replacing the list of open proxies...")
         state.proxyList.replaceProxyList(loadProxyList(cfg))
 
-        if emailDistributor is not None:
-            emailDistributor.prepopulateRings() # create default rings
+        if emailDistributorTmp is not None:
+            emailDistributorTmp.prepopulateRings() # create default rings
             logging.info("Bridges allotted for %s distribution: %d"
-                         % (emailDistributor.name,
-                            len(emailDistributor.splitter)))
+                         % (emailDistributorTmp.name,
+                            len(emailDistributorTmp.splitter)))
         else:
             logging.warn("No email distributor created!")
 
-        if ipDistributor is not None:
-            ipDistributor.prepopulateRings() # create default rings
+        if ipDistributorTmp is not None:
+            ipDistributorTmp.prepopulateRings() # create default rings
 
             logging.info("Bridges allotted for %s distribution: %d"
-                         % (ipDistributor.name,
-                            len(ipDistributor.splitter)))
+                         % (ipDistributorTmp.name,
+                            len(ipDistributorTmp.splitter)))
             logging.info("\tNum bridges:\tFilter set:")
 
             nSubrings  = 0
-            ipSubrings = ipDistributor.splitter.filterRings
+            ipSubrings = ipDistributorTmp.splitter.filterRings
             for (ringname, (filterFn, subring)) in ipSubrings.items():
                 nSubrings += 1
                 filterSet = ' '.join(
-                    ipDistributor.splitter.extractFilterNames(ringname))
+                    ipDistributorTmp.splitter.extractFilterNames(ringname))
                 logging.info("\t%2d bridges\t%s" % (len(subring), filterSet))
 
             logging.info("Total subrings for %s: %d"
-                         % (ipDistributor.name, nSubrings))
+                         % (ipDistributorTmp.name, nSubrings))
         else:
             logging.warn("No HTTP(S) distributor created!")
 
@@ -563,21 +605,32 @@ def startup(options):
         except IOError:
             logging.info("I/O error while writing assignments to: '%s'"
                          % state.ASSIGNMENTS_FILE)
-
         state.save()
 
+        if inThread:
+            reactor.callFromThread(replaceBridgeRings, ipDistributor, ipDistributorTmp)
+            reactor.callFromThread(replaceBridgeRings, emailDistributor, emailDistributorTmp)
+        else:
+            # We're still starting up. Return these distributors so
+            # they are configured in the outer-namespace
+            return emailDistributorTmp, ipDistributorTmp
+
     global _reloadFn
     _reloadFn = reload
     signal.signal(signal.SIGHUP, _handleSIGHUP)
     signal.signal(signal.SIGUSR1, _handleSIGUSR1)
 
-    # And actually load it to start parsing.
-    reload()
+    # And actually load it to start parsing. Get back our distributors.
+    emailDistributor, ipDistributor = reload(False)
 
     # Configure all servers:
     if config.HTTPS_DIST and config.HTTPS_SHARE:
+        #webSchedule = Time.IntervalSchedule("day", 2)
+        webSchedule = Time.NoSchedule()
         HTTPServer.addWebServer(config, ipDistributor, webSchedule)
     if config.EMAIL_DIST and config.EMAIL_SHARE:
+        #emailSchedule = Time.IntervalSchedule("day", 1)
+        emailSchedule = Time.NoSchedule()
         EmailServer.addSMTPServer(config, emailDistributor, emailSchedule)
 
     # Actually run the servers.
@@ -587,8 +640,6 @@ def startup(options):
     except KeyboardInterrupt:
         logging.fatal("Received keyboard interrupt. Shutting down...")
     finally:
-        logging.info("Closing databases...")
-        db.close()
         if config.PIDFILE:
             os.unlink(config.PIDFILE)
         logging.info("Exiting...")





More information about the tor-commits mailing list