[tor-commits] [ooni-probe/master] Refactoring of the ORG queue

art at torproject.org art at torproject.org
Fri Apr 29 09:42:21 UTC 2016


commit 814b2879843a04450f1822dbdd42909906e876df
Author: Arturo Filastò <arturo at filasto.net>
Date:   Fri Feb 5 13:33:27 2016 +0100

    Refactoring of the ORG queue
    
    * Do not start tor if we already have a tor state
    * Fix failure to import GeoIP
---
 ooni/director.py |   2 +-
 ooni/geoip.py    |   1 +
 ooni/oonicli.py  | 166 ++++++++++++++++++++++---------------------------------
 3 files changed, 67 insertions(+), 102 deletions(-)

diff --git a/ooni/director.py b/ooni/director.py
index 9bac49d..f633048 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -130,7 +130,7 @@ class Director(object):
         if start_tor:
             if check_incoherences:
                 yield config.check_tor()
-            if config.advanced.start_tor:
+            if config.advanced.start_tor and config.tor_state is None:
                 yield self.startTor()
             elif config.tor.control_port and config.tor_state is None:
                 log.msg("Connecting to Tor Control Port...")
diff --git a/ooni/geoip.py b/ooni/geoip.py
index cb66675..11800b6 100644
--- a/ooni/geoip.py
+++ b/ooni/geoip.py
@@ -1,3 +1,4 @@
+from __future__ import absolute_import
 import re
 import os
 import random
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 47aacfb..fc5eca0 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -226,8 +226,8 @@ def setupCollector(global_options, net_test_loader):
     return collector
 
 
-def createDeck(global_options,url=None,filename=None):
-    log.msg("Creating deck for: %s" %(url or filename,) )
+def createDeck(global_options, url=None, filename=None):
+    log.msg("Creating deck for: %s" % (url or filename,))
 
     deck = Deck(no_collector=global_options['no-collector'])
     deck.bouncer = global_options['bouncer']
@@ -239,9 +239,9 @@ def createDeck(global_options,url=None,filename=None):
             log.debug("No test deck detected")
             test_file = nettest_to_path(global_options['test_file'], True)
             if url is not None:
-                args = ('-u',url)
+                args = ('-u', url)
             elif filename is not None:
-                args = ('-f',filename)
+                args = ('-f', filename)
             else:
                 args = tuple()
             if any(global_options['subargs']):
@@ -270,48 +270,13 @@ def createDeck(global_options,url=None,filename=None):
         sys.exit(5)
     return deck
 
-def runWithDirector(logging=True, start_tor=True, check_incoherences=True):
-    """
-    Instance the director, parse command line options and start an ooniprobe
-    test!
-    """
-
-    global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
-
-    director = Director()
-    if global_options['list']:
-        print "# Installed nettests"
-        for net_test_id, net_test in director.getNetTests().items():
-            print "* %s (%s/%s)" % (net_test['name'],
-                                    net_test['category'],
-                                    net_test['id'])
-            print "  %s" % net_test['description']
-
-        sys.exit(0)
-
-    elif global_options['printdeck']:
-        del global_options['printdeck']
-        print "# Copy and paste the lines below into a test deck to run the specified test with the specified arguments"
-        print yaml.safe_dump([{'options': global_options}]).strip()
-
-        sys.exit(0)
-
-    if global_options.get('annotations') is not None:
-        annotations = setupAnnotations(global_options)
-
-    if global_options['no-collector']:
-        log.msg("Not reporting using a collector")
-        global_options['collector'] = None
-        start_tor = False
-    else:
-        start_tor = True
-
-    if global_options['collector']:
-        start_tor |= True
 
-    deck = createDeck(global_options)
+def runTestWithDirector(director, global_options, url=None, filename=None,
+                        start_tor=True, check_incoherences=True):
+    deck = createDeck(global_options, url=url, filename=filename)
 
     start_tor |= deck.requiresTor
+
     d = director.start(start_tor=start_tor,
                        check_incoherences=check_incoherences)
 
@@ -341,18 +306,59 @@ def runWithDirector(logging=True, start_tor=True, check_incoherences=True):
             test_details['annotations'] = global_options['annotations']
 
             director.startNetTest(net_test_loader,
-                                  global_options['reportfile'],
-                                  collector)
+                                    global_options['reportfile'],
+                                    collector)
         return director.allTestsDone
 
-
     d.addCallback(setup_nettest)
     d.addCallback(post_director_start)
     d.addErrback(director_startup_handled_failures)
     d.addErrback(director_startup_other_failures)
     return d
 
