[tor-commits] [ooni-probe/master] Add minimal outline of the ooniprobe-agent and new deck format

art at torproject.org art at torproject.org
Mon Sep 19 12:14:24 UTC 2016


commit 7829363f1066a469995c0410025d7e895522363f
Author: Arturo Filastò <arturo at filasto.net>
Date:   Mon Jul 25 16:08:05 2016 +0200

    Add minimal outline of the ooniprobe-agent and new deck format
    
    * Use DuckDuckGo to perform geoip lookups instead of torproject.org
    
    * Big refactoring of the Director
---
 ooni/agent/__init__.py         |   0
 ooni/agent/agent.py            |  21 ++
 ooni/agent/scheduler.py        |  27 ++
 ooni/deck.py                   | 715 +++++++++++++++++++++++++++++------------
 ooni/director.py               | 231 ++++++++-----
 ooni/geoip.py                  |   8 +-
 ooni/measurements.py           |  43 ---
 ooni/nettest.py                |   3 +-
 ooni/results.py                |  39 +++
 ooni/tests/bases.py            |   2 +-
 ooni/tests/test_deck.py        |  57 +++-
 ooni/tests/test_director.py    |  12 +-
 ooni/tests/test_nettest.py     |   8 +-
 ooni/tests/test_oonideckgen.py |   7 +-
 ooni/tests/test_oonireport.py  |  18 +-
 ooni/ui/cli.py                 |   8 +-
 ooni/ui/web/client/index.html  |   2 +-
 ooni/ui/web/server.py          | 203 ++++++------
 ooni/ui/web/web.py             |  56 +---
 ooni/utils/onion.py            |  14 -
 20 files changed, 974 insertions(+), 500 deletions(-)

diff --git a/ooni/agent/__init__.py b/ooni/agent/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ooni/agent/agent.py b/ooni/agent/agent.py
new file mode 100644
index 0000000..a1394f0
--- /dev/null
+++ b/ooni/agent/agent.py
@@ -0,0 +1,21 @@
+from twisted.application import service
+from ooni.director import Director
+from ooni.settings import config
+
+from ooni.ui.web.web import WebUIService
+from ooni.agent.scheduler import SchedulerService
+
+class AgentService(service.MultiService):
+    def __init__(self):
+        service.MultiService.__init__(self)
+
+        director = Director()
+        config.set_paths()
+        config.initialize_ooni_home()
+        config.read_config_file()
+
+        self.web_ui_service = WebUIService(director)
+        self.web_ui_service.setServiceParent(self)
+
+        self.scheduler_service = SchedulerService(director)
+        self.scheduler_service.setServiceParent(self)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
new file mode 100644
index 0000000..1004597
--- /dev/null
+++ b/ooni/agent/scheduler.py
@@ -0,0 +1,27 @@
+from twisted.application import service
+from twisted.internet import task
+
+class SchedulerService(service.MultiService):
+    """
+    This service is responsible for running the periodic tasks.
+    """
+    def __init__(self, director, interval=30):
+        service.MultiService.__init__(self)
+        self.director = director
+        self.interval = interval
+        self._looping_call = task.LoopingCall(self._should_run)
+
+    def _should_run(self):
+        """
+        This function is called every self.interval seconds to check
+        which periodic tasks should be run.
+        """
+        pass
+
+    def startService(self):
+        service.MultiService.startService(self)
+        self._looping_call.start(self.interval)
+
+    def stopService(self):
+        service.MultiService.stopService(self)
+        self._looping_call.stop()
diff --git a/ooni/deck.py b/ooni/deck.py
index 9f17530..1746d26 100644
--- a/ooni/deck.py
+++ b/ooni/deck.py
@@ -1,27 +1,28 @@
 # -*- coding: utf-8 -*-
-import csv
 import os
-import yaml
+import csv
 import json
 
+from copy import deepcopy
 from hashlib import sha256
-from datetime import datetime
-from ooni.backend_client import CollectorClient, BouncerClient
-from ooni.backend_client import WebConnectivityClient, guess_backend_type
-from ooni.nettest import NetTestLoader
-from ooni.settings import config
 
-from ooni.otime import timestampNowISO8601UTC
+import yaml
 
-from ooni.resources.update import check_for_update
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
 
-from ooni.utils import log
 from ooni import constants
 from ooni import errors as e
+from ooni.backend_client import CollectorClient, BouncerClient
+from ooni.backend_client import WebConnectivityClient, guess_backend_type
+from ooni.nettest import NetTestLoader
+from ooni.otime import timestampNowISO8601UTC
+from ooni.resources import check_for_update
+from ooni.settings import config
+from ooni.utils import generate_filename
+from ooni.utils import log
 
-from twisted.python.filepath import FilePath
-from twisted.internet import defer
-
+from ooni.results import generate_summary
 
 class InputFile(object):
     def __init__(self, input_hash, base_path=config.inputs_directory):
@@ -116,6 +117,25 @@ def nettest_to_path(path, allow_arbitrary_paths=False):
     return found_path
 
 
+def get_preferred_bouncer():
+    preferred_backend = config.advanced.get(
+        "preferred_backend", "onion"
+    )
+    bouncer_address = getattr(
+        constants, "CANONICAL_BOUNCER_{0}".format(
+            preferred_backend.upper()
+        )
+    )
+    if preferred_backend == "cloudfront":
+        return BouncerClient(
+            settings={
+                'address': bouncer_address[0],
+                'front': bouncer_address[1],
+                'type': 'cloudfront'
+        })
+    else:
+        return BouncerClient(bouncer_address)
+
 class Deck(InputFile):
     # this exists so we can mock it out in unittests
     _BouncerClient = BouncerClient
@@ -227,175 +247,10 @@ class Deck(InputFile):
         if self.bouncer:
             log.msg("Looking up collector and test helpers with {0}".format(
                 self.bouncer.base_address))
