commit 7829363f1066a469995c0410025d7e895522363f Author: Arturo Filastò arturo@filasto.net Date: Mon Jul 25 16:08:05 2016 +0200
Add minimal outline of the ooniprobe-agent and new deck format
* Use DuckDuckGo to perform geoip lookups instead of torproject.org
* Big refactoring of the Director --- ooni/agent/__init__.py | 0 ooni/agent/agent.py | 21 ++ ooni/agent/scheduler.py | 27 ++ ooni/deck.py | 715 +++++++++++++++++++++++++++++------------ ooni/director.py | 231 ++++++++----- ooni/geoip.py | 8 +- ooni/measurements.py | 43 --- ooni/nettest.py | 3 +- ooni/results.py | 39 +++ ooni/tests/bases.py | 2 +- ooni/tests/test_deck.py | 57 +++- ooni/tests/test_director.py | 12 +- ooni/tests/test_nettest.py | 8 +- ooni/tests/test_oonideckgen.py | 7 +- ooni/tests/test_oonireport.py | 18 +- ooni/ui/cli.py | 8 +- ooni/ui/web/client/index.html | 2 +- ooni/ui/web/server.py | 203 ++++++------ ooni/ui/web/web.py | 56 +--- ooni/utils/onion.py | 14 - 20 files changed, 974 insertions(+), 500 deletions(-)
diff --git a/ooni/agent/__init__.py b/ooni/agent/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ooni/agent/agent.py b/ooni/agent/agent.py new file mode 100644 index 0000000..a1394f0 --- /dev/null +++ b/ooni/agent/agent.py @@ -0,0 +1,21 @@ +from twisted.application import service +from ooni.director import Director +from ooni.settings import config + +from ooni.ui.web.web import WebUIService +from ooni.agent.scheduler import SchedulerService + +class AgentService(service.MultiService): + def __init__(self): + service.MultiService.__init__(self) + + director = Director() + config.set_paths() + config.initialize_ooni_home() + config.read_config_file() + + self.web_ui_service = WebUIService(director) + self.web_ui_service.setServiceParent(self) + + self.scheduler_service = SchedulerService(director) + self.scheduler_service.setServiceParent(self) diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py new file mode 100644 index 0000000..1004597 --- /dev/null +++ b/ooni/agent/scheduler.py @@ -0,0 +1,27 @@ +from twisted.application import service +from twisted.internet import task + +class SchedulerService(service.MultiService): + """ + This service is responsible for running the periodic tasks. + """ + def __init__(self, director, interval=30): + service.MultiService.__init__(self) + self.director = director + self.interval = interval + self._looping_call = task.LoopingCall(self._should_run) + + def _should_run(self): + """ + This function is called every self.interval seconds to check + which periodic tasks should be run. + """ + pass + + def startService(self): + service.MultiService.startService(self) + self._looping_call.start(self.interval) + + def stopService(self): + service.MultiService.stopService(self) + self._looping_call.stop() diff --git a/ooni/deck.py b/ooni/deck.py index 9f17530..1746d26 100644 --- a/ooni/deck.py +++ b/ooni/deck.py @@ -1,27 +1,28 @@ # -*- coding: utf-8 -*- -import csv import os -import yaml +import csv import json
+from copy import deepcopy from hashlib import sha256 -from datetime import datetime -from ooni.backend_client import CollectorClient, BouncerClient -from ooni.backend_client import WebConnectivityClient, guess_backend_type -from ooni.nettest import NetTestLoader -from ooni.settings import config
-from ooni.otime import timestampNowISO8601UTC +import yaml
-from ooni.resources.update import check_for_update +from twisted.internet import defer +from twisted.python.filepath import FilePath
-from ooni.utils import log from ooni import constants from ooni import errors as e +from ooni.backend_client import CollectorClient, BouncerClient +from ooni.backend_client import WebConnectivityClient, guess_backend_type +from ooni.nettest import NetTestLoader +from ooni.otime import timestampNowISO8601UTC +from ooni.resources import check_for_update +from ooni.settings import config +from ooni.utils import generate_filename +from ooni.utils import log
-from twisted.python.filepath import FilePath -from twisted.internet import defer - +from ooni.results import generate_summary
class InputFile(object): def __init__(self, input_hash, base_path=config.inputs_directory): @@ -116,6 +117,25 @@ def nettest_to_path(path, allow_arbitrary_paths=False): return found_path
+def get_preferred_bouncer(): + preferred_backend = config.advanced.get( + "preferred_backend", "onion" + ) + bouncer_address = getattr( + constants, "CANONICAL_BOUNCER_{0}".format( + preferred_backend.upper() + ) + ) + if preferred_backend == "cloudfront": + return BouncerClient( + settings={ + 'address': bouncer_address[0], + 'front': bouncer_address[1], + 'type': 'cloudfront' + }) + else: + return BouncerClient(bouncer_address) + class Deck(InputFile): # this exists so we can mock it out in unittests _BouncerClient = BouncerClient @@ -227,175 +247,10 @@ class Deck(InputFile): if self.bouncer: log.msg("Looking up collector and test helpers with {0}".format( self.bouncer.base_address)) - yield self.lookupCollectorAndTestHelpers() - - - def sortAddressesByPriority(self, priority_address, alternate_addresses): - prioritised_addresses = [] - - backend_type = guess_backend_type(priority_address) - priority_address = { - 'address': priority_address, - 'type': backend_type - } - address_priority = ['onion', 'https', 'cloudfront', 'http'] - address_priority.remove(self.preferred_backend) - address_priority.insert(0, self.preferred_backend) - - def filter_by_type(collectors, collector_type): - return filter(lambda x: x['type'] == collector_type, collectors) - - if (priority_address['type'] != self.preferred_backend): - valid_alternatives = filter_by_type(alternate_addresses, - self.preferred_backend) - if len(valid_alternatives) > 0: - alternate_addresses += [priority_address] - priority_address = valid_alternatives[0] - alternate_addresses.remove(priority_address) - - prioritised_addresses += [priority_address] - for address_type in address_priority: - prioritised_addresses += filter_by_type(alternate_addresses, - address_type) - - return prioritised_addresses - - @defer.inlineCallbacks - def getReachableCollector(self, collector_address, collector_alternate): - # We prefer onion collector to https collector to cloudfront - # collectors to plaintext collectors - for collector_settings in self.sortAddressesByPriority(collector_address, - collector_alternate): - collector = self._CollectorClient(settings=collector_settings) - if not collector.isSupported(): - log.err("Unsupported %s collector %s" % ( - collector_settings['type'], - collector_settings['address'])) - continue - reachable = yield collector.isReachable() - if not reachable: - log.err("Unreachable %s collector %s" % ( - collector_settings['type'], - collector_settings['address'])) - continue - defer.returnValue(collector) - - raise e.NoReachableCollectors - - @defer.inlineCallbacks - def getReachableTestHelper(self, test_helper_name, test_helper_address, - test_helper_alternate): - # For the moment we look for alternate addresses only of - # web_connectivity test helpers. - if test_helper_name == 'web-connectivity': - for web_connectivity_settings in self.sortAddressesByPriority( - test_helper_address, test_helper_alternate): - web_connectivity_test_helper = WebConnectivityClient( - settings=web_connectivity_settings) - if not web_connectivity_test_helper.isSupported(): - log.err("Unsupported %s web_connectivity test_helper " - "%s" % ( - web_connectivity_settings['type'], - web_connectivity_settings['address'] - )) - continue - reachable = yield web_connectivity_test_helper.isReachable() - if not reachable: - log.err("Unreachable %s web_connectivity test helper %s" % ( - web_connectivity_settings['type'], - web_connectivity_settings['address'] - )) - continue - defer.returnValue(web_connectivity_settings) - raise e.NoReachableTestHelpers - else: - defer.returnValue(test_helper_address.encode('ascii')) - - @defer.inlineCallbacks - def getReachableTestHelpersAndCollectors(self, net_tests): - for net_test in net_tests: - - primary_address = net_test['collector'] - alternate_addresses = net_test.get('collector-alternate', []) - net_test['collector'] = yield self.getReachableCollector( - primary_address, - alternate_addresses - ) - - for test_helper_name, test_helper_address in net_test['test-helpers'].items(): - test_helper_alternate = \ - net_test.get('test-helpers-alternate', {}).get(test_helper_name, []) - net_test['test-helpers'][test_helper_name] = \ - yield self.getReachableTestHelper( - test_helper_name, - test_helper_address, - test_helper_alternate) - - defer.returnValue(net_tests) - - @defer.inlineCallbacks - def lookupCollectorAndTestHelpers(self): - required_nettests = [] - - requires_test_helpers = False - requires_collector = False - for net_test_loader in self.netTestLoaders: - nettest = { - 'name': net_test_loader.testName, - 'version': net_test_loader.testVersion, - 'test-helpers': [], - 'input-hashes': [x['hash'] for x in net_test_loader.inputFiles] - } - if not net_test_loader.collector and not self.no_collector: - requires_collector = True - - if len(net_test_loader.missingTestHelpers) > 0: - requires_test_helpers = True - nettest['test-helpers'] += map(lambda x: x[1], - net_test_loader.missingTestHelpers) - - required_nettests.append(nettest) - - if not requires_test_helpers and not requires_collector: - defer.returnValue(None) - - response = yield self.bouncer.lookupTestCollector(required_nettests) - try: - provided_net_tests = yield self.getReachableTestHelpersAndCollectors(response['net-tests']) - except e.NoReachableCollectors: - log.err("Could not find any reachable collector") - raise - except e.NoReachableTestHelpers: - log.err("Could not find any reachable test helpers") - raise - - def find_collector_and_test_helpers(test_name, test_version, input_files): - input_files = [u""+x['hash'] for x in input_files] - for net_test in provided_net_tests: - if net_test['name'] != test_name: - continue - if net_test['version'] != test_version: - continue - if set(net_test['input-hashes']) != set(input_files): - continue - return net_test['collector'], net_test['test-helpers'] - - for net_test_loader in self.netTestLoaders: - log.msg("Setting collector and test helpers for %s" % - net_test_loader.testName) - - collector, test_helpers = \ - find_collector_and_test_helpers(test_name=net_test_loader.testName, - test_version=net_test_loader.testVersion, - input_files=net_test_loader.inputFiles) - - for option, name in net_test_loader.missingTestHelpers: - test_helper_address_or_settings = test_helpers[name] - net_test_loader.localOptions[option] = test_helper_address_or_settings - net_test_loader.testHelpers[option] = test_helper_address_or_settings - - if not net_test_loader.collector: - net_test_loader.collector = collector + yield lookup_collector_and_test_helpers(self.netTestLoaders, + self.bouncer, + self.preferred_backend, + self.no_collector)
@defer.inlineCallbacks def fetchAndVerifyNetTestInput(self, net_test_loader): @@ -419,17 +274,197 @@ class Deck(InputFile): i['test_options'][i['key']] = input_file.cached_file
+def lookup_collector_and_test_helpers(net_test_loaders, + bouncer, + preferred_backend, + no_collector=False): + required_nettests = [] + + requires_test_helpers = False + requires_collector = False + for net_test_loader in net_test_loaders: + nettest = { + 'name': net_test_loader.testName, + 'version': net_test_loader.testVersion, + 'test-helpers': [], + 'input-hashes': [x['hash'] for x in net_test_loader.inputFiles] + } + if not net_test_loader.collector and not no_collector: + requires_collector = True + + if len(net_test_loader.missingTestHelpers) > 0: + requires_test_helpers = True + nettest['test-helpers'] += map(lambda x: x[1], + net_test_loader.missingTestHelpers) + + required_nettests.append(nettest) + + if not requires_test_helpers and not requires_collector: + defer.returnValue(None) + + response = yield bouncer.lookupTestCollector(required_nettests) + try: + provided_net_tests = yield get_reachable_test_helpers_and_collectors( + response['net-tests'], preferred_backend) + except e.NoReachableCollectors: + log.err("Could not find any reachable collector") + raise + except e.NoReachableTestHelpers: + log.err("Could not find any reachable test helpers") + raise + + def find_collector_and_test_helpers(test_name, test_version, input_files): + input_files = [u""+x['hash'] for x in input_files] + for net_test in provided_net_tests: + if net_test['name'] != test_name: + continue + if net_test['version'] != test_version: + continue + if set(net_test['input-hashes']) != set(input_files): + continue + return net_test['collector'], net_test['test-helpers'] + + for net_test_loader in net_test_loaders: + log.msg("Setting collector and test helpers for %s" % + net_test_loader.testName) + + collector, test_helpers = \ + find_collector_and_test_helpers(test_name=net_test_loader.testName, + test_version=net_test_loader.testVersion, + input_files=net_test_loader.inputFiles) + + for option, name in net_test_loader.missingTestHelpers: + test_helper_address_or_settings = test_helpers[name] + net_test_loader.localOptions[option] = test_helper_address_or_settings + net_test_loader.testHelpers[option] = test_helper_address_or_settings + + if not net_test_loader.collector: + net_test_loader.collector = collector + + +@defer.inlineCallbacks +def get_reachable_test_helpers_and_collectors(net_tests, preferred_backend): + for net_test in net_tests: + primary_address = net_test['collector'] + alternate_addresses = net_test.get('collector-alternate', []) + net_test['collector'] = yield get_reachable_collector( + primary_address, alternate_addresses, preferred_backend) + + for test_helper_name, test_helper_address in net_test['test-helpers'].items(): + test_helper_alternate = \ + net_test.get('test-helpers-alternate', {}).get(test_helper_name, []) + net_test['test-helpers'][test_helper_name] = \ + yield get_reachable_test_helper(test_helper_name, + test_helper_address, + test_helper_alternate, + preferred_backend) + + defer.returnValue(net_tests) + +@defer.inlineCallbacks +def get_reachable_collector(collector_address, collector_alternate, + preferred_backend): + # We prefer onion collector to https collector to cloudfront + # collectors to plaintext collectors + for collector_settings in sort_addresses_by_priority( + collector_address, + collector_alternate, + preferred_backend): + collector = CollectorClient(settings=collector_settings) + if not collector.isSupported(): + log.err("Unsupported %s collector %s" % ( + collector_settings['type'], + collector_settings['address'])) + continue + reachable = yield collector.isReachable() + if not reachable: + log.err("Unreachable %s collector %s" % ( + collector_settings['type'], + collector_settings['address'])) + continue + defer.returnValue(collector) + + raise e.NoReachableCollectors + + +@defer.inlineCallbacks +def get_reachable_test_helper(test_helper_name, test_helper_address, + test_helper_alternate, preferred_backend): + # For the moment we look for alternate addresses only of + # web_connectivity test helpers. + if test_helper_name == 'web-connectivity': + for web_connectivity_settings in sort_addresses_by_priority( + test_helper_address, test_helper_alternate, + preferred_backend): + web_connectivity_test_helper = WebConnectivityClient( + settings=web_connectivity_settings) + if not web_connectivity_test_helper.isSupported(): + log.err("Unsupported %s web_connectivity test_helper " + "%s" % ( + web_connectivity_settings['type'], + web_connectivity_settings['address'] + )) + continue + reachable = yield web_connectivity_test_helper.isReachable() + if not reachable: + log.err("Unreachable %s web_connectivity test helper %s" % ( + web_connectivity_settings['type'], + web_connectivity_settings['address'] + )) + continue + defer.returnValue(web_connectivity_settings) + raise e.NoReachableTestHelpers + else: + defer.returnValue(test_helper_address.encode('ascii')) + +def sort_addresses_by_priority(priority_address, + alternate_addresses, + preferred_backend): + prioritised_addresses = [] + + backend_type = guess_backend_type(priority_address) + priority_address = { + 'address': priority_address, + 'type': backend_type + } + address_priority = ['onion', 'https', 'cloudfront', 'http'] + address_priority.remove(preferred_backend) + address_priority.insert(0, preferred_backend) + + def filter_by_type(collectors, collector_type): + return filter(lambda x: x['type'] == collector_type, collectors) + + if (priority_address['type'] != preferred_backend): + valid_alternatives = filter_by_type(alternate_addresses, + preferred_backend) + if len(valid_alternatives) > 0: + alternate_addresses += [priority_address] + priority_address = valid_alternatives[0] + alternate_addresses.remove(priority_address) + + prioritised_addresses += [priority_address] + for address_type in address_priority: + prioritised_addresses += filter_by_type(alternate_addresses, + address_type) + + return prioritised_addresses + + +class InputNotFound(Exception): + pass + + class InputStore(object): def __init__(self): self.path = FilePath(config.inputs_directory) self.resources = FilePath(config.resources_directory) + self._cache_stale = True + self._cache = {}
@defer.inlineCallbacks def update_url_lists(self, country_code): countries = ["global"] - if country_code == "ZZ": - country_code = None - else: + if country_code != "ZZ": countries.append(country_code)
for cc in countries: @@ -466,30 +501,58 @@ class InputStore(object): "name": name, "filepath": out_file.path, "last_updated": timestampNowISO8601UTC(), - "id": "citizenlab_test_lists_{0}_txt".format(cc), + "id": "citizenlab_{0}_urls".format(cc), "type": "file/url" }, out_fh)
@defer.inlineCallbacks def create(self, country_code=None): + # XXX This is a hax to avoid race conditions in testing because this + # object is a singleton and config can have a custom home directory + # passed at runtime. + self.path = FilePath(config.inputs_directory) + self.resources = FilePath(config.resources_directory) + self.path.child("descriptors").makedirs(ignoreExistingDirectory=True) self.path.child("data").makedirs(ignoreExistingDirectory=True) yield self.update_url_lists(country_code)
@defer.inlineCallbacks def update(self, country_code=None): - yield self.update_url_lists(country_code) + # XXX why do we make a difference between create and update? + yield self.create(country_code)
- def list(self): - inputs = [] + def _update_cache(self): descs = self.path.child("descriptors") if not descs.exists(): - return inputs + self._cache = {} + return
for fn in descs.listdir(): with descs.child(fn).open("r") as in_fh: - inputs.append(json.load(in_fh)) - return inputs + input_desc = json.load(in_fh) + self._cache[input_desc.pop("id")] = input_desc + self._cache_stale = False + return + + def list(self): + if self._cache_stale: + self._update_cache() + return self._cache + + def get(self, input_id): + if self._cache_stale: + self._update_cache() + try: + input_desc = self._cache[input_id] + except KeyError: + raise InputNotFound(input_id) + return input_desc + + def getContent(self, input_id): + input_desc = self.get(input_id) + with open(input_desc["filepath"]) as fh: + return fh.read()
class DeckStore(object): def __init__(self): @@ -501,10 +564,268 @@ class DeckStore(object): def get(self): pass
-class NGInput(object): - def __init__(self, input_name): - pass +def resolve_file_path(v, prepath=None): + if v.startswith("$"): + # This raises InputNotFound and we let it carry onto the caller + return input_store.get(v[1:])["filepath"] + elif prepath is not None and (not os.path.isabs(v)): + return FilePath(prepath).preauthChild(v).path + return v + +def options_to_args(options, prepath=None): + args = [] + for k, v in options.items(): + if v is None: + continue + if k == "file": + v = resolve_file_path(v, prepath) + args.append('--'+k) + args.append(v) + return args + +class UnknownTaskKey(Exception): + pass + +class MissingTaskDataKey(Exception): + pass + +class DeckTask(object): + _metadata_keys = ["name"] + _supported_tasks = ["ooni"] + + def __init__(self, data, parent_metadata={}, cwd=None): + self.parent_metadata = parent_metadata + self.cwd = cwd + self.data = deepcopy(data) + + self.id = "" + + self.type = None + self.metadata = {} + self.requires_tor = False + self.requires_bouncer = False + + self.ooni = { + 'bouncer_client': None, + 'test_details': {} + } + + self._load(data) + + def _load_ooni(self, task_data): + required_keys = ["test_name"] + for required_key in required_keys: + if required_key not in task_data: + raise MissingTaskDataKey(required_key) + + # This raises e.NetTestNotFound, we let it go onto the caller + nettest_path = nettest_to_path(task_data.pop("test_name")) + + try: + annotations = task_data.pop('annotations') + except KeyError: + annotations = self.parent_metadata.get('annotations', {}) + + try: + collector_address = task_data.pop('collector') + except KeyError: + collector_address = self.parent_metadata.get('collector', None) + + net_test_loader = NetTestLoader( + options_to_args(task_data), + annotations=annotations, + test_file=nettest_path + ) + + if isinstance(collector_address, dict): + net_test_loader.collector = CollectorClient( + settings=collector_address + ) + elif collector_address is not None: + net_test_loader.collector = CollectorClient( + collector_address + ) + + if (net_test_loader.collector is not None and + net_test_loader.collector.backend_type == "onion"): + self.requires_tor = True + + try: + net_test_loader.checkOptions() + if net_test_loader.requiresTor: + self.requires_tor = True + except e.MissingTestHelper: + self.requires_bouncer = True + + self.ooni['net_test_loader'] = net_test_loader + # Need to ensure that this is called only once we have looked up the + # probe IP address and have geoip data. + self.ooni['test_details'] = net_test_loader.getTestDetails() + self.id = generate_filename(self.ooni['test_details']) + + def _load(self, data): + for key in self._metadata_keys: + try: + self.metadata[key] = data.pop(key) + except KeyError: + continue + + task_type, task_data = data.popitem() + if task_type not in self._supported_tasks: + raise UnknownTaskKey(task_type) + self.type = task_type + getattr(self, "_load_"+task_type)(task_data) + + assert len(data) == 0
class NGDeck(object): - def __init__(self, deck_path): - pass + def __init__(self, deck_data=None, + deck_path=None, no_collector=False): + # Used to resolve relative paths inside of decks. + self.deck_directory = None + self.requires_tor = False + self.no_collector = no_collector + self.name = "" + self.description = "" + self.schedule = None + + self.metadata = {} + self.bouncer = None + + self._measurement_path = FilePath(config.measurements_directory) + self._tasks = [] + self.task_ids = [] + + if deck_path is not None: + self.open(deck_path) + elif deck_data is not None: + self.load(deck_data) + + def open(self, deck_path): + with open(deck_path) as fh: + deck_data = yaml.safe_load(fh) + self.load(deck_data) + + def write(self, fh): + """ + Writes a properly formatted deck to the supplied file handle. + :param fh: an open file handle + :return: + """ + deck_data = { + "name": self.name, + "description": self.description, + "tasks": [task.data for task in self._tasks] + } + if self.schedule is not None: + deck_data["schedule"] = self.schedule + for key, value in self.metadata.items(): + deck_data[key] = value + + fh.write("---\n") + yaml.safe_dump(deck_data, fh, default_flow_style=False) + + def load(self, deck_data): + self.name = deck_data.pop("name", "Un-named Deck") + self.description = deck_data.pop("description", "No description") + + bouncer_address = deck_data.pop("bouncer", None) + if bouncer_address is None: + self.bouncer = get_preferred_bouncer() + elif isinstance(bouncer_address, dict): + self.bouncer = BouncerClient(settings=bouncer_address) + else: + self.bouncer = BouncerClient(bouncer_address) + + self.schedule = deck_data.pop("schedule", None) + + tasks_data = deck_data.pop("tasks", []) + for key, metadata in deck_data.items(): + self.metadata[key] = metadata + + for task_data in tasks_data: + deck_task = DeckTask(task_data, self.metadata, self.deck_directory) + if deck_task.requires_tor: + self.requires_tor = True + if (deck_task.requires_bouncer and + self.bouncer.backend_type == "onion"): + self.requires_tor = True + self._tasks.append(deck_task) + self.task_ids.append(deck_task.id) + + @defer.inlineCallbacks + def query_bouncer(self): + preferred_backend = config.advanced.get( + "preferred_backend", "onion" + ) + log.msg("Looking up collector and test helpers with {0}".format( + self.bouncer.base_address) + ) + net_test_loaders = [] + for task in self._tasks: + if task.type == "ooni": + net_test_loaders.append(task.ooni["net_test_loader"]) + + yield lookup_collector_and_test_helpers( + net_test_loaders, + self.bouncer, + preferred_backend, + self.no_collector + ) + + def _measurement_completed(self, result, measurement_id): + log.msg("{0}".format(result)) + measurement_dir = self._measurement_path.child(measurement_id) + measurement_dir.child("measurements.njson.progress").moveTo( + measurement_dir.child("measurements.njson") + ) + generate_summary( + measurement_dir.child("measurements.njson").path, + measurement_dir.child("summary.json").path + ) + measurement_dir.child("running.pid").remove() + + def _measurement_failed(self, failure, measurement_id): + measurement_dir = self._measurement_path.child(measurement_id) + measurement_dir.child("running.pid").remove() + # XXX do we also want to delete measurements.njson.progress? + return failure + + def _run_ooni_task(self, task, director): + net_test_loader = task.ooni["net_test_loader"] + test_details = task.ooni["test_details"] + measurement_id = task.id + + measurement_dir = self._measurement_path.child(measurement_id) + measurement_dir.createDirectory() + + report_filename = measurement_dir.child("measurements.njson.progress").path + pid_file = measurement_dir.child("running.pid") + + with pid_file.open('w') as out_file: + out_file.write("{0}".format(os.getpid())) + + d = director.start_net_test_loader( + net_test_loader, + report_filename, + test_details=test_details + ) + d.addCallback(self._measurement_completed, measurement_id) + d.addErrback(self._measurement_failed, measurement_id) + return d + + @defer.inlineCallbacks + def run(self, director): + tasks = [] + preferred_backend = config.advanced.get("preferred_backend", "onion") + yield self.query_bouncer() + for task in self._tasks: + if task.requires_tor: + yield director.start_tor() + elif task.requires_bouncer and preferred_backend == "onion": + yield director.start_tor() + if task.type == "ooni": + tasks.append(self._run_ooni_task(task, director)) + defer.returnValue(tasks) + +input_store = InputStore() diff --git a/ooni/director.py b/ooni/director.py index d39a11b..793975e 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -1,19 +1,24 @@ import pwd import os
+from twisted.internet import defer +from twisted.python.failure import Failure + from ooni.managers import ReportEntryManager, MeasurementManager from ooni.reporter import Report from ooni.utils import log, generate_filename from ooni.utils.net import randomFreePort from ooni.nettest import NetTest, getNetTestInformation from ooni.settings import config -from ooni import errors from ooni.nettest import normalizeTestName from ooni.deck import InputStore
from ooni.utils.onion import start_tor, connect_to_control_port
-from twisted.internet import defer +class DirectorEvent(object): + def __init__(self, type="update", message=""): + self.type = type + self.message = message
class Director(object): @@ -86,8 +91,6 @@ class Director(object):
self.failures = []
- self.torControlProtocol = None - # This deferred is fired once all the measurements and their reporting # tasks are completed. self.allTestsDone = defer.Deferred() @@ -95,6 +98,58 @@ class Director(object):
self.input_store = InputStore()
+ self._reset_director_state() + self._reset_tor_state() + + self._subscribers = [] + + def subscribe(self, handler): + self._subscribers.append(handler) + + def unsubscribe(self, handler): + self._subscribers.remove(handler) + + def notify(self, event): + for handler in self._subscribers: + handler(event) + + def _reset_director_state(self): + self._director_state = 'not-running' + self._director_starting = defer.Deferred() + self._director_starting.addErrback(self._director_startup_failure) + self._director_starting.addCallback(self._director_startup_success) + + def _director_startup_failure(self, failure): + self._reset_director_state() + self.notify(DirectorEvent("error", + "Failed to start the director")) + return failure + + def _director_startup_success(self, result): + self._director_state = 'running' + self.notify(DirectorEvent("success", + "Successfully started the director")) + return result + + def _reset_tor_state(self): + # This can be either 'not-running', 'starting' or 'running' + self._tor_state = 'not-running' + self._tor_starting = defer.Deferred() + self._tor_starting.addErrback(self._tor_startup_failure) + self._tor_starting.addCallback(self._tor_startup_success) + + def _tor_startup_failure(self, failure): + self._reset_tor_state() + self.notify(DirectorEvent("error", + "Failed to start Tor")) + return failure + + def _tor_startup_success(self, result): + self._tor_state = 'running' + self.notify(DirectorEvent("success", + "Successfully started Tor")) + return result + def getNetTests(self): nettests = {}
@@ -126,16 +181,11 @@ class Director(object): return nettests
@defer.inlineCallbacks - def start(self, start_tor=False, check_incoherences=True): + def _start(self, start_tor, check_incoherences): self.netTests = self.getNetTests()
if start_tor: - if check_incoherences: - yield config.check_tor() - if config.advanced.start_tor and config.tor_state is None: - yield self.startTor() - elif config.tor.control_port and config.tor_state is None: - yield connect_to_control_port() + yield self.start_tor(check_incoherences)
if config.global_options.get('no-geoip'): aux = [False] @@ -146,8 +196,21 @@ class Director(object): log.msg("You should add annotations for the country, city and ASN") else: yield config.probe_ip.lookup() + self.notify(DirectorEvent("success", + "Looked up Probe IP"))
yield self.input_store.create(config.probe_ip.geodata["countrycode"]) + self.notify(DirectorEvent("success", + "Created input store")) + + @defer.inlineCallbacks + def start(self, start_tor=False, check_incoherences=True): + self._director_state = 'starting' + try: + yield self._start(start_tor, check_incoherences) + self._director_starting.callback(self._director_state) + except Exception as exc: + self._director_starting.errback(Failure(exc))
@property def measurementSuccessRatio(self): @@ -217,26 +280,17 @@ class Director(object): measurement.result = failure return measurement
- def reporterFailed(self, failure, net_test): - """ - This gets called every time a reporter is failing and has been removed - from the reporters of a NetTest. - Once a report has failed to be created that net_test will never use the - reporter again. - - XXX hook some logic here. - note: failure contains an extra attribute called failure.reporter - """ - pass - def netTestDone(self, net_test): + self.notify(DirectorEvent("success", + "Successfully ran net_test")) self.activeNetTests.remove(net_test) if len(self.activeNetTests) == 0: self.allTestsDone.callback(None)
@defer.inlineCallbacks - def startNetTest(self, net_test_loader, report_filename, - collector_client=None, no_yamloo=False): + def start_net_test_loader(self, net_test_loader, report_filename, + collector_client=None, no_yamloo=False, + test_details=None): """ Create the Report for the NetTest and start the report NetTest.
@@ -244,14 +298,15 @@ class Director(object): net_test_loader: an instance of :class:ooni.nettest.NetTestLoader """ - test_details = net_test_loader.getTestDetails() + if test_details is None: + test_details = net_test_loader.getTestDetails() test_cases = net_test_loader.getTestCases()
if self.allTestsDone.called: self.allTestsDone = defer.Deferred()
if config.privacy.includepcap or config.global_options.get('pcapfile', None): - self.startSniffing(test_details) + self.start_sniffing(test_details) report = Report(test_details, report_filename, self.reportEntryManager, collector_client, @@ -271,7 +326,7 @@ class Director(object): finally: self.netTestDone(net_test)
- def startSniffing(self, test_details): + def start_sniffing(self, test_details): """ Start sniffing with Scapy. Exits if required privileges (root) are not available. """ @@ -303,57 +358,83 @@ class Director(object): log.msg("Starting packet capture to: %s" % filename_pcap)
- def startTor(self): + @defer.inlineCallbacks + def start_tor(self, check_incoherences=False): """ Starts Tor Launches a Tor with :param: socks_port :param: control_port :param: tor_binary set in ooniprobe.conf """ - log.msg("Starting Tor...") - from txtorcon import TorConfig + if self._tor_state == 'running': + log.debug("Tor is already running") + defer.returnValue(self._tor_state) + elif self._tor_state == 'starting': + yield self._tor_starting + defer.returnValue(self._tor_state)
- tor_config = TorConfig() - if config.tor.control_port is None: - config.tor.control_port = int(randomFreePort()) - if config.tor.socks_port is None: - config.tor.socks_port = int(randomFreePort()) - - tor_config.ControlPort = config.tor.control_port - tor_config.SocksPort = config.tor.socks_port - - if config.tor.data_dir: - data_dir = os.path.expanduser(config.tor.data_dir) - - if not os.path.exists(data_dir): - log.msg("%s does not exist. Creating it." % data_dir) - os.makedirs(data_dir) - tor_config.DataDirectory = data_dir - - if config.tor.bridges: - tor_config.UseBridges = 1 - if config.advanced.obfsproxy_binary: - tor_config.ClientTransportPlugin = ( - 'obfs2,obfs3 exec %s managed' % - config.advanced.obfsproxy_binary - ) - bridges = [] - with open(config.tor.bridges) as f: - for bridge in f: - if 'obfs' in bridge: - if config.advanced.obfsproxy_binary: + log.msg("Starting Tor...") + self._tor_state = 'starting' + if check_incoherences: + yield config.check_tor() + + if config.advanced.start_tor and config.tor_state is None: + tor_config = TorConfig() + if config.tor.control_port is None: + config.tor.control_port = int(randomFreePort()) + if config.tor.socks_port is None: + config.tor.socks_port = int(randomFreePort()) + + tor_config.ControlPort = config.tor.control_port + tor_config.SocksPort = config.tor.socks_port + + if config.tor.data_dir: + data_dir = os.path.expanduser(config.tor.data_dir) + + if not os.path.exists(data_dir): + log.msg("%s does not exist. Creating it." % data_dir) + os.makedirs(data_dir) + tor_config.DataDirectory = data_dir + + if config.tor.bridges: + tor_config.UseBridges = 1 + if config.advanced.obfsproxy_binary: + tor_config.ClientTransportPlugin = ( + 'obfs2,obfs3 exec %s managed' % + config.advanced.obfsproxy_binary + ) + bridges = [] + with open(config.tor.bridges) as f: + for bridge in f: + if 'obfs' in bridge: + if config.advanced.obfsproxy_binary: + bridges.append(bridge.strip()) + else: bridges.append(bridge.strip()) - else: - bridges.append(bridge.strip()) - tor_config.Bridge = bridges - - if config.tor.torrc: - for i in config.tor.torrc.keys(): - setattr(tor_config, i, config.tor.torrc[i]) - - if os.geteuid() == 0: - tor_config.User = pwd.getpwuid(os.geteuid()).pw_name - - tor_config.save() - log.debug("Setting control port as %s" % tor_config.ControlPort) - log.debug("Setting SOCKS port as %s" % tor_config.SocksPort) - return start_tor(tor_config) + tor_config.Bridge = bridges + + if config.tor.torrc: + for i in config.tor.torrc.keys(): + setattr(tor_config, i, config.tor.torrc[i]) + + if os.geteuid() == 0: + tor_config.User = pwd.getpwuid(os.geteuid()).pw_name + + tor_config.save() + log.debug("Setting control port as %s" % tor_config.ControlPort) + log.debug("Setting SOCKS port as %s" % tor_config.SocksPort) + try: + yield start_tor(tor_config) + log.err("Calling tor callback") + self._tor_starting.callback(self._tor_state) + log.err("called") + except Exception as exc: + log.err("Failed to start tor") + log.exc(exc) + self._tor_starting.errback(Failure(exc)) + + elif config.tor.control_port and config.tor_state is None: + try: + yield connect_to_control_port() + self._tor_starting.callback(self._tor_state) + except Exception as exc: + self._tor_starting.errback(Failure(exc)) diff --git a/ooni/geoip.py b/ooni/geoip.py index fa6d1ae..28e0e1e 100644 --- a/ooni/geoip.py +++ b/ooni/geoip.py @@ -136,11 +136,11 @@ class UbuntuGeoIP(HTTPGeoIPLookupper): probe_ip = m.group(1) return probe_ip
-class TorProjectGeoIP(HTTPGeoIPLookupper): - url = "https://check.torproject.org/" +class DuckDuckGoGeoIP(HTTPGeoIPLookupper): + url = "https://duckduckgo.com/?q=ip&ia=answer"
def parseResponse(self, response_body): - regexp = "Your IP address appears to be: <strong>((\d+.)+(\d+))" + regexp = "Your IP address is (.*) in " probe_ip = re.search(regexp, response_body).group(1) return probe_ip
@@ -151,7 +151,7 @@ class ProbeIP(object): def __init__(self): self.geoIPServices = { 'ubuntu': UbuntuGeoIP, - 'torproject': TorProjectGeoIP + 'duckduckgo': DuckDuckGoGeoIP } self.geodata = { 'asn': 'AS0', diff --git a/ooni/measurements.py b/ooni/measurements.py deleted file mode 100644 index 976b125..0000000 --- a/ooni/measurements.py +++ /dev/null @@ -1,43 +0,0 @@ -import json - -class GenerateResults(object): - supported_tests = [ - "web_connectivity" - ] - - def __init__(self, input_file): - self.input_file = input_file - - def process_web_connectivity(self, entry): - result = {} - result['anomaly'] = False - if entry['test_keys']['blocking'] is not False: - result['anomaly'] = True - result['url'] = entry['input'] - return result - - def output(self, output_file): - results = {} - with open(self.input_file) as in_file: - for idx, line in enumerate(in_file): - entry = json.loads(line.strip()) - if entry['test_name'] not in self.supported_tests: - raise Exception("Unsupported test") - result = getattr(self, 'process_'+entry['test_name'])(entry) - result['idx'] = idx - results['test_name'] = entry['test_name'] - results['country_code'] = entry['probe_cc'] - results['asn'] = entry['probe_asn'] - results['results'] = results.get('results', []) - results['results'].append(result) - - with open(output_file, "w") as fw: - json.dump(results, fw) - -if __name__ == "__main__": - import sys - if len(sys.argv) != 3: - print("Usage: {0} [input_file] [output_file]".format(sys.argv[0])) - sys.exit(1) - gr = GenerateResults(sys.argv[1]) - gr.output(sys.argv[2]) diff --git a/ooni/nettest.py b/ooni/nettest.py index d01cf7b..2a33a2f 100644 --- a/ooni/nettest.py +++ b/ooni/nettest.py @@ -254,7 +254,8 @@ class NetTestLoader(object): h = sha256() for l in f: h.update(l) - except: + except Exception as exc: + log.exception(exc) raise e.InvalidInputFile(filename) input_file['hash'] = h.hexdigest() self.inputFiles.append(input_file) diff --git a/ooni/results.py b/ooni/results.py new file mode 100644 index 0000000..21fe997 --- /dev/null +++ b/ooni/results.py @@ -0,0 +1,39 @@ +import json + +class Process(): + supported_tests = [ + "web_connectivity" + ] + @staticmethod + def web_connectivity(entry): + result = {} + result['anomaly'] = False + if entry['test_keys']['blocking'] is not False: + result['anomaly'] = True + result['url'] = entry['input'] + return result + +def generate_summary(input_file, output_file): + results = {} + with open(input_file) as in_file: + for idx, line in enumerate(in_file): + entry = json.loads(line.strip()) + result = {} + if entry['test_name'] in Process.supported_tests: + result = getattr(Process, entry['test_name'])(entry) + result['idx'] = idx + results['test_name'] = entry['test_name'] + results['country_code'] = entry['probe_cc'] + results['asn'] = entry['probe_asn'] + results['results'] = results.get('results', []) + results['results'].append(result) + + with open(output_file, "w") as fw: + json.dump(results, fw) + +if __name__ == "__main__": + import sys + if len(sys.argv) != 3: + print("Usage: {0} [input_file] [output_file]".format(sys.argv[0])) + sys.exit(1) + generate_summary(sys.argv[1], sys.argv[2]) diff --git a/ooni/tests/bases.py b/ooni/tests/bases.py index 904aaba..40e3b5e 100644 --- a/ooni/tests/bases.py +++ b/ooni/tests/bases.py @@ -10,7 +10,7 @@ class ConfigTestCase(unittest.TestCase): def setUp(self): self.ooni_home_dir = os.path.abspath("ooni_home") self.config = config - self.config.initialize_ooni_home("ooni_home") + self.config.initialize_ooni_home(self.ooni_home_dir) super(ConfigTestCase, self).setUp()
def skipTest(self, reason): diff --git a/ooni/tests/test_deck.py b/ooni/tests/test_deck.py index ba87ec8..455f4e1 100644 --- a/ooni/tests/test_deck.py +++ b/ooni/tests/test_deck.py @@ -1,11 +1,15 @@ import os
+from copy import deepcopy + from twisted.internet import defer from twisted.trial import unittest
from hashlib import sha256 from ooni import errors -from ooni.deck import InputFile, Deck, nettest_to_path +from ooni.deck import input_store, lookup_collector_and_test_helpers +from ooni.nettest import NetTestLoader +from ooni.deck import InputFile, Deck, nettest_to_path, DeckTask, NGDeck from ooni.tests.bases import ConfigTestCase from ooni.tests.mocks import MockBouncerClient, MockCollectorClient
@@ -182,7 +186,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
- yield deck.lookupCollectorAndTestHelpers() + yield lookup_collector_and_test_helpers(deck.preferred_backend, + deck.netTestLoaders)
self.assertEqual(deck.netTestLoaders[0].collector.settings['address'], 'httpo://thirteenchars123.onion') @@ -229,7 +234,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
- yield deck.lookupCollectorAndTestHelpers() + yield lookup_collector_and_test_helpers(deck.preferred_backend, + deck.netTestLoaders)
self.assertEqual( deck.netTestLoaders[0].collector.settings['address'], @@ -258,7 +264,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
- yield deck.lookupCollectorAndTestHelpers() + yield lookup_collector_and_test_helpers(deck.preferred_backend, + deck.netTestLoaders)
self.assertEqual( deck.netTestLoaders[0].collector.settings['address'], @@ -269,3 +276,45 @@ class TestDeck(BaseTestCase, ConfigTestCase): deck.netTestLoaders[0].localOptions['backend'], '127.0.0.1' ) + +class TestInputStore(ConfigTestCase): + @defer.inlineCallbacks + def test_update_input_store(self): + self.skipTest("antani") + yield input_store.update("ZZ") + print os.listdir(os.path.join( + self.config.resources_directory, "citizenlab-test-lists")) + print os.listdir(os.path.join(self.config.inputs_directory)) + +TASK_DATA = { + "name": "Some Task", + "ooni": { + "test_name": "web_connectivity", + "file": "$citizen_lab_global_urls" + } +} + +DECK_DATA = { + "name": "My deck", + "description": "Something", + "tasks": [ + deepcopy(TASK_DATA) + ] +} +class TestNGDeck(ConfigTestCase): + skip = True + def test_deck_task(self): + if self.skip: + self.skipTest("Skip is set to true") + yield input_store.update("ZZ") + deck_task = DeckTask(TASK_DATA) + self.assertIsInstance(deck_task.ooni["net_test_loader"], + NetTestLoader) + + @defer.inlineCallbacks + def test_deck_load(self): + if self.skip: + self.skipTest("Skip is set to true") + yield input_store.update("ZZ") + deck = NGDeck(deck_data=DECK_DATA) + self.assertEqual(len(deck.tasks), 1) diff --git a/ooni/tests/test_director.py b/ooni/tests/test_director.py index 18377f5..6638adb 100644 --- a/ooni/tests/test_director.py +++ b/ooni/tests/test_director.py @@ -73,7 +73,7 @@ class TestDirector(ConfigTestCase): @defer.inlineCallbacks def director_start_tor(): director = Director() - yield director.startTor() + yield director.start_tor() assert config.tor.socks_port == 4242 assert config.tor.control_port == 4242
@@ -93,7 +93,7 @@ class TestDirector(ConfigTestCase): net_test_loader.loadNetTestString(test_failing_twice) director = Director() director.netTestDone = net_test_done - director.startNetTest(net_test_loader, None, no_yamloo=True) + director.start_net_test_loader(net_test_loader, None, no_yamloo=True) return finished
@@ -113,7 +113,7 @@ class TestStartSniffing(unittest.TestCase): def test_start_sniffing_once(self): with patch('ooni.settings.config.scapyFactory') as mock_scapy_factory: with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer: - self.director.startSniffing(self.testDetails) + self.director.start_sniffing(self.testDetails) sniffer = mock_scapy_sniffer.return_value mock_scapy_factory.registerProtocol.assert_called_once_with(sniffer)
@@ -122,7 +122,7 @@ class TestStartSniffing(unittest.TestCase): with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer: sniffer = mock_scapy_sniffer.return_value sniffer.pcapwriter.filename = 'foo1_filename' - self.director.startSniffing(self.testDetails) + self.director.start_sniffing(self.testDetails) self.assertEqual(len(self.director.sniffers), 1)
self.testDetails = { @@ -132,13 +132,13 @@ class TestStartSniffing(unittest.TestCase): with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer: sniffer = mock_scapy_sniffer.return_value sniffer.pcapwriter.filename = 'foo2_filename' - self.director.startSniffing(self.testDetails) + self.director.start_sniffing(self.testDetails) self.assertEqual(len(self.director.sniffers), 2)
def test_measurement_succeeded(self): with patch('ooni.settings.config.scapyFactory') as mock_scapy_factory: with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer: - self.director.startSniffing(self.testDetails) + self.director.start_sniffing(self.testDetails) self.assertEqual(len(self.director.sniffers), 1) measurement = MagicMock() measurement.testInstance = self.FooTestCase() diff --git a/ooni/tests/test_nettest.py b/ooni/tests/test_nettest.py index 239080a..1592149 100644 --- a/ooni/tests/test_nettest.py +++ b/ooni/tests/test_nettest.py @@ -355,7 +355,7 @@ class TestNetTest(ConfigTestCase): director = Director()
self.filename = 'dummy_report.yamloo' - d = director.startNetTest(ntl, self.filename) + d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback def complete(result): @@ -382,7 +382,7 @@ class TestNetTest(ConfigTestCase):
director = Director() self.filename = 'dummy_report.yamloo' - d = director.startNetTest(ntl, self.filename) + d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback def complete(result): @@ -410,7 +410,7 @@ class TestNetTest(ConfigTestCase):
director = Director() self.filename = 'dummy_report.yamloo' - d = director.startNetTest(ntl, self.filename) + d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback def complete(result): @@ -469,7 +469,7 @@ class TestNettestTimeout(ConfigTestCase): director = Director()
self.filename = 'dummy_report.yamloo' - d = director.startNetTest(ntl, self.filename) + d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback def complete(result): diff --git a/ooni/tests/test_oonideckgen.py b/ooni/tests/test_oonideckgen.py index 4a52377..11a852f 100644 --- a/ooni/tests/test_oonideckgen.py +++ b/ooni/tests/test_oonideckgen.py @@ -1,10 +1,11 @@ import os -import yaml import tempfile
+import yaml + +from ooni.scripts import oonideckgen from .bases import ConfigTestCase
-from ooni.deckgen import cli
class TestOONIDeckgen(ConfigTestCase): def setUp(self): @@ -25,7 +26,7 @@ class TestOONIDeckgen(ConfigTestCase):
def test_generate_deck(self): temp_dir = tempfile.mkdtemp() - cli.generate_deck({ + oonideckgen.generate_deck({ "country-code": "it", "output": temp_dir, "collector": None, diff --git a/ooni/tests/test_oonireport.py b/ooni/tests/test_oonireport.py index 2275672..d71a403 100644 --- a/ooni/tests/test_oonireport.py +++ b/ooni/tests/test_oonireport.py @@ -5,8 +5,6 @@ from mock import patch, MagicMock from twisted.internet import defer from ooni.tests.bases import ConfigTestCase
-from ooni.report import tool - mock_tor_check = MagicMock(return_value=True)
class TestOONIReport(ConfigTestCase): @@ -51,9 +49,9 @@ class TestOONIReport(ConfigTestCase): cli.run(["upload"]) self.assertTrue(mock_tool.upload_all.called)
- @patch('ooni.report.tool.CollectorClient') - @patch('ooni.report.tool.OONIBReportLog') - @patch('ooni.report.tool.OONIBReporter') + @patch('ooni.report.cli.CollectorClient') + @patch('ooni.report.cli.OONIBReportLog') + @patch('ooni.report.cli.OONIBReporter') def test_tool_upload(self, mock_oonib_reporter, mock_oonib_report_log, mock_collector_client):
@@ -70,7 +68,7 @@ class TestOONIReport(ConfigTestCase): self._create_reporting_yaml(report_name) self._write_dummy_report(report_name)
- d = tool.upload(report_name) + d = cli.upload(report_name) @d.addCallback def cb(result): mock_oonib_reporter_i.writeReportEntry.assert_called_with( @@ -78,9 +76,9 @@ class TestOONIReport(ConfigTestCase): ) return d
- @patch('ooni.report.tool.CollectorClient') - @patch('ooni.report.tool.OONIBReportLog') - @patch('ooni.report.tool.OONIBReporter') + @patch('ooni.report.cli.CollectorClient') + @patch('ooni.report.cli.OONIBReportLog') + @patch('ooni.report.cli.OONIBReporter') def test_tool_upload_all(self, mock_oonib_reporter, mock_oonib_report_log, mock_collector_client):
@@ -98,7 +96,7 @@ class TestOONIReport(ConfigTestCase): self._create_reporting_yaml(report_name) self._write_dummy_report(report_name)
- d = tool.upload_all() + d = cli.upload_all() @d.addCallback def cb(result): mock_oonib_reporter_i.writeReportEntry.assert_called_with( diff --git a/ooni/ui/cli.py b/ooni/ui/cli.py index 7a0036e..fe24bf6 100644 --- a/ooni/ui/cli.py +++ b/ooni/ui/cli.py @@ -334,10 +334,10 @@ def runTestWithDirector(director, global_options, url=None, start_tor=True): collector_client = setupCollector(global_options, net_test_loader.collector)
- yield director.startNetTest(net_test_loader, - global_options['reportfile'], - collector_client, - global_options['no-yamloo']) + yield director.start_net_test_loader(net_test_loader, + global_options['reportfile'], + collector_client, + global_options['no-yamloo'])
d.addCallback(setup_nettest) d.addCallback(post_director_start) diff --git a/ooni/ui/web/client/index.html b/ooni/ui/web/client/index.html index cc45067..e306ef2 100644 --- a/ooni/ui/web/client/index.html +++ b/ooni/ui/web/client/index.html @@ -13,5 +13,5 @@ <app> Loading... </app> - <script type="text/javascript" src="app.bundle.js?27ae67e2c74ae4ae9a82"></script></body> + <script type="text/javascript" src="app.bundle.js?7ed7d7510803fa1a4ad8"></script></body> </html> diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py index e63c08f..a862fe7 100644 --- a/ooni/ui/web/server.py +++ b/ooni/ui/web/server.py @@ -13,44 +13,58 @@ from werkzeug.exceptions import NotFound
from ooni import __version__ as ooniprobe_version from ooni import errors -from ooni.deck import Deck +from ooni.deck import NGDeck from ooni.settings import config -from ooni.nettest import NetTestLoader -from ooni.measurements import GenerateResults -from ooni.utils import generate_filename +from ooni.utils import log +from ooni.director import DirectorEvent + +config.advanced.debug = True
def rpath(*path): context = os.path.abspath(os.path.dirname(__file__)) return os.path.join(context, *path)
-def getNetTestLoader(test_options, test_file): - """ - Args: - test_options: (dict) containing as keys the option names. - - test_file: (string) the path to the test_file to be run. - Returns: - an instance of :class:`ooni.nettest.NetTestLoader` with the specified - test_file and the specified options. - """ - options = [] - for k, v in test_options.items(): - if v is None: - print("Skipping %s because none" % k) - continue - options.append('--'+k) - options.append(v) - - net_test_loader = NetTestLoader(options, - test_file=test_file) - return net_test_loader - - class WebUIError(Exception): def __init__(self, code, message): self.code = code self.message = message
+class LongPoller(object): + def __init__(self, timeout, _reactor=reactor): + self.lock = defer.DeferredLock() + + self.deferred_subscribers = [] + self._reactor = _reactor + self._timeout = timeout + + self.timer = task.LoopingCall( + self.notify, + DirectorEvent("null", "No updates"), + ) + self.timer.clock = self._reactor + + def start(self): + self.timer.start(self._timeout) + + def stop(self): + self.timer.stop() + + def _notify(self, lock, event): + for d in self.deferred_subscribers[:]: + assert not d.called, "Deferred is already called" + d.callback(event) + self.deferred_subscribers.remove(d) + self.timer.reset() + lock.release() + + def notify(self, event=None): + self.lock.acquire().addCallback(self._notify, event) + + def get(self): + d = defer.Deferred() + self.deferred_subscribers.append(d) + return d + class WebUIAPI(object): app = Klein() # Maximum number in seconds after which to return a result even if not @@ -58,7 +72,7 @@ class WebUIAPI(object): _long_polling_timeout = 5 _reactor = reactor
- def __init__(self, config, director): + def __init__(self, config, director, _reactor=reactor): self.director = director self.config = config self.measurement_path = FilePath(config.measurements_directory) @@ -74,12 +88,26 @@ class WebUIAPI(object): "director_started": False, "failures": [] } - self.status_updates = [] - d = self.director.start(start_tor=True) + + self.status_poller = LongPoller( + self._long_polling_timeout, _reactor) + self.director_event_poller = LongPoller( + self._long_polling_timeout, _reactor) + + # XXX move this elsewhere + self.director_event_poller.start() + self.status_poller.start() + + self.director.subscribe(self.handle_director_event) + d = self.director.start()
d.addCallback(self.director_started) d.addErrback(self.director_startup_failed) - d.addBoth(lambda _: self.broadcast_status_update()) + d.addBoth(lambda _: self.status_poller.notify()) + + def handle_director_event(self, event): + log.msg("Handling event {0}".format(event.type)) + self.director_event_poller.notify(event)
def add_failure(self, failure): self.status['failures'].append(str(failure)) @@ -92,26 +120,12 @@ class WebUIAPI(object): def director_startup_failed(self, failure): self.add_failure(failure)
- def broadcast_status_update(self): - for su in self.status_updates: - if not su.called: - su.callback(None) - def completed_measurement(self, measurement_id): del self.status['active_measurements'][measurement_id] self.status['completed_measurements'].append(measurement_id) - measurement_dir = self.measurement_path.child(measurement_id) - - measurement = measurement_dir.child('measurements.njson.progress') - - # Generate the summary.json file - summary = measurement_dir.child('summary.json') - gr = GenerateResults(measurement.path) - gr.output(summary.path) - - measurement.moveTo(measurement_dir.child('measurements.njson'))
def failed_measurement(self, measurement_id, failure): + log.exception(failure) del self.status['active_measurements'][measurement_id] self.add_failure(str(failure))
@@ -119,8 +133,9 @@ class WebUIAPI(object): def not_found(self, request, _): request.redirect('/client/')
- @app.handle_error(WebUIError) - def web_ui_error(self, request, error): + @app.handle_errors(WebUIError) + def web_ui_error(self, request, failure): + error = failure.value request.setResponseCode(error.code) return self.render_json({ "error_code": error.code, @@ -133,24 +148,28 @@ class WebUIAPI(object): request.setHeader('Content-Length', len(json_string)) return json_string
+ @app.route('/api/notify', methods=["GET"]) + def api_notify(self, request): + def got_director_event(event): + return self.render_json({ + "type": event.type, + "message": event.message + }, request) + d = self.director_event_poller.get() + d.addCallback(got_director_event) + return d + @app.route('/api/status', methods=["GET"]) def api_status(self, request): return self.render_json(self.status, request)
@app.route('/api/status/update', methods=["GET"]) def api_status_update(self, request): - status_update = defer.Deferred() - status_update.addCallback(lambda _: - self.status_updates.remove(status_update)) - status_update.addCallback(lambda _: self.api_status(request)) - - self.status_updates.append(status_update) - - # After long_polling_timeout we fire the callback - task.deferLater(self._reactor, self._long_polling_timeout, - status_update.callback, None) - - return status_update + def got_status_update(event): + return self.api_status(request) + d = self.status_poller.get() + d.addCallback(got_status_update) + return d
@app.route('/api/deck/generate', methods=["GET"]) def api_deck_generate(self, request): @@ -167,37 +186,23 @@ class WebUIAPI(object):
return self.render_json({"command": "deck-list"}, request)
- @defer.inlineCallbacks def run_deck(self, deck): - yield deck.setup() - measurement_ids = [] - for net_test_loader in deck.netTestLoaders: - # XXX synchronize this with startNetTest - test_details = net_test_loader.getTestDetails() - measurement_id = generate_filename(test_details) - - measurement_dir = self.measurement_path.child(measurement_id) - measurement_dir.createDirectory() - - report_filename = measurement_dir.child( - "measurements.njson.progress").path - - measurement_ids.append(measurement_id) - self.status['active_measurements'][measurement_id] = { - 'test_name': test_details['test_name'], - 'test_start_time': test_details['test_start_time'] + for task_id in deck.task_ids: + self.status['active_measurements'][task_id] = { + 'test_name': 'foobar', + 'test_start_time': 'some start time' } - self.broadcast_status_update() - d = self.director.startNetTest(net_test_loader, report_filename) - d.addCallback(lambda _: - self.completed_measurement(measurement_id)) - d.addErrback(lambda failure: - self.failed_measurement(measurement_id, failure)) + self.status_poller.notify() + d = deck.run(self.director) + d.addCallback(lambda _: + self.completed_measurement(task_id)) + d.addErrback(lambda failure: + self.failed_measurement(task_id, failure))
@app.route('/api/nettest/string:test_name/start', methods=["POST"]) def api_nettest_start(self, request, test_name): try: - net_test = self.director.netTests[test_name] + _ = self.director.netTests[test_name] except KeyError: raise WebUIError(500, 'Could not find the specified test')
@@ -206,10 +211,15 @@ class WebUIAPI(object): except ValueError: raise WebUIError(500, 'Invalid JSON message recevied')
- deck = Deck(no_collector=True) # XXX remove no_collector - net_test_loader = getNetTestLoader(test_options, net_test['path']) + test_options["test_name"] = test_name + deck_data = { + "tasks": [ + {"ooni": test_options} + ] + } try: - deck.insert(net_test_loader) + deck = NGDeck(no_collector=True) + deck.load(deck_data) self.run_deck(deck)
except errors.MissingRequiredOption, option_name: @@ -237,6 +247,19 @@ class WebUIAPI(object): def api_input_list(self, request): return self.render_json(self.director.input_store.list(), request)
+ @app.route('/api/input/string:input_id/content', methods=["GET"]) + def api_input_content(self, request, input_id): + content = self.director.input_store.getContent(input_id) + request.setHeader('Content-Type', 'text/plain') + request.setHeader('Content-Length', len(content)) + return content + + @app.route('/api/input/string:input_id', methods=["GET"]) + def api_input_details(self, request, input_id): + return self.render_json( + self.director.input_store.get(input_id), request + ) + @app.route('/api/measurement', methods=["GET"]) def api_measurement_list(self, request): measurements = [] @@ -299,7 +322,3 @@ class WebUIAPI(object): def static(self, request): path = rpath("client") return static.File(path) -<<<<<<< acda284b56fa3a75acbe7d000fbdefb643839948 - -======= ->>>>>>> [Web UI] Refactoring of web UI diff --git a/ooni/ui/web/web.py b/ooni/ui/web/web.py index 40ee3b4..eca75cb 100644 --- a/ooni/ui/web/web.py +++ b/ooni/ui/web/web.py @@ -1,53 +1,27 @@ -import os - -from twisted.scripts import twistd -from twisted.python import usage -from twisted.internet import reactor from twisted.web import server +from twisted.internet import reactor from twisted.application import service
+from ooni.ui.web.server import WebUIAPI from ooni.settings import config -from ooni.director import Director -from ooni.utils import log - -from .server import WebUIAPI
class WebUIService(service.MultiService): - portNum = 8822 + def __init__(self, director, port_number=8842): + service.MultiService.__init__(self) + + self.director = director + self.port_number = port_number + def startService(self): service.MultiService.startService(self) - config.set_paths() - config.initialize_ooni_home() - config.read_config_file() - director = Director() - web_ui_api = WebUIAPI(config, director) - root = server.Site(web_ui_api.app.resource()) - self._port = reactor.listenTCP(self.portNum, root) - d = director.start() + + web_ui_api = WebUIAPI(config, self.director) + self._port = reactor.listenTCP( + self.port_number, + server.Site(web_ui_api.app.resource()) + )
def stopService(self): + service.MultiService.stopService(self) if self._port: self._port.stopListening() - -class StartOoniprobeWebUIPlugin: - tapname = "ooniprobe" - def makeService(self, so): - return WebUIService() - -class OoniprobeTwistdConfig(twistd.ServerOptions): - subCommands = [("StartOoniprobeWebUI", None, usage.Options, "ooniprobe web ui")] - -def start(): - twistd_args = ["--nodaemon"] - twistd_config = OoniprobeTwistdConfig() - twistd_args.append("StartOoniprobeWebUI") - try: - twistd_config.parseOptions(twistd_args) - except usage.error, ue: - print("ooniprobe: usage error from twistd: {}\n".format(ue)) - twistd_config.loadedPlugins = {"StartOoniprobeWebUI": StartOoniprobeWebUIPlugin()} - twistd.runApp(twistd_config) - return 0 - -if __name__ == "__main__": - start() diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py index cc2f2ff..e18a6ee 100644 --- a/ooni/utils/onion.py +++ b/ooni/utils/onion.py @@ -237,20 +237,6 @@ class TorLauncherWithRetries(object): continue setattr(new_tor_config, key, getattr(self.tor_config, key)) self.tor_config = new_tor_config - self.timeout = timeout - - def _reset_tor_config(self): - """ - This is used to reset the Tor configuration to before launch_tor - modified it. This is in particular used to force the regeneration of the - DataDirectory. - """ - new_tor_config = TorConfig() - for key in self.tor_config: - if config.tor.data_dir is None and key == "DataDirectory": - continue - setattr(new_tor_config, key, getattr(self.tor_config, key)) - self.tor_config = new_tor_config
def _progress_updates(self, prog, tag, summary): log.msg("%d%%: %s" % (prog, summary))
tor-commits@lists.torproject.org