[tor-commits] [ooni-probe/master] Create a set of scheduled tasks to be run by the agent in background

art at torproject.org art at torproject.org
Mon Sep 19 12:14:24 UTC 2016


commit 546d1e3b2d9b1cbf0bebe2b032acc0d8b87e13e4
Author: Arturo Filastò <arturo at filasto.net>
Date:   Tue Jul 26 21:18:04 2016 +0200

    Create a set of scheduled tasks to be run by the agent in background
    
    * Remove unneeded filename hash and sprinkle notes on it's future deprecation
    * Fix some bugs in the resources update procedure
---
 ooni/agent/scheduler.py                            | 144 ++++++++++++++++++++-
 ooni/contrib/__init__.py                           |   2 +-
 ooni/contrib/croniter.py                           |  11 ++
 ooni/deck.py                                       |  38 +++---
 ooni/director.py                                   |  35 +++--
 ooni/geoip.py                                      |  12 +-
 ooni/nettest.py                                    |  25 +---
 .../manipulation/http_invalid_request_line.py      |  14 +-
 ooni/resources.py                                  |  71 ++++++----
 ooni/settings.py                                   |   2 +
 ooni/ui/cli.py                                     |   3 +-
 ooni/ui/web/server.py                              |  34 +----
 12 files changed, 271 insertions(+), 120 deletions(-)

diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 1004597..ace3bc5 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -1,5 +1,126 @@
+from datetime import datetime
+
 from twisted.application import service