-    
+def runWithDirector(logging=True, start_tor=True, check_incoherences=True):
+    """
+    Instance the director, parse command line options and start an ooniprobe
+    test!
+    """
+
+    global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
+
+    director = Director()
+    if global_options['list']:
+        print "# Installed nettests"
+        for net_test_id, net_test in director.getNetTests().items():
+            print "* %s (%s/%s)" % (net_test['name'],
+                                    net_test['category'],
+                                    net_test['id'])
+            print "  %s" % net_test['description']
+
+        sys.exit(0)
+
+    elif global_options['printdeck']:
+        del global_options['printdeck']
+        print "# Copy and paste the lines below into a test deck to run the specified test with the specified arguments"
+        print yaml.safe_dump([{'options': global_options}]).strip()
+
+        sys.exit(0)
+
+    if global_options.get('annotations') is not None:
+        annotations = setupAnnotations(global_options)
+
+    if global_options['no-collector']:
+        log.msg("Not reporting using a collector")
+        global_options['collector'] = None
+        start_tor = False
+    else:
+        start_tor = True
+
+    if global_options['collector']:
+        start_tor |= True
+
+    return runTestWithDirector(director=director,
+                               global_options=global_options,
+                               check_incoherences=check_incoherences)
+
 
 # this variant version of runWithDirector splits the process in two,
 # allowing a single director instance to be reused with multiple decks.
@@ -373,7 +379,6 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
         sys.exit(7)
 
 
-
     global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
 
     director = Director()
@@ -388,80 +393,40 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
     else:
         start_tor = True
 
-
-    def run_test(global_options, url=None, filename=None):
-        assert url is not None or filename is not None
-
-        deck = createDeck(global_options, url=url, filename=filename)
-
-        d = director.start(start_tor=True,
-                           check_incoherences=check_incoherences)
-
-        def setup_nettest(_):
-            try:
-                return deck.setup()
-            except errors.UnableToLoadDeckInput as error:
-                return defer.failure.Failure(error)
-
-
-
-        # Wait until director has started up (including bootstrapping Tor)
-        # before adding tests
-        def post_director_start(_):
-            for net_test_loader in deck.netTestLoaders:
-                # Decks can specify different collectors
-                # for each net test, so that each NetTest
-                # may be paired with a test_helper and its collector
-                # However, a user can override this behavior by
-                # specifying a collector from the command-line (-c).
-                # If a collector is not specified in the deck, or the
-                # deck is a singleton, the default collector set in
-                # ooniprobe.conf will be used
-
-                collector = setupCollector(global_options, net_test_loader)
-
-                test_details = net_test_loader.testDetails
-                test_details['annotations'] = global_options['annotations']
-
-                director.startNetTest(net_test_loader,
-                                      global_options['reportfile'],
-                                      collector)
-            return director.allTestsDone
- 
-        d.addCallback(setup_nettest)
-        d.addCallback(post_director_start)
-        d.addErrback(director_startup_handled_failures)
-        d.addErrback(director_startup_other_failures)
-        return d
-
     finished = defer.Deferred()
 
     @defer.inlineCallbacks
-    def readmsg(_,channel, queue_object, consumer_tag, counter):
+    def readmsg(_, channel, queue_object, consumer_tag, counter):
 
         # Wait for a message and decode it.
         if counter >= lifetime:
+            log.msg("Counter")
             queue_object.close(LifetimeExceeded())
             yield channel.basic_cancel(consumer_tag=consumer_tag)
             finished.callback(None)
 
         else:
             log.msg("Waiting for message")
-            
+
             try:
                 ch, method, properties, body = yield queue_object.get()
                 log.msg("Got message")
                 data = json.loads(body)
                 counter += 1
 
-                log.msg("Received %d/%d: %s" %(counter, lifetime, data['url'],))
+                log.msg("Received %d/%d: %s" % (counter, lifetime, data['url'],))
                 # acknowledge the message
                 ch.basic_ack(delivery_tag=method.delivery_tag)
 
-                d = run_test(global_options, url=data['url'].encode('utf8'))
+                d = runTestWithDirector(director=director,
+                                        global_options=global_options,
+                                        url=data['url'].encode('utf8'),
+                                        check_incoherences=check_incoherences)
                 # When the test has been completed, go back to waiting for a message.
                 d.addCallback(readmsg, channel, queue_object, consumer_tag, counter+1)
             except exceptions.AMQPError,v:
+                log.msg("Error")
+                log.exception(v)
                 finished.errback(v)
 
 
@@ -474,7 +439,7 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
         queue_object, consumer_tag = yield channel.basic_consume(
                                                    queue=name,
                                                    no_ack=False)
-        readmsg(None,channel,queue_object,consumer_tag, 0)
+        readmsg(None, channel, queue_object, consumer_tag, 0)
 
 
 
@@ -484,10 +449,9 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True)
     urlargs = dict(urlparse.parse_qsl(urlp.query))
 
     # random lifetime requests counter
-    lifetime = random.randint(820,1032)
+    lifetime = random.randint(820, 1032)
 
     # AMQP connection details are sent through the cmdline parameter '-Q'
-    
     creds = pika.PlainCredentials(urlp.username or 'guest',
                                   urlp.password or 'guest')
     parameters = pika.ConnectionParameters(urlp.hostname,





More information about the tor-commits mailing list