commit 814b2879843a04450f1822dbdd42909906e876df Author: Arturo Filastò arturo@filasto.net Date: Fri Feb 5 13:33:27 2016 +0100
Refactoring of the ORG queue
* Do not start tor if we already have a tor state * Fix failure to import GeoIP --- ooni/director.py | 2 +- ooni/geoip.py | 1 + ooni/oonicli.py | 166 ++++++++++++++++++++++--------------------------------- 3 files changed, 67 insertions(+), 102 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py index 9bac49d..f633048 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -130,7 +130,7 @@ class Director(object): if start_tor: if check_incoherences: yield config.check_tor() - if config.advanced.start_tor: + if config.advanced.start_tor and config.tor_state is None: yield self.startTor() elif config.tor.control_port and config.tor_state is None: log.msg("Connecting to Tor Control Port...") diff --git a/ooni/geoip.py b/ooni/geoip.py index cb66675..11800b6 100644 --- a/ooni/geoip.py +++ b/ooni/geoip.py @@ -1,3 +1,4 @@ +from __future__ import absolute_import import re import os import random diff --git a/ooni/oonicli.py b/ooni/oonicli.py index 47aacfb..fc5eca0 100644 --- a/ooni/oonicli.py +++ b/ooni/oonicli.py @@ -226,8 +226,8 @@ def setupCollector(global_options, net_test_loader): return collector
-def createDeck(global_options,url=None,filename=None): - log.msg("Creating deck for: %s" %(url or filename,) ) +def createDeck(global_options, url=None, filename=None): + log.msg("Creating deck for: %s" % (url or filename,))
deck = Deck(no_collector=global_options['no-collector']) deck.bouncer = global_options['bouncer'] @@ -239,9 +239,9 @@ def createDeck(global_options,url=None,filename=None): log.debug("No test deck detected") test_file = nettest_to_path(global_options['test_file'], True) if url is not None: - args = ('-u',url) + args = ('-u', url) elif filename is not None: - args = ('-f',filename) + args = ('-f', filename) else: args = tuple() if any(global_options['subargs']): @@ -270,48 +270,13 @@ def createDeck(global_options,url=None,filename=None): sys.exit(5) return deck
-def runWithDirector(logging=True, start_tor=True, check_incoherences=True): - """ - Instance the director, parse command line options and start an ooniprobe - test! - """ - - global_options = setupGlobalOptions(logging, start_tor, check_incoherences) - - director = Director() - if global_options['list']: - print "# Installed nettests" - for net_test_id, net_test in director.getNetTests().items(): - print "* %s (%s/%s)" % (net_test['name'], - net_test['category'], - net_test['id']) - print " %s" % net_test['description'] - - sys.exit(0) - - elif global_options['printdeck']: - del global_options['printdeck'] - print "# Copy and paste the lines below into a test deck to run the specified test with the specified arguments" - print yaml.safe_dump([{'options': global_options}]).strip() - - sys.exit(0) - - if global_options.get('annotations') is not None: - annotations = setupAnnotations(global_options) - - if global_options['no-collector']: - log.msg("Not reporting using a collector") - global_options['collector'] = None - start_tor = False - else: - start_tor = True - - if global_options['collector']: - start_tor |= True
- deck = createDeck(global_options) +def runTestWithDirector(director, global_options, url=None, filename=None, + start_tor=True, check_incoherences=True): + deck = createDeck(global_options, url=url, filename=filename)
start_tor |= deck.requiresTor + d = director.start(start_tor=start_tor, check_incoherences=check_incoherences)
@@ -341,18 +306,59 @@ def runWithDirector(logging=True, start_tor=True, check_incoherences=True): test_details['annotations'] = global_options['annotations']
director.startNetTest(net_test_loader, - global_options['reportfile'], - collector) + global_options['reportfile'], + collector) return director.allTestsDone
- d.addCallback(setup_nettest) d.addCallback(post_director_start) d.addErrback(director_startup_handled_failures) d.addErrback(director_startup_other_failures) return d
- +def runWithDirector(logging=True, start_tor=True, check_incoherences=True): + """ + Instance the director, parse command line options and start an ooniprobe + test! + """ + + global_options = setupGlobalOptions(logging, start_tor, check_incoherences) + + director = Director() + if global_options['list']: + print "# Installed nettests" + for net_test_id, net_test in director.getNetTests().items(): + print "* %s (%s/%s)" % (net_test['name'], + net_test['category'], + net_test['id']) + print " %s" % net_test['description'] + + sys.exit(0) + + elif global_options['printdeck']: + del global_options['printdeck'] + print "# Copy and paste the lines below into a test deck to run the specified test with the specified arguments" + print yaml.safe_dump([{'options': global_options}]).strip() + + sys.exit(0) + + if global_options.get('annotations') is not None: + annotations = setupAnnotations(global_options) + + if global_options['no-collector']: + log.msg("Not reporting using a collector") + global_options['collector'] = None + start_tor = False + else: + start_tor = True + + if global_options['collector']: + start_tor |= True + + return runTestWithDirector(director=director, + global_options=global_options, + check_incoherences=check_incoherences) +
# this variant version of runWithDirector splits the process in two, # allowing a single director instance to be reused with multiple decks. @@ -373,7 +379,6 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True) sys.exit(7)
- global_options = setupGlobalOptions(logging, start_tor, check_incoherences)
director = Director() @@ -388,80 +393,40 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True) else: start_tor = True
- - def run_test(global_options, url=None, filename=None): - assert url is not None or filename is not None - - deck = createDeck(global_options, url=url, filename=filename) - - d = director.start(start_tor=True, - check_incoherences=check_incoherences) - - def setup_nettest(_): - try: - return deck.setup() - except errors.UnableToLoadDeckInput as error: - return defer.failure.Failure(error) - - - - # Wait until director has started up (including bootstrapping Tor) - # before adding tests - def post_director_start(_): - for net_test_loader in deck.netTestLoaders: - # Decks can specify different collectors - # for each net test, so that each NetTest - # may be paired with a test_helper and its collector - # However, a user can override this behavior by - # specifying a collector from the command-line (-c). - # If a collector is not specified in the deck, or the - # deck is a singleton, the default collector set in - # ooniprobe.conf will be used - - collector = setupCollector(global_options, net_test_loader) - - test_details = net_test_loader.testDetails - test_details['annotations'] = global_options['annotations'] - - director.startNetTest(net_test_loader, - global_options['reportfile'], - collector) - return director.allTestsDone - - d.addCallback(setup_nettest) - d.addCallback(post_director_start) - d.addErrback(director_startup_handled_failures) - d.addErrback(director_startup_other_failures) - return d - finished = defer.Deferred()
@defer.inlineCallbacks - def readmsg(_,channel, queue_object, consumer_tag, counter): + def readmsg(_, channel, queue_object, consumer_tag, counter):
# Wait for a message and decode it. if counter >= lifetime: + log.msg("Counter") queue_object.close(LifetimeExceeded()) yield channel.basic_cancel(consumer_tag=consumer_tag) finished.callback(None)
else: log.msg("Waiting for message") - + try: ch, method, properties, body = yield queue_object.get() log.msg("Got message") data = json.loads(body) counter += 1
- log.msg("Received %d/%d: %s" %(counter, lifetime, data['url'],)) + log.msg("Received %d/%d: %s" % (counter, lifetime, data['url'],)) # acknowledge the message ch.basic_ack(delivery_tag=method.delivery_tag)
- d = run_test(global_options, url=data['url'].encode('utf8')) + d = runTestWithDirector(director=director, + global_options=global_options, + url=data['url'].encode('utf8'), + check_incoherences=check_incoherences) # When the test has been completed, go back to waiting for a message. d.addCallback(readmsg, channel, queue_object, consumer_tag, counter+1) except exceptions.AMQPError,v: + log.msg("Error") + log.exception(v) finished.errback(v)
@@ -474,7 +439,7 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True) queue_object, consumer_tag = yield channel.basic_consume( queue=name, no_ack=False) - readmsg(None,channel,queue_object,consumer_tag, 0) + readmsg(None, channel, queue_object, consumer_tag, 0)
@@ -484,10 +449,9 @@ def runWithDaemonDirector(logging=True, start_tor=True, check_incoherences=True) urlargs = dict(urlparse.parse_qsl(urlp.query))
# random lifetime requests counter - lifetime = random.randint(820,1032) + lifetime = random.randint(820, 1032)
# AMQP connection details are sent through the cmdline parameter '-Q' - creds = pika.PlainCredentials(urlp.username or 'guest', urlp.password or 'guest') parameters = pika.ConnectionParameters(urlp.hostname,