-from twisted.internet import task
+from twisted.internet import task, defer
+from twisted.python.filepath import FilePath
+
+from ooni import resources
+from ooni.utils import log
+from ooni.deck import input_store
+from ooni.settings import config
+from ooni.contrib import croniter
+
+class ScheduledTask(object):
+    _time_format = "%Y-%m-%dT%H:%M:%SZ"
+    schedule = None
+    identifier = None
+
+    def __init__(self, schedule=None):
+        if schedule is not None:
+            self.schedule = schedule
+
+        assert self.identifier is not None, "self.identifier must be set"
+        assert self.schedule is not None, "self.schedule must be set"
+        scheduler_directory = config.scheduler_directory
+
+        self._last_run = FilePath(scheduler_directory).child(self.identifier)
+        self._last_run_lock = defer.DeferredFilesystemLock(
+            FilePath(scheduler_directory).child(self.identifier + ".lock").path
+        )
+
+    @property
+    def should_run(self):
+        current_time = datetime.utcnow()
+        next_cycle = croniter(self.schedule, self.last_run).get_next(datetime)
+        if next_cycle <= current_time:
+            return True
+        return False
+
+    @property
+    def last_run(self):
+        if not self._last_run.exists():
+            return datetime.fromtimestamp(0)
+        with self._last_run.open('r') as in_file:
+            date_str = in_file.read()
+        return datetime.strptime(date_str, self._time_format)
+
+    def _update_last_run(self):
+        with self._last_run.open('w') as out_file:
+            current_time = datetime.utcnow()
+            out_file.write(current_time.strftime(self._time_format))
+
+    def task(self):
+        raise NotImplemented
+
+    @defer.inlineCallbacks
+    def run(self):
+        yield self._last_run_lock.deferUntilLocked()
+        if not self.should_run:
+            self._last_run_lock.unlock()
+            defer.returnValue(None)
+        try:
+            yield self.task()
+            self._update_last_run()
+        except:
+            raise
+        finally:
+            self._last_run_lock.unlock()
+
+class UpdateInputsAndResources(ScheduledTask):
+    identifier = "ooni-update-inputs"
+    schedule = "@daily"
+
+    @defer.inlineCallbacks
+    def task(self):
+        log.debug("Updating the inputs")
+        yield resources.check_for_update(config.probe_ip.geodata['countrycode'])
+        yield input_store.update(config.probe_ip.geodata['countrycode'])
+
+class UpdateProbeIp(ScheduledTask):
+    identifier = "ooni-update-probe-ip"
+    schedule = "@hourly"
+    # XXX we need to ensure this is always run the first time ooniprobe or
+    # ooniprobe-agent is started or implement on disk caching of the users
+    # IP address.
+
+    def task(self):
+        log.debug("Updating the probe IP")
+        return config.probe_ip.lookup()
+
+class CleanupInProgressReports(ScheduledTask):
+    identifier = 'ooni-cleanup-reports'
+    schedule = '@daily'
+
+class UploadMissingReports(ScheduledTask):
+    identifier = 'ooni-cleanup-reports'
+    schedule = '@weekly'
+
+# Order mattters
+SYSTEM_TASKS = [
+    UpdateProbeIp,
+    UpdateInputsAndResources
+]
+
+ at defer.inlineCallbacks
+def run_system_tasks(no_geoip=False, no_input_store=False):
+    task_classes = SYSTEM_TASKS[:]
+
+    if no_geoip:
+        log.debug("Not updating probe IP")
+        task_classes.pop(UpdateProbeIp)
+
+    if no_input_store:
+        log.debug("Not updating the inputs")
+        task_classes.pop(UpdateInputsAndResources)
+
+    for task_class in task_classes:
+        task = task_class()
+        log.debug("Running task {0}".format(task.identifier))
+        try:
+            yield task.run()
+        except Exception as exc:
+            log.err("Failed to run task {0}".format(task.identifier))
+            log.exception(exc)
 
 class SchedulerService(service.MultiService):
     """
@@ -10,16 +131,35 @@ class SchedulerService(service.MultiService):
         self.director = director
         self.interval = interval
         self._looping_call = task.LoopingCall(self._should_run)
+        self._scheduled_tasks = []
+
+    def schedule(self, task):
+        self._scheduled_tasks.append(task)
+
+    def _task_failed(self, failure, task):
+        log.msg("Failed to run {0}".format(task.identifier))
+        log.exception(failure)
+
+    def _task_success(self, result, task):
+        log.msg("Ran {0}".format(task.identifier))
 
     def _should_run(self):
         """
         This function is called every self.interval seconds to check
         which periodic tasks should be run.
         """
-        pass
+        for task in self._scheduled_tasks:
+            log.debug("Running task {0}".format(task.identifier))
+            d = task.run()
+            d.addErrback(self._task_failed, task)
+            d.addCallback(self._task_success, task)
 
     def startService(self):
         service.MultiService.startService(self)
+
+        self.schedule(UpdateProbeIp())
+        self.schedule(UpdateInputsAndResources())
+
         self._looping_call.start(self.interval)
 
     def stopService(self):
diff --git a/ooni/contrib/__init__.py b/ooni/contrib/__init__.py
index 50b6b54..28aad30 100644
--- a/ooni/contrib/__init__.py
+++ b/ooni/contrib/__init__.py
@@ -1 +1 @@
-from ._crontab import CronTab
+from .croniter import croniter
diff --git a/ooni/contrib/croniter.py b/ooni/contrib/croniter.py
index 5864603..653dbbf 100644
--- a/ooni/contrib/croniter.py
+++ b/ooni/contrib/croniter.py
@@ -49,6 +49,15 @@ class croniter(object):
         {},
     )
 
+    ALIASES = {
+        '@yearly':  '0 0 1 1 *',
+        '@annually':  '0 0 1 1 *',
+        '@monthly': '0 0 1 * *',
+        '@weekly':  '0 0 * * 0',
+        '@daily':   '0 0 * * *',
+        '@hourly':  '0 * * * *',
+    }
+
     bad_length = 'Exactly 5 or 6 columns has to be specified for iterator' \
                  'expression.'
 
@@ -63,6 +72,8 @@ class croniter(object):
             start_time = self._datetime_to_timestamp(start_time)
 
         self.cur = start_time
+        if expr_format in self.ALIASES:
+            expr_format = self.ALIASES[expr_format]
         self.exprs = expr_format.split()
 
         if len(self.exprs) != 5 and len(self.exprs) != 6:
diff --git a/ooni/deck.py b/ooni/deck.py
index 4794d30..0434d81 100644
--- a/ooni/deck.py
+++ b/ooni/deck.py
@@ -4,7 +4,6 @@ import csv
 import json
 
 from copy import deepcopy
-from hashlib import sha256
 
 import yaml
 
@@ -236,7 +235,8 @@ def lookup_collector_and_test_helpers(net_test_loaders,
             'name': net_test_loader.testName,
             'version': net_test_loader.testVersion,
             'test-helpers': [],
-            'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
+            # XXX deprecate this very soon
+            'input-hashes': []
         }
         if not net_test_loader.collector and not no_collector:
             requires_collector = True
@@ -262,15 +262,16 @@ def lookup_collector_and_test_helpers(net_test_loaders,
         log.err("Could not find any reachable test helpers")
         raise
 
-    def find_collector_and_test_helpers(test_name, test_version, input_files):
-        input_files = [u""+x['hash'] for x in input_files]
+    def find_collector_and_test_helpers(test_name, test_version):
+        # input_files = [u""+x['hash'] for x in input_files]
         for net_test in provided_net_tests:
             if net_test['name'] != test_name:
                 continue
             if net_test['version'] != test_version:
                 continue
-            if set(net_test['input-hashes']) != set(input_files):
-                continue
+            # XXX remove the notion of policies based on input file hashes
+            # if set(net_test['input-hashes']) != set(input_files):
+            #    continue
             return net_test['collector'], net_test['test-helpers']
 
     for net_test_loader in net_test_loaders:
@@ -280,8 +281,8 @@ def lookup_collector_and_test_helpers(net_test_loaders,
         collector, test_helpers = \
             find_collector_and_test_helpers(
                 test_name=net_test_loader.testName,
-                test_version=net_test_loader.testVersion,
-                input_files=net_test_loader.inputFiles
+                test_version=net_test_loader.testVersion
+                # input_files=net_test_loader.inputFiles
             )
 
         for option, name in net_test_loader.missingTestHelpers:
@@ -455,6 +456,7 @@ class InputStore(object):
                     "id": "citizenlab_{0}_urls".format(cc),
                     "type": "file/url"
                 }, out_fh)
+        self._cache_stale = True
 
     @defer.inlineCallbacks
     def create(self, country_code=None):
@@ -523,13 +525,11 @@ def resolve_file_path(v, prepath=None):
         return FilePath(prepath).preauthChild(v).path
     return v
 
-def options_to_args(options, prepath=None):
+def options_to_args(options):
     args = []
     for k, v in options.items():
         if v is None:
             continue
-        if k == "file":
-            v = resolve_file_path(v, prepath)
         if v == False or v == 0:
             continue
         if (len(k)) == 1:
@@ -625,7 +625,7 @@ class DeckTask(object):
             collector_address = None
 
         net_test_loader = NetTestLoader(
-            options_to_args(task_data, self.cwd),
+            options_to_args(task_data),
             annotations=annotations,
             test_file=nettest_path
         )
@@ -653,6 +653,9 @@ class DeckTask(object):
         self.ooni['net_test_loader'] = net_test_loader
 
     def _setup_ooni(self):
+        for input_file in self.ooni['net_test_loader'].inputFiles:
+            file_path = resolve_file_path(input_file['filename'], self.cwd)
+            input_file['test_options'][input_file['key']] = file_path
         self.ooni['test_details'] = self.ooni['net_test_loader'].getTestDetails()
         self.id = generate_filename(self.ooni['test_details'])
 
@@ -670,15 +673,8 @@ class DeckTask(object):
         if task_type not in self._supported_tasks:
             raise UnknownTaskKey(task_type)
         self.type = task_type
-        try:
-            getattr(self, "_load_"+task_type)(task_data)
-        except InputNotFound:
-            log.debug(
-                "Will skip running this test because I can't find the input"
-            )
-            self._skip = True
-
-        assert len(data) == 0
+        getattr(self, "_load_"+task_type)(task_data)
+        assert len(data) == 0, "Got an unidentified key"
 
 class NotAnOption(Exception):
     pass
diff --git a/ooni/director.py b/ooni/director.py
index 84bc9aa..1ab076b 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -13,6 +13,7 @@ from ooni.settings import config
 from ooni.nettest import normalizeTestName
 from ooni.deck import InputStore
 
+from ooni.agent.scheduler import run_system_tasks
 from ooni.utils.onion import start_tor, connect_to_control_port
 
 class DirectorEvent(object):
@@ -139,12 +140,15 @@ class Director(object):
         self._tor_starting.addCallback(self._tor_startup_success)
 
     def _tor_startup_failure(self, failure):
+        log.msg("Failed to start tor")
+        log.exception(failure)
         self._reset_tor_state()
         self.notify(DirectorEvent("error",
                                   "Failed to start Tor"))
         return failure
 
     def _tor_startup_success(self, result):
+        log.msg("Tor has started")
         self._tor_state = 'running'
         self.notify(DirectorEvent("success",
                                   "Successfully started Tor"))
@@ -187,22 +191,21 @@ class Director(object):
         if start_tor:
             yield self.start_tor(check_incoherences)
 
-        if config.global_options.get('no-geoip'):
+        no_geoip = config.global_options.get('no-geoip', False)
+        if no_geoip:
             aux = [False]
             if config.global_options.get('annotations') is not None:
                 annotations = [k.lower() for k in config.global_options['annotations'].keys()]
                 aux = map(lambda x: x in annotations, ["city", "country", "asn"])
             if not all(aux):
                 log.msg("You should add annotations for the country, city and ASN")
-        else:
-            yield config.probe_ip.lookup()
-            self.notify(DirectorEvent("success",
-                                      "Looked up Probe IP"))
 
-        if create_input_store:
-            yield self.input_store.create(config.probe_ip.geodata["countrycode"])
-            self.notify(DirectorEvent("success",
-                                      "Created input store"))
+        self.notify(DirectorEvent("success",
+                                  "Running system tasks"))
+        yield run_system_tasks(no_geoip=no_geoip,
+                               no_input_store=not create_input_store)
+        self.notify(DirectorEvent("success",
+                                  "Ran system tasks"))
 
     @defer.inlineCallbacks
     def start(self, start_tor=False, check_incoherences=True,
@@ -284,7 +287,8 @@ class Director(object):
 
     def netTestDone(self, net_test):
         self.notify(DirectorEvent("success",
-                                  "Successfully ran net_test"))
+                                  "Successfully ran test {0}".format(
+                                      net_test.testDetails['test_name'])))
         self.activeNetTests.remove(net_test)
         if len(self.activeNetTests) == 0:
             self.allTestsDone.callback(None)
@@ -371,13 +375,18 @@ class Director(object):
             log.debug("Tor is already running")
             defer.returnValue(self._tor_state)
         elif self._tor_state == 'starting':
+            log.debug("Tor is starting")
             yield self._tor_starting
             defer.returnValue(self._tor_state)
 
         log.msg("Starting Tor...")
         self._tor_state = 'starting'
         if check_incoherences:
-            yield config.check_tor()
+            try:
+                yield config.check_tor()
+            except Exception as exc:
+                self._tor_starting.errback(Failure(exc))
+                raise exc
 
         if config.advanced.start_tor and config.tor_state is None:
             tor_config = TorConfig()
@@ -438,3 +447,7 @@ class Director(object):
                 self._tor_starting.callback(self._tor_state)
             except Exception as exc:
                 self._tor_starting.errback(Failure(exc))
+        else:
+            # This happens when we require tor to not be started and the
+            # socks port is set.
+            self._tor_starting.callback(self._tor_state)
diff --git a/ooni/geoip.py b/ooni/geoip.py
index f118268..2a7ec92 100644
--- a/ooni/geoip.py
+++ b/ooni/geoip.py
@@ -31,8 +31,12 @@ class GeoIPDataFilesNotFound(Exception):
 def IPToLocation(ipaddr):
     from ooni.settings import config
 
-    country_file = config.get_data_file_path('GeoIP/GeoIP.dat')
-    asn_file = config.get_data_file_path('GeoIP/GeoIPASNum.dat')
+    country_file = config.get_data_file_path(
+        'resources/maxmind-geoip/GeoIP.dat'
+    )
+    asn_file = config.get_data_file_path(
+        'resources/maxmind-geoip/GeoIPASNum.dat'
+    )
 
     location = {'city': None, 'countrycode': 'ZZ', 'asn': 'AS0'}
     if not asn_file or not country_file:
@@ -69,7 +73,9 @@ def database_version():
     }
 
     for key in version.keys():
-        geoip_file = config.get_data_file_path("GeoIP/" + key + ".dat")
+        geoip_file = config.get_data_file_path(
+            "resources/maxmind-geoip/" + key + ".dat"
+        )
         if not geoip_file or not os.path.isfile(geoip_file):
             continue
         timestamp = os.stat(geoip_file).st_mtime
diff --git a/ooni/nettest.py b/ooni/nettest.py
index 2a33a2f..88e4953 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -2,7 +2,6 @@ import os
 import re
 import time
 import sys
-from hashlib import sha256
 
 from twisted.internet import defer
 from twisted.trial.runner import filenameToModule
@@ -199,6 +198,7 @@ class NetTestLoader(object):
             'probe_city': config.probe_ip.geodata['city'],
             'software_name': 'ooniprobe',
             'software_version': ooniprobe_version,
+            # XXX only sanitize the input files
             'options': sanitize_options(self.options),
             'annotations': self.annotations,
             'data_format_version': '0.2.0',
@@ -206,8 +206,8 @@ class NetTestLoader(object):
             'test_version': self.testVersion,
             'test_helpers': self.testHelpers,
             'test_start_time': otime.timestampNowLongUTC(),
-            'input_hashes': [input_file['hash']
-                             for input_file in self.inputFiles],
+            # XXX We should deprecate this key very soon
+            'input_hashes': [],
             'report_id': self.reportId
         }
 
@@ -235,29 +235,14 @@ class NetTestLoader(object):
         input_file = {
             'key': key,
             'test_options': self.localOptions,
-            'hash': None,
-
-            'url': None,
-            'address': None,
-
             'filename': None
         }
         m = ONION_INPUT_REGEXP.match(filename)
         if m:
-            input_file['url'] = filename
-            input_file['address'] = m.group(1)
-            input_file['hash'] = m.group(2)
+            raise e.InvalidInputFile("Input files hosted on hidden services "
+                                     "are not longer supported")
         else:
             input_file['filename'] = filename
-            try:
-                with open(filename) as f:
-                    h = sha256()
-                    for l in f:
-                        h.update(l)
-            except Exception as exc:
-                log.exception(exc)
-                raise e.InvalidInputFile(filename)
-            input_file['hash'] = h.hexdigest()
         self.inputFiles.append(input_file)
 
     def _accumulateTestOptions(self, test_class):
diff --git a/ooni/nettests/manipulation/http_invalid_request_line.py b/ooni/nettests/manipulation/http_invalid_request_line.py
index 94b0b99..be0497c 100644
--- a/ooni/nettests/manipulation/http_invalid_request_line.py
+++ b/ooni/nettests/manipulation/http_invalid_request_line.py
@@ -42,14 +42,14 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
         self.address = self.localOptions['backend']
         self.report['tampering'] = None
 
-    def check_for_manipulation(self, response, payload):
+    def check_for_manipulation(self, response, payload, manipulation_type):
         log.debug("Checking if %s == %s" % (response, payload))
         if response != payload:
-            log.msg("Detected manipulation!")
+            log.msg("{0}: Detected manipulation!".format(manipulation_type))
             log.msg(response)
             self.report['tampering'] = True
         else:
-            log.msg("No manipulation detected.")
+            log.msg("{0}: No manipulation detected.".format(manipulation_type))
             self.report['tampering'] = False
 
     def test_random_invalid_method(self):
@@ -75,7 +75,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
         payload = randomSTR(4) + " / HTTP/1.1\n\r"
 
         d = self.sendPayload(payload)
-        d.addCallback(self.check_for_manipulation, payload)
+        d.addCallback(self.check_for_manipulation, payload, 'random_invalid_method')
         return d
 
     def test_random_invalid_field_count(self):
@@ -91,7 +91,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
         payload += "\n\r"
 
         d = self.sendPayload(payload)
-        d.addCallback(self.check_for_manipulation, payload)
+        d.addCallback(self.check_for_manipulation, payload, 'random_invalid_field_count')
         return d
 
     def test_random_big_request_method(self):
@@ -103,7 +103,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
         payload = randomStr(1024) + ' / HTTP/1.1\n\r'
 
         d = self.sendPayload(payload)
-        d.addCallback(self.check_for_manipulation, payload)
+        d.addCallback(self.check_for_manipulation, payload, 'random_big_request_method')
         return d
 
     def test_random_invalid_version_number(self):
@@ -116,5 +116,5 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
         payload += '\n\r'
 
         d = self.sendPayload(payload)
-        d.addCallback(self.check_for_manipulation, payload)
+        d.addCallback(self.check_for_manipulation, payload, 'random_invalid_version_number')
         return d
diff --git a/ooni/resources.py b/ooni/resources.py
index d49e679..d67908c 100644
--- a/ooni/resources.py
+++ b/ooni/resources.py
@@ -1,5 +1,9 @@
+import os
+import gzip
 import json
+import shutil
 
+from twisted.python.runtime import platform
 from twisted.python.filepath import FilePath
 from twisted.internet import defer
 from twisted.web.client import downloadPage, getPage
@@ -66,11 +70,27 @@ def get_out_of_date_resources(current_manifest, new_manifest,
             #  the manifest claims we have a more up to date version.
             # This happens if an update by country_code happened and a new
             # country code is now required.
+            if filename.endswith(".gz"):
+                filename = filename[:-3]
             if not _resources.child(pre_path).child(filename).exists():
                 paths_to_update.append(info)
 
     return paths_to_update, paths_to_delete
 
+def gunzip(file_path):
+    tmp_location = FilePath(file_path).temporarySibling()
+    in_file = gzip.open(file_path)
+    with tmp_location.open('w') as out_file:
+        shutil.copyfileobj(in_file, out_file)
+    in_file.close()
+    rename(tmp_location.path, file_path)
+
+def rename(src, dst):
+    # Best effort atomic renaming
+    if platform.isWindows() and os.path.exists(dst):
+        os.unlink(dst)
+    os.rename(src, dst)
+
 @defer.inlineCallbacks
 def check_for_update(country_code=None):
     """