-            yield self.lookupCollectorAndTestHelpers()
-
-
-    def sortAddressesByPriority(self, priority_address, alternate_addresses):
-        prioritised_addresses = []
-
-        backend_type = guess_backend_type(priority_address)
-        priority_address = {
-            'address': priority_address,
-            'type': backend_type
-        }
-        address_priority = ['onion', 'https', 'cloudfront', 'http']
-        address_priority.remove(self.preferred_backend)
-        address_priority.insert(0, self.preferred_backend)
-
-        def filter_by_type(collectors, collector_type):
-            return filter(lambda x: x['type'] == collector_type, collectors)
-
-        if (priority_address['type'] != self.preferred_backend):
-            valid_alternatives = filter_by_type(alternate_addresses,
-                                                self.preferred_backend)
-            if len(valid_alternatives) > 0:
-                alternate_addresses += [priority_address]
-                priority_address = valid_alternatives[0]
-                alternate_addresses.remove(priority_address)
-
-        prioritised_addresses += [priority_address]
-        for address_type in address_priority:
-            prioritised_addresses += filter_by_type(alternate_addresses,
-                                                    address_type)
-
-        return prioritised_addresses
-
-    @defer.inlineCallbacks
-    def getReachableCollector(self, collector_address, collector_alternate):
-        # We prefer onion collector to https collector to cloudfront
-        # collectors to plaintext collectors
-        for collector_settings in self.sortAddressesByPriority(collector_address,
-                                                               collector_alternate):
-            collector = self._CollectorClient(settings=collector_settings)
-            if not collector.isSupported():
-                log.err("Unsupported %s collector %s" % (
-                            collector_settings['type'],
-                            collector_settings['address']))
-                continue
-            reachable = yield collector.isReachable()
-            if not reachable:
-                log.err("Unreachable %s collector %s" % (
-                            collector_settings['type'],
-                            collector_settings['address']))
-                continue
-            defer.returnValue(collector)
-
-        raise e.NoReachableCollectors
-
-    @defer.inlineCallbacks
-    def getReachableTestHelper(self, test_helper_name, test_helper_address,
-                               test_helper_alternate):
-        # For the moment we look for alternate addresses only of
-        # web_connectivity test helpers.
-        if test_helper_name == 'web-connectivity':
-            for web_connectivity_settings in self.sortAddressesByPriority(
-                    test_helper_address, test_helper_alternate):
-                web_connectivity_test_helper = WebConnectivityClient(
-                    settings=web_connectivity_settings)
-                if not web_connectivity_test_helper.isSupported():
-                    log.err("Unsupported %s web_connectivity test_helper "
-                            "%s" % (
-                            web_connectivity_settings['type'],
-                            web_connectivity_settings['address']
-                    ))
-                    continue
-                reachable = yield web_connectivity_test_helper.isReachable()
-                if not reachable:
-                    log.err("Unreachable %s web_connectivity test helper %s" % (
-                        web_connectivity_settings['type'],
-                        web_connectivity_settings['address']
-                    ))
-                    continue
-                defer.returnValue(web_connectivity_settings)
-            raise e.NoReachableTestHelpers
-        else:
-            defer.returnValue(test_helper_address.encode('ascii'))
-
-    @defer.inlineCallbacks
-    def getReachableTestHelpersAndCollectors(self, net_tests):
-        for net_test in net_tests:
-
-            primary_address = net_test['collector']
-            alternate_addresses = net_test.get('collector-alternate', [])
-            net_test['collector'] = yield self.getReachableCollector(
-                        primary_address,
-                        alternate_addresses
-            )
-
-            for test_helper_name, test_helper_address in net_test['test-helpers'].items():
-                 test_helper_alternate = \
-                     net_test.get('test-helpers-alternate', {}).get(test_helper_name, [])
-                 net_test['test-helpers'][test_helper_name] = \
-                            yield self.getReachableTestHelper(
-                                test_helper_name,
-                                test_helper_address,
-                                test_helper_alternate)
-
-        defer.returnValue(net_tests)
-
-    @defer.inlineCallbacks
-    def lookupCollectorAndTestHelpers(self):
-        required_nettests = []
-
-        requires_test_helpers = False
-        requires_collector = False
-        for net_test_loader in self.netTestLoaders:
-            nettest = {
-                'name': net_test_loader.testName,
-                'version': net_test_loader.testVersion,
-                'test-helpers': [],
-                'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
-            }
-            if not net_test_loader.collector and not self.no_collector:
-                requires_collector = True
-
-            if len(net_test_loader.missingTestHelpers) > 0:
-                requires_test_helpers = True
-                nettest['test-helpers'] += map(lambda x: x[1],
-                                               net_test_loader.missingTestHelpers)
-
-            required_nettests.append(nettest)
-
-        if not requires_test_helpers and not requires_collector:
-            defer.returnValue(None)
-
-        response = yield self.bouncer.lookupTestCollector(required_nettests)
-        try:
-            provided_net_tests = yield self.getReachableTestHelpersAndCollectors(response['net-tests'])
-        except e.NoReachableCollectors:
-            log.err("Could not find any reachable collector")
-            raise
-        except e.NoReachableTestHelpers:
-            log.err("Could not find any reachable test helpers")
-            raise
-
-        def find_collector_and_test_helpers(test_name, test_version, input_files):
-            input_files = [u""+x['hash'] for x in input_files]
-            for net_test in provided_net_tests:
-                if net_test['name'] != test_name:
-                    continue
-                if net_test['version'] != test_version:
-                    continue
-                if set(net_test['input-hashes']) != set(input_files):
-                    continue
-                return net_test['collector'], net_test['test-helpers']
-
-        for net_test_loader in self.netTestLoaders:
-            log.msg("Setting collector and test helpers for %s" %
-                    net_test_loader.testName)
-
-            collector, test_helpers = \
-                find_collector_and_test_helpers(test_name=net_test_loader.testName,
-                                                test_version=net_test_loader.testVersion,
-                                                input_files=net_test_loader.inputFiles)
-
-            for option, name in net_test_loader.missingTestHelpers:
-                test_helper_address_or_settings = test_helpers[name]
-                net_test_loader.localOptions[option] = test_helper_address_or_settings
-                net_test_loader.testHelpers[option] = test_helper_address_or_settings
-
-            if not net_test_loader.collector:
-                net_test_loader.collector = collector
+            yield lookup_collector_and_test_helpers(self.netTestLoaders,
+                                                    self.bouncer,
+                                                    self.preferred_backend,
+                                                    self.no_collector)
 
     @defer.inlineCallbacks
     def fetchAndVerifyNetTestInput(self, net_test_loader):
@@ -419,17 +274,197 @@ class Deck(InputFile):
                 i['test_options'][i['key']] = input_file.cached_file
 
 
+def lookup_collector_and_test_helpers(net_test_loaders,
+                                      bouncer,
+                                      preferred_backend,
+                                      no_collector=False):
+    required_nettests = []
+
+    requires_test_helpers = False
+    requires_collector = False
+    for net_test_loader in net_test_loaders:
+        nettest = {
+            'name': net_test_loader.testName,
+            'version': net_test_loader.testVersion,
+            'test-helpers': [],
+            'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
+        }
+        if not net_test_loader.collector and not no_collector:
+            requires_collector = True
+
+        if len(net_test_loader.missingTestHelpers) > 0:
+            requires_test_helpers = True
+            nettest['test-helpers'] += map(lambda x: x[1],
+                                           net_test_loader.missingTestHelpers)
+
+        required_nettests.append(nettest)
+
+    if not requires_test_helpers and not requires_collector:
+        defer.returnValue(None)
+
+    response = yield bouncer.lookupTestCollector(required_nettests)
+    try:
+        provided_net_tests = yield get_reachable_test_helpers_and_collectors(
+            response['net-tests'], preferred_backend)
+    except e.NoReachableCollectors:
+        log.err("Could not find any reachable collector")
+        raise
+    except e.NoReachableTestHelpers:
+        log.err("Could not find any reachable test helpers")
+        raise
+
+    def find_collector_and_test_helpers(test_name, test_version, input_files):
+        input_files = [u""+x['hash'] for x in input_files]
+        for net_test in provided_net_tests:
+            if net_test['name'] != test_name:
+                continue
+            if net_test['version'] != test_version:
+                continue
+            if set(net_test['input-hashes']) != set(input_files):
+                continue
+            return net_test['collector'], net_test['test-helpers']
+
+    for net_test_loader in net_test_loaders:
+        log.msg("Setting collector and test helpers for %s" %
+                net_test_loader.testName)
+
+        collector, test_helpers = \
+            find_collector_and_test_helpers(test_name=net_test_loader.testName,
+                                            test_version=net_test_loader.testVersion,
+                                            input_files=net_test_loader.inputFiles)
+
+        for option, name in net_test_loader.missingTestHelpers:
+            test_helper_address_or_settings = test_helpers[name]
+            net_test_loader.localOptions[option] = test_helper_address_or_settings
+            net_test_loader.testHelpers[option] = test_helper_address_or_settings
+
+        if not net_test_loader.collector:
+            net_test_loader.collector = collector
+
+
+@defer.inlineCallbacks
+def get_reachable_test_helpers_and_collectors(net_tests, preferred_backend):
+    for net_test in net_tests:
+        primary_address = net_test['collector']
+        alternate_addresses = net_test.get('collector-alternate', [])
+        net_test['collector'] = yield get_reachable_collector(
+            primary_address, alternate_addresses, preferred_backend)
+
+        for test_helper_name, test_helper_address in net_test['test-helpers'].items():
+             test_helper_alternate = \
+                 net_test.get('test-helpers-alternate', {}).get(test_helper_name, [])
+             net_test['test-helpers'][test_helper_name] = \
+                        yield get_reachable_test_helper(test_helper_name,
+                                                        test_helper_address,
+                                                        test_helper_alternate,
+                                                        preferred_backend)
+
+    defer.returnValue(net_tests)
+
+@defer.inlineCallbacks
+def get_reachable_collector(collector_address, collector_alternate,
+                            preferred_backend):
+    # We prefer onion collector to https collector to cloudfront
+    # collectors to plaintext collectors
+    for collector_settings in sort_addresses_by_priority(
+            collector_address,
+            collector_alternate,
+            preferred_backend):
+        collector = CollectorClient(settings=collector_settings)
+        if not collector.isSupported():
+            log.err("Unsupported %s collector %s" % (
+                        collector_settings['type'],
+                        collector_settings['address']))
+            continue
+        reachable = yield collector.isReachable()
+        if not reachable:
+            log.err("Unreachable %s collector %s" % (
+                        collector_settings['type'],
+                        collector_settings['address']))
+            continue
+        defer.returnValue(collector)
+
+    raise e.NoReachableCollectors
+
+
+@defer.inlineCallbacks
+def get_reachable_test_helper(test_helper_name, test_helper_address,
+                              test_helper_alternate, preferred_backend):
+    # For the moment we look for alternate addresses only of
+    # web_connectivity test helpers.
+    if test_helper_name == 'web-connectivity':
+        for web_connectivity_settings in sort_addresses_by_priority(
+                test_helper_address, test_helper_alternate,
+                preferred_backend):
+            web_connectivity_test_helper = WebConnectivityClient(
+                settings=web_connectivity_settings)
+            if not web_connectivity_test_helper.isSupported():
+                log.err("Unsupported %s web_connectivity test_helper "
+                        "%s" % (
+                        web_connectivity_settings['type'],
+                        web_connectivity_settings['address']
+                ))
+                continue
+            reachable = yield web_connectivity_test_helper.isReachable()
+            if not reachable:
+                log.err("Unreachable %s web_connectivity test helper %s" % (
+                    web_connectivity_settings['type'],
+                    web_connectivity_settings['address']
+                ))
+                continue
+            defer.returnValue(web_connectivity_settings)
+        raise e.NoReachableTestHelpers
+    else:
+        defer.returnValue(test_helper_address.encode('ascii'))
+
+def sort_addresses_by_priority(priority_address,
+                               alternate_addresses,
+                               preferred_backend):
+    prioritised_addresses = []
+
+    backend_type = guess_backend_type(priority_address)
+    priority_address = {
+        'address': priority_address,
+        'type': backend_type
+    }
+    address_priority = ['onion', 'https', 'cloudfront', 'http']
+    address_priority.remove(preferred_backend)
+    address_priority.insert(0, preferred_backend)
+
+    def filter_by_type(collectors, collector_type):
+        return filter(lambda x: x['type'] == collector_type, collectors)
+
+    if (priority_address['type'] != preferred_backend):
+        valid_alternatives = filter_by_type(alternate_addresses,
+                                            preferred_backend)
+        if len(valid_alternatives) > 0:
+            alternate_addresses += [priority_address]
+            priority_address = valid_alternatives[0]
+            alternate_addresses.remove(priority_address)
+
+    prioritised_addresses += [priority_address]
+    for address_type in address_priority:
+        prioritised_addresses += filter_by_type(alternate_addresses,
+                                                address_type)
+
+    return prioritised_addresses
+
+
+class InputNotFound(Exception):
+    pass
+
+
 class InputStore(object):
     def __init__(self):
         self.path = FilePath(config.inputs_directory)
         self.resources = FilePath(config.resources_directory)