@@ -88,44 +108,48 @@ def check_for_update(country_code=None):
     current_version = get_current_version()
     latest_version = yield get_latest_version()
 
-    # We are already at the latest version
-    if current_version == latest_version:
-        defer.returnValue(latest_version)
-
     resources_dir = FilePath(config.resources_directory)
     resources_dir.makedirs(ignoreExistingDirectory=True)
     current_manifest = resources_dir.child("manifest.json")
 
-    new_manifest = current_manifest.temporarySibling()
-    new_manifest.alwaysCreate = 0
-
-    temporary_files.append((current_manifest, new_manifest))
-
-    try:
-        yield downloadPage(
-            get_download_url(latest_version, "manifest.json"),
-            new_manifest.path
-        )
-    except:
-        cleanup()
-        raise UpdateFailure("Failed to download manifest")
-
-    new_manifest_data = json.loads(new_manifest.getContent())
-
     if current_manifest.exists():
         with current_manifest.open("r") as f:
-            current_manifest_data = json.loads(f)
+            current_manifest_data = json.load(f)
     else:
         current_manifest_data = {
             "resources": []
         }
 
+    # We should download a newer manifest
+    if current_version < latest_version:
+        new_manifest = current_manifest.temporarySibling()
+        new_manifest.alwaysCreate = 0
+
+        temporary_files.append((current_manifest, new_manifest))
+
+        try:
+            yield downloadPage(
+                get_download_url(latest_version, "manifest.json"),
+                new_manifest.path
+            )
+        except:
+            cleanup()
+            raise UpdateFailure("Failed to download manifest")
+
+        new_manifest_data = json.loads(new_manifest.getContent())
+    else:
+        new_manifest_data = current_manifest_data
+
     to_update, to_delete = get_out_of_date_resources(
             current_manifest_data, new_manifest_data, country_code)
 
     try:
         for resource in to_update:
+            gzipped = False
             pre_path, filename = resource["path"].split("/")
+            if filename.endswith(".gz"):
+                filename = filename[:-3]
+                gzipped = True
             dst_file = resources_dir.child(pre_path).child(filename)
             dst_file.parent().makedirs(ignoreExistingDirectory=True)
             src_file = dst_file.temporarySibling()
@@ -135,8 +159,9 @@ def check_for_update(country_code=None):
             # The paths for the download require replacing "/" with "."
             download_url = get_download_url(latest_version,
                                             resource["path"].replace("/", "."))
-            print("Downloading {0}".format(download_url))
             yield downloadPage(download_url, src_file.path)
+            if gzipped:
+                gunzip(src_file.path)
     except Exception as exc:
         cleanup()
         log.exception(exc)
@@ -145,7 +170,7 @@ def check_for_update(country_code=None):
     for dst_file, src_file in temporary_files:
         log.msg("Moving {0} to {1}".format(src_file.path,
                                            dst_file.path))
-        src_file.moveTo(dst_file)
+        rename(src_file.path, dst_file.path)
 
     for resource in to_delete:
         log.msg("Deleting old resources")
diff --git a/ooni/settings.py b/ooni/settings.py
index b73e2f2..632dbe4 100644
--- a/ooni/settings.py
+++ b/ooni/settings.py
@@ -101,6 +101,8 @@ class OConfig(object):
         else:
             self.inputs_directory = os.path.join(self.ooni_home, 'inputs')
 