+        self._cache_stale = True
+        self._cache = {}
 
     @defer.inlineCallbacks
     def update_url_lists(self, country_code):
         countries = ["global"]
-        if country_code == "ZZ":
-            country_code = None
-        else:
+        if country_code != "ZZ":
             countries.append(country_code)
 
         for cc in countries:
@@ -466,30 +501,58 @@ class InputStore(object):
                     "name": name,
                     "filepath": out_file.path,
                     "last_updated": timestampNowISO8601UTC(),
-                    "id": "citizenlab_test_lists_{0}_txt".format(cc),
+                    "id": "citizenlab_{0}_urls".format(cc),
                     "type": "file/url"
                 }, out_fh)
 
     @defer.inlineCallbacks
     def create(self, country_code=None):
+        # XXX This is a hax to avoid race conditions in testing because this
+        #  object is a singleton and config can have a custom home directory
+        #  passed at runtime.
+        self.path = FilePath(config.inputs_directory)
+        self.resources = FilePath(config.resources_directory)
+
         self.path.child("descriptors").makedirs(ignoreExistingDirectory=True)
         self.path.child("data").makedirs(ignoreExistingDirectory=True)
         yield self.update_url_lists(country_code)
 
     @defer.inlineCallbacks
     def update(self, country_code=None):
-        yield self.update_url_lists(country_code)
+        # XXX why do we make a difference between create and update?
+        yield self.create(country_code)
 
-    def list(self):
-        inputs = []
+    def _update_cache(self):
         descs = self.path.child("descriptors")
         if not descs.exists():
-            return inputs
+            self._cache = {}
+            return
 
         for fn in descs.listdir():
             with descs.child(fn).open("r") as in_fh:
-                inputs.append(json.load(in_fh))
-        return inputs
+                input_desc = json.load(in_fh)
+                self._cache[input_desc.pop("id")] = input_desc
+        self._cache_stale = False
+        return
+
+    def list(self):
+        if self._cache_stale:
+            self._update_cache()
+        return self._cache
+
+    def get(self, input_id):
+        if self._cache_stale:
+            self._update_cache()
+        try:
+            input_desc = self._cache[input_id]
+        except KeyError:
+            raise InputNotFound(input_id)
+        return input_desc
+
+    def getContent(self, input_id):
+        input_desc = self.get(input_id)
+        with open(input_desc["filepath"]) as fh:
+            return fh.read()
 
 class DeckStore(object):
     def __init__(self):
@@ -501,10 +564,268 @@ class DeckStore(object):
     def get(self):
         pass
 
-class NGInput(object):
-    def __init__(self, input_name):
-        pass
+def resolve_file_path(v, prepath=None):
+    if v.startswith("$"):
+        # This raises InputNotFound and we let it carry onto the caller
+        return input_store.get(v[1:])["filepath"]
+    elif prepath is not None and (not os.path.isabs(v)):
+        return FilePath(prepath).preauthChild(v).path
+    return v
+
+def options_to_args(options, prepath=None):
+    args = []
+    for k, v in options.items():
+        if v is None:
+            continue
+        if k == "file":
+            v = resolve_file_path(v, prepath)
+        args.append('--'+k)
+        args.append(v)
+    return args
+
+class UnknownTaskKey(Exception):
+    pass
+
+class MissingTaskDataKey(Exception):
+    pass
+
+class DeckTask(object):
+    _metadata_keys = ["name"]
+    _supported_tasks = ["ooni"]
+
+    def __init__(self, data, parent_metadata={}, cwd=None):
+        self.parent_metadata = parent_metadata
+        self.cwd = cwd
+        self.data = deepcopy(data)
+
+        self.id = ""
+
+        self.type = None
+        self.metadata = {}
+        self.requires_tor = False
+        self.requires_bouncer = False
+
+        self.ooni = {
+            'bouncer_client': None,
+            'test_details': {}
+        }
+
+        self._load(data)
+
+    def _load_ooni(self, task_data):
+        required_keys = ["test_name"]
+        for required_key in required_keys:
+            if required_key not in task_data:
+                raise MissingTaskDataKey(required_key)
+
+        # This raises e.NetTestNotFound, we let it go onto the caller
+        nettest_path = nettest_to_path(task_data.pop("test_name"))
+
+        try:
+            annotations = task_data.pop('annotations')
+        except KeyError:
+            annotations = self.parent_metadata.get('annotations', {})
+
+        try:
+            collector_address = task_data.pop('collector')
+        except KeyError:
+            collector_address = self.parent_metadata.get('collector', None)
+
+        net_test_loader = NetTestLoader(
+            options_to_args(task_data),
+            annotations=annotations,
+            test_file=nettest_path
+        )
+
+        if isinstance(collector_address, dict):
+            net_test_loader.collector = CollectorClient(
+                settings=collector_address
+            )
+        elif collector_address is not None:
+            net_test_loader.collector = CollectorClient(
+                collector_address
+            )
+
+        if (net_test_loader.collector is not None and
+                net_test_loader.collector.backend_type == "onion"):
+            self.requires_tor = True
+
+        try:
+            net_test_loader.checkOptions()
+            if net_test_loader.requiresTor:
+                self.requires_tor = True
+        except e.MissingTestHelper:
+            self.requires_bouncer = True
+
+        self.ooni['net_test_loader'] = net_test_loader
+        # Need to ensure that this is called only once we have looked up the
+        #  probe IP address and have geoip data.
+        self.ooni['test_details'] = net_test_loader.getTestDetails()
+        self.id = generate_filename(self.ooni['test_details'])
+
+    def _load(self, data):
+        for key in self._metadata_keys:
+            try:
+                self.metadata[key] = data.pop(key)
+            except KeyError:
+                continue
+
+        task_type, task_data = data.popitem()
+        if task_type not in self._supported_tasks:
+            raise UnknownTaskKey(task_type)
+        self.type = task_type
+        getattr(self, "_load_"+task_type)(task_data)
+
+        assert len(data) == 0
 
 class NGDeck(object):
-    def __init__(self, deck_path):
-        pass
+    def __init__(self, deck_data=None,
+                 deck_path=None, no_collector=False):
+        # Used to resolve relative paths inside of decks.
+        self.deck_directory = None
+        self.requires_tor = False
+        self.no_collector = no_collector
+        self.name = ""
+        self.description = ""
+        self.schedule = None
+
+        self.metadata = {}
+        self.bouncer = None
+
+        self._measurement_path = FilePath(config.measurements_directory)
+        self._tasks = []
+        self.task_ids = []
+
+        if deck_path is not None:
+            self.open(deck_path)
+        elif deck_data is not None:
+            self.load(deck_data)
+
+    def open(self, deck_path):
+        with open(deck_path) as fh:
+            deck_data = yaml.safe_load(fh)
+        self.load(deck_data)
+
+    def write(self, fh):
+        """
+        Writes a properly formatted deck to the supplied file handle.
+        :param fh: an open file handle
+        :return:
+        """
+        deck_data = {
+            "name": self.name,
+            "description": self.description,
+            "tasks": [task.data for task in self._tasks]
+        }
+        if self.schedule is not None:
+            deck_data["schedule"] = self.schedule
+        for key, value in self.metadata.items():
+            deck_data[key] = value
+
+        fh.write("---\n")
+        yaml.safe_dump(deck_data, fh, default_flow_style=False)
+
+    def load(self, deck_data):
+        self.name = deck_data.pop("name", "Un-named Deck")
+        self.description = deck_data.pop("description", "No description")
+
+        bouncer_address = deck_data.pop("bouncer", None)
+        if bouncer_address is None:
+            self.bouncer = get_preferred_bouncer()
+        elif isinstance(bouncer_address, dict):
+            self.bouncer = BouncerClient(settings=bouncer_address)
+        else:
+            self.bouncer = BouncerClient(bouncer_address)
+
+        self.schedule = deck_data.pop("schedule", None)
+
+        tasks_data = deck_data.pop("tasks", [])
+        for key, metadata in deck_data.items():
+            self.metadata[key] = metadata
+
+        for task_data in tasks_data:
+            deck_task = DeckTask(task_data, self.metadata, self.deck_directory)
+            if deck_task.requires_tor:
+                self.requires_tor = True
+            if (deck_task.requires_bouncer and
+                    self.bouncer.backend_type == "onion"):
+                self.requires_tor = True
+            self._tasks.append(deck_task)
+            self.task_ids.append(deck_task.id)
+
+    @defer.inlineCallbacks
+    def query_bouncer(self):
+        preferred_backend = config.advanced.get(
+            "preferred_backend", "onion"
+        )
+        log.msg("Looking up collector and test helpers with {0}".format(
+            self.bouncer.base_address)
+        )
+        net_test_loaders = []
+        for task in self._tasks:
+            if task.type == "ooni":
+                net_test_loaders.append(task.ooni["net_test_loader"])
+
+        yield lookup_collector_and_test_helpers(
+            net_test_loaders,
+            self.bouncer,
+            preferred_backend,
+            self.no_collector
+        )
+
+    def _measurement_completed(self, result, measurement_id):
+        log.msg("{0}".format(result))
+        measurement_dir = self._measurement_path.child(measurement_id)
+        measurement_dir.child("measurements.njson.progress").moveTo(
+            measurement_dir.child("measurements.njson")
+        )
+        generate_summary(
+            measurement_dir.child("measurements.njson").path,
+            measurement_dir.child("summary.json").path
+        )
+        measurement_dir.child("running.pid").remove()
+
+    def _measurement_failed(self, failure, measurement_id):
+        measurement_dir = self._measurement_path.child(measurement_id)
+        measurement_dir.child("running.pid").remove()
+        # XXX do we also want to delete measurements.njson.progress?
+        return failure
+
+    def _run_ooni_task(self, task, director):
+        net_test_loader = task.ooni["net_test_loader"]
+        test_details = task.ooni["test_details"]
+        measurement_id = task.id
+
+        measurement_dir = self._measurement_path.child(measurement_id)
+        measurement_dir.createDirectory()
+
+        report_filename = measurement_dir.child("measurements.njson.progress").path
+        pid_file = measurement_dir.child("running.pid")
+
+        with pid_file.open('w') as out_file:
+            out_file.write("{0}".format(os.getpid()))
+
+        d = director.start_net_test_loader(
+            net_test_loader,
+            report_filename,
+            test_details=test_details
+        )
+        d.addCallback(self._measurement_completed, measurement_id)
+        d.addErrback(self._measurement_failed, measurement_id)
+        return d
+
+    @defer.inlineCallbacks
+    def run(self, director):
+        tasks = []
+        preferred_backend = config.advanced.get("preferred_backend", "onion")
+        yield self.query_bouncer()
+        for task in self._tasks:
+            if task.requires_tor:
+                yield director.start_tor()
+            elif task.requires_bouncer and preferred_backend == "onion":
+                yield director.start_tor()
+            if task.type == "ooni":
+                tasks.append(self._run_ooni_task(task, director))
+        defer.returnValue(tasks)
+
+input_store = InputStore()
diff --git a/ooni/director.py b/ooni/director.py
index d39a11b..793975e 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -1,19 +1,24 @@
 import pwd
 import os
 
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
 from ooni.managers import ReportEntryManager, MeasurementManager
 from ooni.reporter import Report
 from ooni.utils import log, generate_filename
 from ooni.utils.net import randomFreePort
 from ooni.nettest import NetTest, getNetTestInformation
 from ooni.settings import config
-from ooni import errors
 from ooni.nettest import normalizeTestName
 from ooni.deck import InputStore
 
 from ooni.utils.onion import start_tor, connect_to_control_port
 
-from twisted.internet import defer
+class DirectorEvent(object):
+    def __init__(self, type="update", message=""):
+        self.type = type
+        self.message = message
 
 
 class Director(object):
@@ -86,8 +91,6 @@ class Director(object):
 
         self.failures = []
 
-        self.torControlProtocol = None
-
         # This deferred is fired once all the measurements and their reporting
         # tasks are completed.
         self.allTestsDone = defer.Deferred()
@@ -95,6 +98,58 @@ class Director(object):
 
         self.input_store = InputStore()
 
+        self._reset_director_state()
+        self._reset_tor_state()
+
+        self._subscribers = []
+
+    def subscribe(self, handler):
+        self._subscribers.append(handler)
+
+    def unsubscribe(self, handler):
+        self._subscribers.remove(handler)
+
+    def notify(self, event):
+        for handler in self._subscribers:
+            handler(event)
+
+    def _reset_director_state(self):
+        self._director_state = 'not-running'
+        self._director_starting = defer.Deferred()
+        self._director_starting.addErrback(self._director_startup_failure)
+        self._director_starting.addCallback(self._director_startup_success)
+
+    def _director_startup_failure(self, failure):
+        self._reset_director_state()
+        self.notify(DirectorEvent("error",
+                                  "Failed to start the director"))
+        return failure
+
+    def _director_startup_success(self, result):
+        self._director_state = 'running'
+        self.notify(DirectorEvent("success",
+                                  "Successfully started the director"))
+        return result
+
+    def _reset_tor_state(self):
+        # This can be either 'not-running', 'starting' or 'running'
+        self._tor_state = 'not-running'
+        self._tor_starting = defer.Deferred()
+        self._tor_starting.addErrback(self._tor_startup_failure)
+        self._tor_starting.addCallback(self._tor_startup_success)
+
+    def _tor_startup_failure(self, failure):
+        self._reset_tor_state()
+        self.notify(DirectorEvent("error",
+                                  "Failed to start Tor"))
+        return failure
+
+    def _tor_startup_success(self, result):
+        self._tor_state = 'running'
+        self.notify(DirectorEvent("success",
+                                  "Successfully started Tor"))
+        return result
+
     def getNetTests(self):
         nettests = {}
 
@@ -126,16 +181,11 @@ class Director(object):
         return nettests
 
     @defer.inlineCallbacks
-    def start(self, start_tor=False, check_incoherences=True):
+    def _start(self, start_tor, check_incoherences):
         self.netTests = self.getNetTests()
 
         if start_tor:
-            if check_incoherences:
-                yield config.check_tor()
-            if config.advanced.start_tor and config.tor_state is None:
-                yield self.startTor()
-            elif config.tor.control_port and config.tor_state is None:
-                yield connect_to_control_port()
+            yield self.start_tor(check_incoherences)
 
         if config.global_options.get('no-geoip'):
             aux = [False]
@@ -146,8 +196,21 @@ class Director(object):
                 log.msg("You should add annotations for the country, city and ASN")
         else:
             yield config.probe_ip.lookup()
+            self.notify(DirectorEvent("success",
+                                      "Looked up Probe IP"))
 
         yield self.input_store.create(config.probe_ip.geodata["countrycode"])
+        self.notify(DirectorEvent("success",
+                                  "Created input store"))
+
+    @defer.inlineCallbacks
+    def start(self, start_tor=False, check_incoherences=True):
+        self._director_state = 'starting'
+        try:
+            yield self._start(start_tor, check_incoherences)
+            self._director_starting.callback(self._director_state)
+        except Exception as exc:
+            self._director_starting.errback(Failure(exc))
 
     @property
     def measurementSuccessRatio(self):
@@ -217,26 +280,17 @@ class Director(object):
         measurement.result = failure
         return measurement
 
-    def reporterFailed(self, failure, net_test):
-        """
-        This gets called every time a reporter is failing and has been removed
-        from the reporters of a NetTest.
-        Once a report has failed to be created that net_test will never use the
-        reporter again.
-
-        XXX hook some logic here.
-        note: failure contains an extra attribute called failure.reporter
-        """
-        pass
-
     def netTestDone(self, net_test):
+        self.notify(DirectorEvent("success",
+                                  "Successfully ran net_test"))
         self.activeNetTests.remove(net_test)
         if len(self.activeNetTests) == 0:
             self.allTestsDone.callback(None)
 
     @defer.inlineCallbacks
-    def startNetTest(self, net_test_loader, report_filename,
-                     collector_client=None, no_yamloo=False):
+    def start_net_test_loader(self, net_test_loader, report_filename,
+                              collector_client=None, no_yamloo=False,
+                              test_details=None):
         """
         Create the Report for the NetTest and start the report NetTest.
 
@@ -244,14 +298,15 @@ class Director(object):
             net_test_loader:
                 an instance of :class:ooni.nettest.NetTestLoader
         """
-        test_details = net_test_loader.getTestDetails()
+        if test_details is None:
+            test_details = net_test_loader.getTestDetails()
         test_cases = net_test_loader.getTestCases()
 
         if self.allTestsDone.called:
             self.allTestsDone = defer.Deferred()
 
         if config.privacy.includepcap or config.global_options.get('pcapfile', None):