+        self.scheduler_directory = os.path.join(self.ooni_home, 'scheduler')
+
         if self.advanced.decks_dir:
             self.decks_directory = self.advanced.decks_dir
         else:
diff --git a/ooni/ui/cli.py b/ooni/ui/cli.py
index 57924ec..e8d747c 100644
--- a/ooni/ui/cli.py
+++ b/ooni/ui/cli.py
@@ -305,7 +305,8 @@ def createDeck(global_options, url=None):
     return deck
 
 
-def runTestWithDirector(director, global_options, url=None, start_tor=True,
+def runTestWithDirector(director, global_options, url=None,
+                        start_tor=True,
                         create_input_store=True):
     deck = createDeck(global_options, url=url)
 
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index f9886ac..0a3d1ca 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -84,10 +84,7 @@ class WebUIAPI(object):
             "software_name": "ooniprobe",
             "asn": config.probe_ip.geodata['asn'],
             "country_code": config.probe_ip.geodata['countrycode'],
-            "active_measurements": {},
-            "completed_measurements": [],
-            "director_started": False,
-            "failures": []
+            "director_started": False
         }
 
         self.status_poller = LongPoller(
@@ -103,33 +100,17 @@ class WebUIAPI(object):
         d = self.director.start()
 
         d.addCallback(self.director_started)
-        d.addErrback(self.director_startup_failed)
         d.addBoth(lambda _: self.status_poller.notify())
 
     def handle_director_event(self, event):
         log.msg("Handling event {0}".format(event.type))
         self.director_event_poller.notify(event)
 
-    def add_failure(self, failure):
-        self.status['failures'].append(str(failure))
-
     def director_started(self, _):
         self.status['director_started'] = True
         self.status["asn"] = config.probe_ip.geodata['asn']
         self.status["country_code"] = config.probe_ip.geodata['countrycode']
 
-    def director_startup_failed(self, failure):
-        self.add_failure(failure)
-
-    def completed_measurement(self, measurement_id):
-        del self.status['active_measurements'][measurement_id]
-        self.status['completed_measurements'].append(measurement_id)
-
-    def failed_measurement(self, measurement_id, failure):
-        log.exception(failure)
-        del self.status['active_measurements'][measurement_id]
-        self.add_failure(str(failure))
-
     @app.handle_errors(NotFound)
     def not_found(self, request, _):
         request.redirect('/client/')
@@ -188,18 +169,9 @@ class WebUIAPI(object):
         return self.render_json({"command": "deck-list"}, request)
 
     def run_deck(self, deck):
-        for task_id in deck.task_ids:
-            self.status['active_measurements'][task_id] = {
-                'test_name': 'foobar',
-                'test_start_time': 'some start time'
-            }
-        self.status_poller.notify()
         deck.setup()
-        d = deck.run(self.director)
-        d.addCallback(lambda _:
-                      self.completed_measurement(task_id))
-        d.addErrback(lambda failure:
-                     self.failed_measurement(task_id, failure))
+        # Here there is a dangling deferred
+        deck.run(self.director)
 
     @app.route('/api/nettest/<string:test_name>/start', methods=["POST"])
     def api_nettest_start(self, request, test_name):





More information about the tor-commits mailing list