-            self.startSniffing(test_details)
+            self.start_sniffing(test_details)
         report = Report(test_details, report_filename,
                         self.reportEntryManager,
                         collector_client,
@@ -271,7 +326,7 @@ class Director(object):
         finally:
             self.netTestDone(net_test)
 
-    def startSniffing(self, test_details):
+    def start_sniffing(self, test_details):
         """ Start sniffing with Scapy. Exits if required privileges (root) are not
         available.
         """
@@ -303,57 +358,83 @@ class Director(object):
         log.msg("Starting packet capture to: %s" % filename_pcap)
 
 
-    def startTor(self):
+    @defer.inlineCallbacks
+    def start_tor(self, check_incoherences=False):
         """ Starts Tor
         Launches a Tor with :param: socks_port :param: control_port
         :param: tor_binary set in ooniprobe.conf
         """
-        log.msg("Starting Tor...")
-
         from txtorcon import TorConfig
+        if self._tor_state == 'running':
+            log.debug("Tor is already running")
+            defer.returnValue(self._tor_state)
+        elif self._tor_state == 'starting':
+            yield self._tor_starting
+            defer.returnValue(self._tor_state)
 
-        tor_config = TorConfig()
-        if config.tor.control_port is None:
-            config.tor.control_port = int(randomFreePort())
-        if config.tor.socks_port is None:
-            config.tor.socks_port = int(randomFreePort())
-
-        tor_config.ControlPort = config.tor.control_port
-        tor_config.SocksPort = config.tor.socks_port
-
-        if config.tor.data_dir:
-            data_dir = os.path.expanduser(config.tor.data_dir)
-
-            if not os.path.exists(data_dir):
-                log.msg("%s does not exist. Creating it." % data_dir)
-                os.makedirs(data_dir)
-            tor_config.DataDirectory = data_dir
-
-        if config.tor.bridges:
-            tor_config.UseBridges = 1
-            if config.advanced.obfsproxy_binary:
-                tor_config.ClientTransportPlugin = (
-                    'obfs2,obfs3 exec %s managed' %
-                    config.advanced.obfsproxy_binary
-                )
-            bridges = []
-            with open(config.tor.bridges) as f:
-                for bridge in f:
-                    if 'obfs' in bridge:
-                        if config.advanced.obfsproxy_binary:
+        log.msg("Starting Tor...")
+        self._tor_state = 'starting'
+        if check_incoherences:
+            yield config.check_tor()
+
+        if config.advanced.start_tor and config.tor_state is None:
+            tor_config = TorConfig()
+            if config.tor.control_port is None:
+                config.tor.control_port = int(randomFreePort())
+            if config.tor.socks_port is None:
+                config.tor.socks_port = int(randomFreePort())
+
+            tor_config.ControlPort = config.tor.control_port
+            tor_config.SocksPort = config.tor.socks_port
+
+            if config.tor.data_dir:
+                data_dir = os.path.expanduser(config.tor.data_dir)
+
+                if not os.path.exists(data_dir):
+                    log.msg("%s does not exist. Creating it." % data_dir)
+                    os.makedirs(data_dir)
+                tor_config.DataDirectory = data_dir
+
+            if config.tor.bridges:
+                tor_config.UseBridges = 1
+                if config.advanced.obfsproxy_binary:
+                    tor_config.ClientTransportPlugin = (
+                        'obfs2,obfs3 exec %s managed' %
+                        config.advanced.obfsproxy_binary
+                    )
+                bridges = []
+                with open(config.tor.bridges) as f:
+                    for bridge in f:
+                        if 'obfs' in bridge:
+                            if config.advanced.obfsproxy_binary:
+                                bridges.append(bridge.strip())
+                        else:
                             bridges.append(bridge.strip())
-                    else:
-                        bridges.append(bridge.strip())
-            tor_config.Bridge = bridges
-
-        if config.tor.torrc:
-            for i in config.tor.torrc.keys():
-                setattr(tor_config, i, config.tor.torrc[i])
-
-        if os.geteuid() == 0:
-            tor_config.User = pwd.getpwuid(os.geteuid()).pw_name
-
-        tor_config.save()
-        log.debug("Setting control port as %s" % tor_config.ControlPort)
-        log.debug("Setting SOCKS port as %s" % tor_config.SocksPort)
-        return start_tor(tor_config)
+                tor_config.Bridge = bridges
+
+            if config.tor.torrc:
+                for i in config.tor.torrc.keys():
+                    setattr(tor_config, i, config.tor.torrc[i])
+
+            if os.geteuid() == 0:
+                tor_config.User = pwd.getpwuid(os.geteuid()).pw_name
+
+            tor_config.save()
+            log.debug("Setting control port as %s" % tor_config.ControlPort)
+            log.debug("Setting SOCKS port as %s" % tor_config.SocksPort)
+            try:
+                yield start_tor(tor_config)
+                log.err("Calling tor callback")
+                self._tor_starting.callback(self._tor_state)
+                log.err("called")
+            except Exception as exc:
+                log.err("Failed to start tor")
+                log.exc(exc)
+                self._tor_starting.errback(Failure(exc))
+
+        elif config.tor.control_port and config.tor_state is None:
+            try:
+                yield connect_to_control_port()
+                self._tor_starting.callback(self._tor_state)
+            except Exception as exc:
+                self._tor_starting.errback(Failure(exc))
diff --git a/ooni/geoip.py b/ooni/geoip.py
index fa6d1ae..28e0e1e 100644
--- a/ooni/geoip.py
+++ b/ooni/geoip.py
@@ -136,11 +136,11 @@ class UbuntuGeoIP(HTTPGeoIPLookupper):
         probe_ip = m.group(1)
         return probe_ip
 
-class TorProjectGeoIP(HTTPGeoIPLookupper):
-    url = "https://check.torproject.org/"
+class DuckDuckGoGeoIP(HTTPGeoIPLookupper):
+    url = "https://duckduckgo.com/?q=ip&ia=answer"
 
     def parseResponse(self, response_body):
-        regexp = "Your IP address appears to be:  <strong>((\d+\.)+(\d+))"
+        regexp = "Your IP address is (.*) in "
         probe_ip = re.search(regexp, response_body).group(1)
         return probe_ip
 
@@ -151,7 +151,7 @@ class ProbeIP(object):
     def __init__(self):
         self.geoIPServices = {
             'ubuntu': UbuntuGeoIP,
-            'torproject': TorProjectGeoIP
+            'duckduckgo': DuckDuckGoGeoIP
         }
         self.geodata = {
             'asn': 'AS0',
diff --git a/ooni/measurements.py b/ooni/measurements.py
deleted file mode 100644
index 976b125..0000000
--- a/ooni/measurements.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import json
-
-class GenerateResults(object):
-    supported_tests = [
-        "web_connectivity"
-    ]
-
-    def __init__(self, input_file):
-        self.input_file = input_file
-
-    def process_web_connectivity(self, entry):
-        result = {}
-        result['anomaly'] = False
-        if entry['test_keys']['blocking'] is not False:
-            result['anomaly'] = True
-        result['url'] = entry['input']
-        return result
-
-    def output(self, output_file):
-        results = {}
-        with open(self.input_file) as in_file:
-            for idx, line in enumerate(in_file):
-                entry = json.loads(line.strip())
-                if entry['test_name'] not in self.supported_tests:
-                    raise Exception("Unsupported test")
-                result = getattr(self, 'process_'+entry['test_name'])(entry)
-                result['idx'] = idx
-                results['test_name'] = entry['test_name']
-                results['country_code'] = entry['probe_cc']
-                results['asn'] = entry['probe_asn']
-                results['results'] = results.get('results', [])
-                results['results'].append(result)
-
-        with open(output_file, "w") as fw:
-            json.dump(results, fw)
-
-if __name__ == "__main__":
-    import sys
-    if len(sys.argv) != 3:
-        print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
-        sys.exit(1)
-    gr = GenerateResults(sys.argv[1])
-    gr.output(sys.argv[2])
diff --git a/ooni/nettest.py b/ooni/nettest.py
index d01cf7b..2a33a2f 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -254,7 +254,8 @@ class NetTestLoader(object):
                     h = sha256()
                     for l in f:
                         h.update(l)
-            except:
+            except Exception as exc:
+                log.exception(exc)
                 raise e.InvalidInputFile(filename)
             input_file['hash'] = h.hexdigest()
         self.inputFiles.append(input_file)
diff --git a/ooni/results.py b/ooni/results.py
new file mode 100644
index 0000000..21fe997
--- /dev/null
+++ b/ooni/results.py
@@ -0,0 +1,39 @@
+import json
+
+class Process(object):
+    """Per-test processors turning a report entry into a summary result."""
+    # Names of the tests that have a processor method on this class.
+    supported_tests = ["web_connectivity"]
+
+    @staticmethod
+    def web_connectivity(entry):
+        """Return the entry's URL and whether blocking was not exactly False."""
+        return {
+            'anomaly': entry['test_keys']['blocking'] is not False,
+            'url': entry['input'],
+        }
+
+def generate_summary(input_file, output_file):
+    """Summarise the JSON-lines report input_file into the JSON output_file."""
+    summary = {}
+    with open(input_file) as in_file:
+        for idx, line in enumerate(in_file):
+            entry = json.loads(line.strip())
+            result = {}
+            if entry['test_name'] in Process.supported_tests:
+                result = getattr(Process, entry['test_name'])(entry)
+            result['idx'] = idx
+            summary['test_name'] = entry['test_name']
+            summary['country_code'] = entry['probe_cc']
+            summary['asn'] = entry['probe_asn']
+            summary.setdefault('results', []).append(result)
+
+    with open(output_file, "w") as fw:
+        json.dump(summary, fw)
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) != 3:
+        print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
+        sys.exit(1)
+    generate_summary(sys.argv[1], sys.argv[2])
diff --git a/ooni/tests/bases.py b/ooni/tests/bases.py
index 904aaba..40e3b5e 100644
--- a/ooni/tests/bases.py
+++ b/ooni/tests/bases.py
@@ -10,7 +10,7 @@ class ConfigTestCase(unittest.TestCase):
     def setUp(self):
         self.ooni_home_dir = os.path.abspath("ooni_home")
         self.config = config
-        self.config.initialize_ooni_home("ooni_home")
+        self.config.initialize_ooni_home(self.ooni_home_dir)
         super(ConfigTestCase, self).setUp()
 
     def skipTest(self, reason):
diff --git a/ooni/tests/test_deck.py b/ooni/tests/test_deck.py
index ba87ec8..455f4e1 100644
--- a/ooni/tests/test_deck.py
+++ b/ooni/tests/test_deck.py
@@ -1,11 +1,15 @@
 import os
 
+from copy import deepcopy
+
 from twisted.internet import defer
 from twisted.trial import unittest
 
 from hashlib import sha256
 from ooni import errors
-from ooni.deck import InputFile, Deck, nettest_to_path
+from ooni.deck import input_store, lookup_collector_and_test_helpers
+from ooni.nettest import NetTestLoader
+from ooni.deck import InputFile, Deck, nettest_to_path, DeckTask, NGDeck
 from ooni.tests.bases import ConfigTestCase
 from ooni.tests.mocks import MockBouncerClient, MockCollectorClient
 
@@ -182,7 +186,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
 
         self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
 
-        yield deck.lookupCollectorAndTestHelpers()
+        yield lookup_collector_and_test_helpers(deck.preferred_backend,
+                                                deck.netTestLoaders)
 
         self.assertEqual(deck.netTestLoaders[0].collector.settings['address'],
                          'httpo://thirteenchars123.onion')
@@ -229,7 +234,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
 
         self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
 
-        yield deck.lookupCollectorAndTestHelpers()
+        yield lookup_collector_and_test_helpers(deck.preferred_backend,
+                                                deck.netTestLoaders)
 
         self.assertEqual(
             deck.netTestLoaders[0].collector.settings['address'],
@@ -258,7 +264,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
 
         self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
 
-        yield deck.lookupCollectorAndTestHelpers()
+        yield lookup_collector_and_test_helpers(deck.preferred_backend,
+                                                deck.netTestLoaders)
 
         self.assertEqual(
             deck.netTestLoaders[0].collector.settings['address'],
@@ -269,3 +276,45 @@ class TestDeck(BaseTestCase, ConfigTestCase):
             deck.netTestLoaders[0].localOptions['backend'],
             '127.0.0.1'
         )
+
+class TestInputStore(ConfigTestCase):
+    @defer.inlineCallbacks
+    def test_update_input_store(self):
+        self.skipTest("antani")
+        yield input_store.update("ZZ")
+        print os.listdir(os.path.join(
+            self.config.resources_directory, "citizenlab-test-lists"))
+        print os.listdir(os.path.join(self.config.inputs_directory))
+
+TASK_DATA = {
+    "name": "Some Task",
+    "ooni": {
+        "test_name": "web_connectivity",
+        "file": "$citizen_lab_global_urls"
+    }
+}
+
+DECK_DATA = {
+    "name": "My deck",
+    "description": "Something",
+    "tasks": [
+        deepcopy(TASK_DATA)
+    ]
+}
+class TestNGDeck(ConfigTestCase):
+    skip = True
+    # inlineCallbacks is needed: without it the generator body never runs.
+    @defer.inlineCallbacks
+    def test_deck_task(self):
+        if self.skip:
+            self.skipTest("Skip is set to true")
+        yield input_store.update("ZZ")
+        deck_task = DeckTask(TASK_DATA)
+        self.assertIsInstance(deck_task.ooni["net_test_loader"], NetTestLoader)
+
+    @defer.inlineCallbacks
+    def test_deck_load(self):
+        if self.skip:
+            self.skipTest("Skip is set to true")
+        yield input_store.update("ZZ")
+        self.assertEqual(len(NGDeck(deck_data=DECK_DATA).tasks), 1)
diff --git a/ooni/tests/test_director.py b/ooni/tests/test_director.py
index 18377f5..6638adb 100644
--- a/ooni/tests/test_director.py
+++ b/ooni/tests/test_director.py
@@ -73,7 +73,7 @@ class TestDirector(ConfigTestCase):
         @defer.inlineCallbacks
         def director_start_tor():
             director = Director()
-            yield director.startTor()
+            yield director.start_tor()
             assert config.tor.socks_port == 4242
             assert config.tor.control_port == 4242
 
@@ -93,7 +93,7 @@ class TestDirector(ConfigTestCase):
         net_test_loader.loadNetTestString(test_failing_twice)
         director = Director()
         director.netTestDone = net_test_done
-        director.startNetTest(net_test_loader, None, no_yamloo=True)
+        director.start_net_test_loader(net_test_loader, None, no_yamloo=True)
         return finished
 
 
@@ -113,7 +113,7 @@ class TestStartSniffing(unittest.TestCase):
     def test_start_sniffing_once(self):
         with patch('ooni.settings.config.scapyFactory') as mock_scapy_factory:
             with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
-                self.director.startSniffing(self.testDetails)
+                self.director.start_sniffing(self.testDetails)
                 sniffer = mock_scapy_sniffer.return_value
                 mock_scapy_factory.registerProtocol.assert_called_once_with(sniffer)
 
@@ -122,7 +122,7 @@ class TestStartSniffing(unittest.TestCase):
             with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
                 sniffer = mock_scapy_sniffer.return_value
                 sniffer.pcapwriter.filename = 'foo1_filename'
-                self.director.startSniffing(self.testDetails)
+                self.director.start_sniffing(self.testDetails)
                 self.assertEqual(len(self.director.sniffers), 1)
 
             self.testDetails = {
@@ -132,13 +132,13 @@ class TestStartSniffing(unittest.TestCase):
             with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
                 sniffer = mock_scapy_sniffer.return_value
                 sniffer.pcapwriter.filename = 'foo2_filename'
-                self.director.startSniffing(self.testDetails)
+                self.director.start_sniffing(self.testDetails)
                 self.assertEqual(len(self.director.sniffers), 2)
 
     def test_measurement_succeeded(self):
         with patch('ooni.settings.config.scapyFactory') as mock_scapy_factory:
             with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
-                self.director.startSniffing(self.testDetails)
+                self.director.start_sniffing(self.testDetails)
                 self.assertEqual(len(self.director.sniffers), 1)
                 measurement = MagicMock()
                 measurement.testInstance = self.FooTestCase()
diff --git a/ooni/tests/test_nettest.py b/ooni/tests/test_nettest.py
index 239080a..1592149 100644
--- a/ooni/tests/test_nettest.py
+++ b/ooni/tests/test_nettest.py
@@ -355,7 +355,7 @@ class TestNetTest(ConfigTestCase):
         director = Director()
 
         self.filename = 'dummy_report.yamloo'
-        d = director.startNetTest(ntl, self.filename)
+        d = director.start_net_test_loader(ntl, self.filename)
 
         @d.addCallback
         def complete(result):
@@ -382,7 +382,7 @@ class TestNetTest(ConfigTestCase):
 
         director = Director()
         self.filename = 'dummy_report.yamloo'
-        d = director.startNetTest(ntl, self.filename)
+        d = director.start_net_test_loader(ntl, self.filename)
 
         @d.addCallback
         def complete(result):
@@ -410,7 +410,7 @@ class TestNetTest(ConfigTestCase):
 
         director = Director()
         self.filename = 'dummy_report.yamloo'
-        d = director.startNetTest(ntl, self.filename)
+        d = director.start_net_test_loader(ntl, self.filename)
 
         @d.addCallback
         def complete(result):
@@ -469,7 +469,7 @@ class TestNettestTimeout(ConfigTestCase):
         director = Director()
 
         self.filename = 'dummy_report.yamloo'
-        d = director.startNetTest(ntl, self.filename)
+        d = director.start_net_test_loader(ntl, self.filename)
 
         @d.addCallback
         def complete(result):
diff --git a/ooni/tests/test_oonideckgen.py b/ooni/tests/test_oonideckgen.py
index 4a52377..11a852f 100644
--- a/ooni/tests/test_oonideckgen.py
+++ b/ooni/tests/test_oonideckgen.py
@@ -1,10 +1,11 @@
 import os
-import yaml
 import tempfile
 
+import yaml
+
+from ooni.scripts import oonideckgen
 from .bases import ConfigTestCase
 
-from ooni.deckgen import cli
 
 class TestOONIDeckgen(ConfigTestCase):
     def setUp(self):
@@ -25,7 +26,7 @@ class TestOONIDeckgen(ConfigTestCase):
 
     def test_generate_deck(self):
         temp_dir = tempfile.mkdtemp()
-        cli.generate_deck({
+        oonideckgen.generate_deck({
             "country-code": "it",
             "output": temp_dir,
             "collector": None,
diff --git a/ooni/tests/test_oonireport.py b/ooni/tests/test_oonireport.py
index 2275672..d71a403 100644
--- a/ooni/tests/test_oonireport.py
+++ b/ooni/tests/test_oonireport.py
@@ -5,8 +5,6 @@ from mock import patch, MagicMock
 from twisted.internet import defer
 from ooni.tests.bases import ConfigTestCase
 
-from ooni.report import tool
-
 mock_tor_check = MagicMock(return_value=True)
 
 class TestOONIReport(ConfigTestCase):
@@ -51,9 +49,9 @@ class TestOONIReport(ConfigTestCase):
             cli.run(["upload"])
             self.assertTrue(mock_tool.upload_all.called)
 
-    @patch('ooni.report.tool.CollectorClient')
-    @patch('ooni.report.tool.OONIBReportLog')
-    @patch('ooni.report.tool.OONIBReporter')
+    @patch('ooni.report.cli.CollectorClient')
+    @patch('ooni.report.cli.OONIBReportLog')
+    @patch('ooni.report.cli.OONIBReporter')
     def test_tool_upload(self, mock_oonib_reporter, mock_oonib_report_log,
                          mock_collector_client):
 
@@ -70,7 +68,7 @@ class TestOONIReport(ConfigTestCase):
         self._create_reporting_yaml(report_name)
         self._write_dummy_report(report_name)
 
-        d = tool.upload(report_name)
+        d = cli.upload(report_name)
         @d.addCallback
         def cb(result):
             mock_oonib_reporter_i.writeReportEntry.assert_called_with(
@@ -78,9 +76,9 @@ class TestOONIReport(ConfigTestCase):
             )
         return d
 
-    @patch('ooni.report.tool.CollectorClient')
-    @patch('ooni.report.tool.OONIBReportLog')
-    @patch('ooni.report.tool.OONIBReporter')
+    @patch('ooni.report.cli.CollectorClient')
+    @patch('ooni.report.cli.OONIBReportLog')
+    @patch('ooni.report.cli.OONIBReporter')
     def test_tool_upload_all(self, mock_oonib_reporter, mock_oonib_report_log,
                          mock_collector_client):
 
@@ -98,7 +96,7 @@ class TestOONIReport(ConfigTestCase):
         self._create_reporting_yaml(report_name)
         self._write_dummy_report(report_name)
 
-        d = tool.upload_all()
+        d = cli.upload_all()
         @d.addCallback
         def cb(result):
             mock_oonib_reporter_i.writeReportEntry.assert_called_with(
diff --git a/ooni/ui/cli.py b/ooni/ui/cli.py
index 7a0036e..fe24bf6 100644
--- a/ooni/ui/cli.py
+++ b/ooni/ui/cli.py
@@ -334,10 +334,10 @@ def runTestWithDirector(director, global_options, url=None, start_tor=True):
                 collector_client = setupCollector(global_options,
                                                   net_test_loader.collector)
 
-            yield director.startNetTest(net_test_loader,
-                                        global_options['reportfile'],
-                                        collector_client,
-                                        global_options['no-yamloo'])
+            yield director.start_net_test_loader(net_test_loader,
+                                                 global_options['reportfile'],
+                                                 collector_client,
+                                                 global_options['no-yamloo'])
 
     d.addCallback(setup_nettest)
     d.addCallback(post_director_start)
diff --git a/ooni/ui/web/client/index.html b/ooni/ui/web/client/index.html
index cc45067..e306ef2 100644
--- a/ooni/ui/web/client/index.html
+++ b/ooni/ui/web/client/index.html
@@ -13,5 +13,5 @@
     <app>
       Loading...
     </app>
-  <script type="text/javascript" src="app.bundle.js?27ae67e2c74ae4ae9a82"></script></body>
+  <script type="text/javascript" src="app.bundle.js?7ed7d7510803fa1a4ad8"></script></body>
 </html>
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index e63c08f..a862fe7 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -13,44 +13,58 @@ from werkzeug.exceptions import NotFound
 
 from ooni import __version__ as ooniprobe_version
 from ooni import errors
-from ooni.deck import Deck
+from ooni.deck import NGDeck
 from ooni.settings import config
-from ooni.nettest import NetTestLoader
-from ooni.measurements import GenerateResults
-from ooni.utils import generate_filename
+from ooni.utils import log
+from ooni.director import DirectorEvent
+
+config.advanced.debug = True
 
 def rpath(*path):
     context = os.path.abspath(os.path.dirname(__file__))
     return os.path.join(context, *path)
 
-def getNetTestLoader(test_options, test_file):
-    """
-    Args:
-        test_options: (dict) containing as keys the option names.
-
-        test_file: (string) the path to the test_file to be run.
-    Returns:
-        an instance of :class:`ooni.nettest.NetTestLoader` with the specified
-        test_file and the specified options.
-        """
-    options = []
-    for k, v in test_options.items():
-        if v is None:
-            print("Skipping %s because none" % k)
-            continue
-        options.append('--'+k)
-        options.append(v)
-
-    net_test_loader = NetTestLoader(options,
-            test_file=test_file)
-    return net_test_loader
-
-
 class WebUIError(Exception):
     def __init__(self, code, message):
         self.code = code
         self.message = message
 
+class LongPoller(object):
+    """Fire every deferred returned by get() on notify() or on timeout."""
+    def __init__(self, timeout, _reactor=reactor):
+        self.lock = defer.DeferredLock()
+        self.deferred_subscribers = []
+        self._reactor = _reactor
+        self._timeout = timeout
+        self.timer = task.LoopingCall(
+            self.notify, DirectorEvent("null", "No updates"))
+        self.timer.clock = self._reactor
+
+    def start(self):
+        self.timer.start(self._timeout)
+
+    def stop(self):
+        self.timer.stop()
+
+    def _notify(self, lock, event):
+        # Always release the lock, even if the assert or timer.reset() raises.
+        try:
+            for d in self.deferred_subscribers[:]:
+                assert not d.called, "Deferred is already called"
+                d.callback(event)
+                self.deferred_subscribers.remove(d)
+            self.timer.reset()
+        finally:
+            lock.release()
+
+    def notify(self, event=None):
+        self.lock.acquire().addCallback(self._notify, event)
+
+    def get(self):
+        d = defer.Deferred()
+        self.deferred_subscribers.append(d)
+        return d
+
 class WebUIAPI(object):
     app = Klein()
     # Maximum number in seconds after which to return a result even if not
@@ -58,7 +72,7 @@ class WebUIAPI(object):
     _long_polling_timeout = 5
     _reactor = reactor
 
-    def __init__(self, config, director):
+    def __init__(self, config, director, _reactor=reactor):
         self.director = director
         self.config = config
         self.measurement_path = FilePath(config.measurements_directory)
@@ -74,12 +88,26 @@ class WebUIAPI(object):
             "director_started": False,
             "failures": []
         }
-        self.status_updates = []
-        d = self.director.start(start_tor=True)
+
+        self.status_poller = LongPoller(
+            self._long_polling_timeout, _reactor)
+        self.director_event_poller = LongPoller(
+            self._long_polling_timeout, _reactor)
+
+        # XXX move this elsewhere
+        self.director_event_poller.start()
+        self.status_poller.start()
+
+        self.director.subscribe(self.handle_director_event)
+        d = self.director.start()
 
         d.addCallback(self.director_started)
         d.addErrback(self.director_startup_failed)
-        d.addBoth(lambda _: self.broadcast_status_update())
+        d.addBoth(lambda _: self.status_poller.notify())
+
+    def handle_director_event(self, event):
+        log.msg("Handling event {0}".format(event.type))
+        self.director_event_poller.notify(event)
 
     def add_failure(self, failure):
         self.status['failures'].append(str(failure))
@@ -92,26 +120,12 @@ class WebUIAPI(object):
     def director_startup_failed(self, failure):
         self.add_failure(failure)
 
-    def broadcast_status_update(self):
-        for su in self.status_updates:
-            if not su.called:
-                su.callback(None)
-
     def completed_measurement(self, measurement_id):
         del self.status['active_measurements'][measurement_id]
         self.status['completed_measurements'].append(measurement_id)
-        measurement_dir = self.measurement_path.child(measurement_id)
-
-        measurement = measurement_dir.child('measurements.njson.progress')
-
-        # Generate the summary.json file
-        summary = measurement_dir.child('summary.json')
-        gr = GenerateResults(measurement.path)
-        gr.output(summary.path)
-
-        measurement.moveTo(measurement_dir.child('measurements.njson'))
 
     def failed_measurement(self, measurement_id, failure):
+        log.exception(failure)
         del self.status['active_measurements'][measurement_id]
         self.add_failure(str(failure))
 
@@ -119,8 +133,9 @@ class WebUIAPI(object):
     def not_found(self, request, _):
         request.redirect('/client/')
 
-    @app.handle_error(WebUIError)
-    def web_ui_error(self, request, error):
+    @app.handle_errors(WebUIError)
+    def web_ui_error(self, request, failure):
+        error = failure.value
         request.setResponseCode(error.code)
         return self.render_json({
             "error_code": error.code,
@@ -133,24 +148,28 @@ class WebUIAPI(object):
         request.setHeader('Content-Length', len(json_string))
         return json_string
 
+    @app.route('/api/notify', methods=["GET"])
+    def api_notify(self, request):
+        def got_director_event(event):
+            return self.render_json({
+                "type": event.type,
+                "message": event.message
+            }, request)
+        d = self.director_event_poller.get()
+        d.addCallback(got_director_event)
+        return d
+
     @app.route('/api/status', methods=["GET"])
     def api_status(self, request):
         return self.render_json(self.status, request)
 
     @app.route('/api/status/update', methods=["GET"])
     def api_status_update(self, request):
-        status_update = defer.Deferred()
-        status_update.addCallback(lambda _:
-                                  self.status_updates.remove(status_update))
-        status_update.addCallback(lambda _: self.api_status(request))
-
-        self.status_updates.append(status_update)
-
-        # After long_polling_timeout we fire the callback
-        task.deferLater(self._reactor, self._long_polling_timeout,
-                        status_update.callback, None)
-
-        return status_update
+        def got_status_update(event):
+            return self.api_status(request)
+        d = self.status_poller.get()
+        d.addCallback(got_status_update)
+        return d
 
     @app.route('/api/deck/generate', methods=["GET"])
     def api_deck_generate(self, request):
@@ -167,37 +186,23 @@ class WebUIAPI(object):
 
         return self.render_json({"command": "deck-list"}, request)
 
-    @defer.inlineCallbacks
     def run_deck(self, deck):
-        yield deck.setup()
-        measurement_ids = []
-        for net_test_loader in deck.netTestLoaders:
-            # XXX synchronize this with startNetTest
-            test_details = net_test_loader.getTestDetails()
-            measurement_id = generate_filename(test_details)
-
-            measurement_dir = self.measurement_path.child(measurement_id)
-            measurement_dir.createDirectory()
-
-            report_filename = measurement_dir.child(
-                "measurements.njson.progress").path
-
-            measurement_ids.append(measurement_id)
-            self.status['active_measurements'][measurement_id] = {
-                'test_name': test_details['test_name'],
-                'test_start_time': test_details['test_start_time']
+        for task_id in deck.task_ids:
+            self.status['active_measurements'][task_id] = {
+                'test_name': 'foobar',
+                'test_start_time': 'some start time'
             }
-            self.broadcast_status_update()
-            d = self.director.startNetTest(net_test_loader, report_filename)
-            d.addCallback(lambda _:
-                          self.completed_measurement(measurement_id))
-            d.addErrback(lambda failure:
-                         self.failed_measurement(measurement_id, failure))
+        self.status_poller.notify()
+        d = deck.run(self.director)
+        d.addCallback(lambda _:
+                      self.completed_measurement(task_id))
+        d.addErrback(lambda failure:
+                     self.failed_measurement(task_id, failure))
 
     @app.route('/api/nettest/<string:test_name>/start', methods=["POST"])
     def api_nettest_start(self, request, test_name):
         try:
-            net_test = self.director.netTests[test_name]
+            _ = self.director.netTests[test_name]
         except KeyError:
             raise WebUIError(500, 'Could not find the specified test')
 
@@ -206,10 +211,15 @@ class WebUIAPI(object):
         except ValueError:
             raise WebUIError(500, 'Invalid JSON message recevied')
 
-        deck = Deck(no_collector=True) # XXX remove no_collector
-        net_test_loader = getNetTestLoader(test_options, net_test['path'])
+        test_options["test_name"] = test_name
+        deck_data = {
+            "tasks": [
+                {"ooni": test_options}
+            ]
+        }
         try:
-            deck.insert(net_test_loader)
+            deck = NGDeck(no_collector=True)
+            deck.load(deck_data)
             self.run_deck(deck)
 
         except errors.MissingRequiredOption, option_name:
@@ -237,6 +247,19 @@ class WebUIAPI(object):
     def api_input_list(self, request):
         return self.render_json(self.director.input_store.list(), request)
 
+    @app.route('/api/input/<string:input_id>/content', methods=["GET"])
+    def api_input_content(self, request, input_id):
+        content = self.director.input_store.getContent(input_id)
+        request.setHeader('Content-Type', 'text/plain')
+        request.setHeader('Content-Length', len(content))
+        return content
+
+    @app.route('/api/input/<string:input_id>', methods=["GET"])
+    def api_input_details(self, request, input_id):
+        return self.render_json(
+            self.director.input_store.get(input_id), request
+        )
+
     @app.route('/api/measurement', methods=["GET"])
     def api_measurement_list(self, request):
         measurements = []
@@ -299,7 +322,3 @@ class WebUIAPI(object):
     def static(self, request):
         path = rpath("client")
         return static.File(path)
-<<<<<<< acda284b56fa3a75acbe7d000fbdefb643839948
-
-=======
->>>>>>> [Web UI] Refactoring of web UI
diff --git a/ooni/ui/web/web.py b/ooni/ui/web/web.py
index 40ee3b4..eca75cb 100644
--- a/ooni/ui/web/web.py
+++ b/ooni/ui/web/web.py
@@ -1,53 +1,27 @@
-import os
-
-from twisted.scripts import twistd
-from twisted.python import usage
-from twisted.internet import reactor
 from twisted.web import server
+from twisted.internet import reactor
 from twisted.application import service
 
+from ooni.ui.web.server import WebUIAPI
 from ooni.settings import config
-from ooni.director import Director
-from ooni.utils import log
-
-from .server import WebUIAPI
 
 class WebUIService(service.MultiService):
-    portNum = 8822
+    def __init__(self, director, port_number=8842):
+        service.MultiService.__init__(self)
+
+        self.director = director
+        self.port_number = port_number
+
     def startService(self):
         service.MultiService.startService(self)
-        config.set_paths()
-        config.initialize_ooni_home()
-        config.read_config_file()
-        director = Director()
-        web_ui_api = WebUIAPI(config, director)
-        root = server.Site(web_ui_api.app.resource())
-        self._port = reactor.listenTCP(self.portNum, root)
-        d = director.start()
+
+        web_ui_api = WebUIAPI(config, self.director)
+        self._port = reactor.listenTCP(
+            self.port_number,
+            server.Site(web_ui_api.app.resource())
+        )
 
     def stopService(self):
+        service.MultiService.stopService(self)
         if self._port:
             self._port.stopListening()
-
-class StartOoniprobeWebUIPlugin:
-    tapname = "ooniprobe"
-    def makeService(self, so):
-        return WebUIService()
-
-class OoniprobeTwistdConfig(twistd.ServerOptions):
-    subCommands = [("StartOoniprobeWebUI", None, usage.Options, "ooniprobe web ui")]
-
-def start():
-    twistd_args = ["--nodaemon"]
-    twistd_config = OoniprobeTwistdConfig()
-    twistd_args.append("StartOoniprobeWebUI")
-    try:
-        twistd_config.parseOptions(twistd_args)
-    except usage.error, ue:
-        print("ooniprobe: usage error from twistd: {}\n".format(ue))
-    twistd_config.loadedPlugins = {"StartOoniprobeWebUI": StartOoniprobeWebUIPlugin()}
-    twistd.runApp(twistd_config)
-    return 0
-
-if __name__ == "__main__":
-    start()
diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py
index cc2f2ff..e18a6ee 100644
--- a/ooni/utils/onion.py
+++ b/ooni/utils/onion.py
@@ -237,20 +237,6 @@ class TorLauncherWithRetries(object):
                 continue
             setattr(new_tor_config, key, getattr(self.tor_config, key))
         self.tor_config = new_tor_config
-        self.timeout = timeout
-
-    def _reset_tor_config(self):
-        """
-        This is used to reset the Tor configuration to before launch_tor
-        modified it. This is in particular used to force the regeneration of the
-        DataDirectory.
-        """
-        new_tor_config = TorConfig()
-        for key in self.tor_config:
-            if config.tor.data_dir is None and key == "DataDirectory":
-                continue
-            setattr(new_tor_config, key, getattr(self.tor_config, key))
-        self.tor_config = new_tor_config
 
     def _progress_updates(self, prog, tag, summary):
         log.msg("%d%%: %s" % (prog, summary))





More information about the tor-commits mailing list