commit 546d1e3b2d9b1cbf0bebe2b032acc0d8b87e13e4 Author: Arturo Filastò arturo@filasto.net Date: Tue Jul 26 21:18:04 2016 +0200
Create a set of scheduled tasks to be run by the agent in background
* Remove unneeded filename hash and sprinkle notes on its future deprecation * Fix some bugs in the resources update procedure --- ooni/agent/scheduler.py | 144 ++++++++++++++++++++- ooni/contrib/__init__.py | 2 +- ooni/contrib/croniter.py | 11 ++ ooni/deck.py | 38 +++--- ooni/director.py | 35 +++-- ooni/geoip.py | 12 +- ooni/nettest.py | 25 +--- .../manipulation/http_invalid_request_line.py | 14 +- ooni/resources.py | 71 ++++++---- ooni/settings.py | 2 + ooni/ui/cli.py | 3 +- ooni/ui/web/server.py | 34 +---- 12 files changed, 271 insertions(+), 120 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py index 1004597..ace3bc5 100644 --- a/ooni/agent/scheduler.py +++ b/ooni/agent/scheduler.py @@ -1,5 +1,126 @@ +from datetime import datetime + from twisted.application import service -from twisted.internet import task +from twisted.internet import task, defer +from twisted.python.filepath import FilePath + +from ooni import resources +from ooni.utils import log +from ooni.deck import input_store +from ooni.settings import config +from ooni.contrib import croniter + +class ScheduledTask(object): + _time_format = "%Y-%m-%dT%H:%M:%SZ" + schedule = None + identifier = None + + def __init__(self, schedule=None): + if schedule is not None: + self.schedule = schedule + + assert self.identifier is not None, "self.identifier must be set" + assert self.schedule is not None, "self.schedule must be set" + scheduler_directory = config.scheduler_directory + + self._last_run = FilePath(scheduler_directory).child(self.identifier) + self._last_run_lock = defer.DeferredFilesystemLock( + FilePath(scheduler_directory).child(self.identifier + ".lock").path + ) + + @property + def should_run(self): + current_time = datetime.utcnow() + next_cycle = croniter(self.schedule, self.last_run).get_next(datetime) + if next_cycle <= current_time: + return True + return False + + @property + def last_run(self): + if not self._last_run.exists(): + return datetime.fromtimestamp(0) + with self._last_run.open('r') as in_file: + date_str = in_file.read() + return datetime.strptime(date_str, self._time_format) + + def _update_last_run(self): + with self._last_run.open('w') as out_file: + current_time = datetime.utcnow() + out_file.write(current_time.strftime(self._time_format)) + + def task(self): + raise NotImplemented + + @defer.inlineCallbacks + def run(self): + yield self._last_run_lock.deferUntilLocked() + if not self.should_run: + self._last_run_lock.unlock() + defer.returnValue(None) + try: + yield self.task() + self._update_last_run() 
+ except: + raise + finally: + self._last_run_lock.unlock() + +class UpdateInputsAndResources(ScheduledTask): + identifier = "ooni-update-inputs" + schedule = "@daily" + + @defer.inlineCallbacks + def task(self): + log.debug("Updating the inputs") + yield resources.check_for_update(config.probe_ip.geodata['countrycode']) + yield input_store.update(config.probe_ip.geodata['countrycode']) + +class UpdateProbeIp(ScheduledTask): + identifier = "ooni-update-probe-ip" + schedule = "@hourly" + # XXX we need to ensure this is always run the first time ooniprobe or + # ooniprobe-agent is started or implement on disk caching of the users + # IP address. + + def task(self): + log.debug("Updating the probe IP") + return config.probe_ip.lookup() + +class CleanupInProgressReports(ScheduledTask): + identifier = 'ooni-cleanup-reports' + schedule = '@daily' + +class UploadMissingReports(ScheduledTask): + identifier = 'ooni-cleanup-reports' + schedule = '@weekly' + +# Order mattters +SYSTEM_TASKS = [ + UpdateProbeIp, + UpdateInputsAndResources +] + +@defer.inlineCallbacks +def run_system_tasks(no_geoip=False, no_input_store=False): + task_classes = SYSTEM_TASKS[:] + + if no_geoip: + log.debug("Not updating probe IP") + task_classes.pop(UpdateProbeIp) + + if no_input_store: + log.debug("Not updating the inputs") + task_classes.pop(UpdateInputsAndResources) + + for task_class in task_classes: + task = task_class() + log.debug("Running task {0}".format(task.identifier)) + try: + yield task.run() + except Exception as exc: + log.err("Failed to run task {0}".format(task.identifier)) + log.exception(exc)
class SchedulerService(service.MultiService): """ @@ -10,16 +131,35 @@ class SchedulerService(service.MultiService): self.director = director self.interval = interval self._looping_call = task.LoopingCall(self._should_run) + self._scheduled_tasks = [] + + def schedule(self, task): + self._scheduled_tasks.append(task) + + def _task_failed(self, failure, task): + log.msg("Failed to run {0}".format(task.identifier)) + log.exception(failure) + + def _task_success(self, result, task): + log.msg("Ran {0}".format(task.identifier))
def _should_run(self): """ This function is called every self.interval seconds to check which periodic tasks should be run. """ - pass + for task in self._scheduled_tasks: + log.debug("Running task {0}".format(task.identifier)) + d = task.run() + d.addErrback(self._task_failed, task) + d.addCallback(self._task_success, task)
def startService(self): service.MultiService.startService(self) + + self.schedule(UpdateProbeIp()) + self.schedule(UpdateInputsAndResources()) + self._looping_call.start(self.interval)
def stopService(self): diff --git a/ooni/contrib/__init__.py b/ooni/contrib/__init__.py index 50b6b54..28aad30 100644 --- a/ooni/contrib/__init__.py +++ b/ooni/contrib/__init__.py @@ -1 +1 @@ -from ._crontab import CronTab +from .croniter import croniter diff --git a/ooni/contrib/croniter.py b/ooni/contrib/croniter.py index 5864603..653dbbf 100644 --- a/ooni/contrib/croniter.py +++ b/ooni/contrib/croniter.py @@ -49,6 +49,15 @@ class croniter(object): {}, )
+ ALIASES = { + '@yearly': '0 0 1 1 *', + '@annually': '0 0 1 1 *', + '@monthly': '0 0 1 * *', + '@weekly': '0 0 * * 0', + '@daily': '0 0 * * *', + '@hourly': '0 * * * *', + } + bad_length = 'Exactly 5 or 6 columns has to be specified for iterator' \ 'expression.'
@@ -63,6 +72,8 @@ class croniter(object): start_time = self._datetime_to_timestamp(start_time)
self.cur = start_time + if expr_format in self.ALIASES: + expr_format = self.ALIASES[expr_format] self.exprs = expr_format.split()
if len(self.exprs) != 5 and len(self.exprs) != 6: diff --git a/ooni/deck.py b/ooni/deck.py index 4794d30..0434d81 100644 --- a/ooni/deck.py +++ b/ooni/deck.py @@ -4,7 +4,6 @@ import csv import json
from copy import deepcopy -from hashlib import sha256
import yaml
@@ -236,7 +235,8 @@ def lookup_collector_and_test_helpers(net_test_loaders, 'name': net_test_loader.testName, 'version': net_test_loader.testVersion, 'test-helpers': [], - 'input-hashes': [x['hash'] for x in net_test_loader.inputFiles] + # XXX deprecate this very soon + 'input-hashes': [] } if not net_test_loader.collector and not no_collector: requires_collector = True @@ -262,15 +262,16 @@ def lookup_collector_and_test_helpers(net_test_loaders, log.err("Could not find any reachable test helpers") raise
- def find_collector_and_test_helpers(test_name, test_version, input_files): - input_files = [u""+x['hash'] for x in input_files] + def find_collector_and_test_helpers(test_name, test_version): + # input_files = [u""+x['hash'] for x in input_files] for net_test in provided_net_tests: if net_test['name'] != test_name: continue if net_test['version'] != test_version: continue - if set(net_test['input-hashes']) != set(input_files): - continue + # XXX remove the notion of policies based on input file hashes + # if set(net_test['input-hashes']) != set(input_files): + # continue return net_test['collector'], net_test['test-helpers']
for net_test_loader in net_test_loaders: @@ -280,8 +281,8 @@ def lookup_collector_and_test_helpers(net_test_loaders, collector, test_helpers = \ find_collector_and_test_helpers( test_name=net_test_loader.testName, - test_version=net_test_loader.testVersion, - input_files=net_test_loader.inputFiles + test_version=net_test_loader.testVersion + # input_files=net_test_loader.inputFiles )
for option, name in net_test_loader.missingTestHelpers: @@ -455,6 +456,7 @@ class InputStore(object): "id": "citizenlab_{0}_urls".format(cc), "type": "file/url" }, out_fh) + self._cache_stale = True
@defer.inlineCallbacks def create(self, country_code=None): @@ -523,13 +525,11 @@ def resolve_file_path(v, prepath=None): return FilePath(prepath).preauthChild(v).path return v
-def options_to_args(options, prepath=None): +def options_to_args(options): args = [] for k, v in options.items(): if v is None: continue - if k == "file": - v = resolve_file_path(v, prepath) if v == False or v == 0: continue if (len(k)) == 1: @@ -625,7 +625,7 @@ class DeckTask(object): collector_address = None
net_test_loader = NetTestLoader( - options_to_args(task_data, self.cwd), + options_to_args(task_data), annotations=annotations, test_file=nettest_path ) @@ -653,6 +653,9 @@ class DeckTask(object): self.ooni['net_test_loader'] = net_test_loader
def _setup_ooni(self): + for input_file in self.ooni['net_test_loader'].inputFiles: + file_path = resolve_file_path(input_file['filename'], self.cwd) + input_file['test_options'][input_file['key']] = file_path self.ooni['test_details'] = self.ooni['net_test_loader'].getTestDetails() self.id = generate_filename(self.ooni['test_details'])
@@ -670,15 +673,8 @@ class DeckTask(object): if task_type not in self._supported_tasks: raise UnknownTaskKey(task_type) self.type = task_type - try: - getattr(self, "_load_"+task_type)(task_data) - except InputNotFound: - log.debug( - "Will skip running this test because I can't find the input" - ) - self._skip = True - - assert len(data) == 0 + getattr(self, "_load_"+task_type)(task_data) + assert len(data) == 0, "Got an unidentified key"
class NotAnOption(Exception): pass diff --git a/ooni/director.py b/ooni/director.py index 84bc9aa..1ab076b 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -13,6 +13,7 @@ from ooni.settings import config from ooni.nettest import normalizeTestName from ooni.deck import InputStore
+from ooni.agent.scheduler import run_system_tasks from ooni.utils.onion import start_tor, connect_to_control_port
class DirectorEvent(object): @@ -139,12 +140,15 @@ class Director(object): self._tor_starting.addCallback(self._tor_startup_success)
def _tor_startup_failure(self, failure): + log.msg("Failed to start tor") + log.exception(failure) self._reset_tor_state() self.notify(DirectorEvent("error", "Failed to start Tor")) return failure
def _tor_startup_success(self, result): + log.msg("Tor has started") self._tor_state = 'running' self.notify(DirectorEvent("success", "Successfully started Tor")) @@ -187,22 +191,21 @@ class Director(object): if start_tor: yield self.start_tor(check_incoherences)
- if config.global_options.get('no-geoip'): + no_geoip = config.global_options.get('no-geoip', False) + if no_geoip: aux = [False] if config.global_options.get('annotations') is not None: annotations = [k.lower() for k in config.global_options['annotations'].keys()] aux = map(lambda x: x in annotations, ["city", "country", "asn"]) if not all(aux): log.msg("You should add annotations for the country, city and ASN") - else: - yield config.probe_ip.lookup() - self.notify(DirectorEvent("success", - "Looked up Probe IP"))
- if create_input_store: - yield self.input_store.create(config.probe_ip.geodata["countrycode"]) - self.notify(DirectorEvent("success", - "Created input store")) + self.notify(DirectorEvent("success", + "Running system tasks")) + yield run_system_tasks(no_geoip=no_geoip, + no_input_store=not create_input_store) + self.notify(DirectorEvent("success", + "Ran system tasks"))
@defer.inlineCallbacks def start(self, start_tor=False, check_incoherences=True, @@ -284,7 +287,8 @@ class Director(object):
def netTestDone(self, net_test): self.notify(DirectorEvent("success", - "Successfully ran net_test")) + "Successfully ran test {0}".format( + net_test.testDetails['test_name']))) self.activeNetTests.remove(net_test) if len(self.activeNetTests) == 0: self.allTestsDone.callback(None) @@ -371,13 +375,18 @@ class Director(object): log.debug("Tor is already running") defer.returnValue(self._tor_state) elif self._tor_state == 'starting': + log.debug("Tor is starting") yield self._tor_starting defer.returnValue(self._tor_state)
log.msg("Starting Tor...") self._tor_state = 'starting' if check_incoherences: - yield config.check_tor() + try: + yield config.check_tor() + except Exception as exc: + self._tor_starting.errback(Failure(exc)) + raise exc
if config.advanced.start_tor and config.tor_state is None: tor_config = TorConfig() @@ -438,3 +447,7 @@ class Director(object): self._tor_starting.callback(self._tor_state) except Exception as exc: self._tor_starting.errback(Failure(exc)) + else: + # This happens when we require tor to not be started and the + # socks port is set. + self._tor_starting.callback(self._tor_state) diff --git a/ooni/geoip.py b/ooni/geoip.py index f118268..2a7ec92 100644 --- a/ooni/geoip.py +++ b/ooni/geoip.py @@ -31,8 +31,12 @@ class GeoIPDataFilesNotFound(Exception): def IPToLocation(ipaddr): from ooni.settings import config
- country_file = config.get_data_file_path('GeoIP/GeoIP.dat') - asn_file = config.get_data_file_path('GeoIP/GeoIPASNum.dat') + country_file = config.get_data_file_path( + 'resources/maxmind-geoip/GeoIP.dat' + ) + asn_file = config.get_data_file_path( + 'resources/maxmind-geoip/GeoIPASNum.dat' + )
location = {'city': None, 'countrycode': 'ZZ', 'asn': 'AS0'} if not asn_file or not country_file: @@ -69,7 +73,9 @@ def database_version(): }
for key in version.keys(): - geoip_file = config.get_data_file_path("GeoIP/" + key + ".dat") + geoip_file = config.get_data_file_path( + "resources/maxmind-geoip/" + key + ".dat" + ) if not geoip_file or not os.path.isfile(geoip_file): continue timestamp = os.stat(geoip_file).st_mtime diff --git a/ooni/nettest.py b/ooni/nettest.py index 2a33a2f..88e4953 100644 --- a/ooni/nettest.py +++ b/ooni/nettest.py @@ -2,7 +2,6 @@ import os import re import time import sys -from hashlib import sha256
from twisted.internet import defer from twisted.trial.runner import filenameToModule @@ -199,6 +198,7 @@ class NetTestLoader(object): 'probe_city': config.probe_ip.geodata['city'], 'software_name': 'ooniprobe', 'software_version': ooniprobe_version, + # XXX only sanitize the input files 'options': sanitize_options(self.options), 'annotations': self.annotations, 'data_format_version': '0.2.0', @@ -206,8 +206,8 @@ class NetTestLoader(object): 'test_version': self.testVersion, 'test_helpers': self.testHelpers, 'test_start_time': otime.timestampNowLongUTC(), - 'input_hashes': [input_file['hash'] - for input_file in self.inputFiles], + # XXX We should deprecate this key very soon + 'input_hashes': [], 'report_id': self.reportId }
@@ -235,29 +235,14 @@ class NetTestLoader(object): input_file = { 'key': key, 'test_options': self.localOptions, - 'hash': None, - - 'url': None, - 'address': None, - 'filename': None } m = ONION_INPUT_REGEXP.match(filename) if m: - input_file['url'] = filename - input_file['address'] = m.group(1) - input_file['hash'] = m.group(2) + raise e.InvalidInputFile("Input files hosted on hidden services " + "are not longer supported") else: input_file['filename'] = filename - try: - with open(filename) as f: - h = sha256() - for l in f: - h.update(l) - except Exception as exc: - log.exception(exc) - raise e.InvalidInputFile(filename) - input_file['hash'] = h.hexdigest() self.inputFiles.append(input_file)
def _accumulateTestOptions(self, test_class): diff --git a/ooni/nettests/manipulation/http_invalid_request_line.py b/ooni/nettests/manipulation/http_invalid_request_line.py index 94b0b99..be0497c 100644 --- a/ooni/nettests/manipulation/http_invalid_request_line.py +++ b/ooni/nettests/manipulation/http_invalid_request_line.py @@ -42,14 +42,14 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): self.address = self.localOptions['backend'] self.report['tampering'] = None
- def check_for_manipulation(self, response, payload): + def check_for_manipulation(self, response, payload, manipulation_type): log.debug("Checking if %s == %s" % (response, payload)) if response != payload: - log.msg("Detected manipulation!") + log.msg("{0}: Detected manipulation!".format(manipulation_type)) log.msg(response) self.report['tampering'] = True else: - log.msg("No manipulation detected.") + log.msg("{0}: No manipulation detected.".format(manipulation_type)) self.report['tampering'] = False
def test_random_invalid_method(self): @@ -75,7 +75,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): payload = randomSTR(4) + " / HTTP/1.1\n\r"
d = self.sendPayload(payload) - d.addCallback(self.check_for_manipulation, payload) + d.addCallback(self.check_for_manipulation, payload, 'random_invalid_method') return d
def test_random_invalid_field_count(self): @@ -91,7 +91,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): payload += "\n\r"
d = self.sendPayload(payload) - d.addCallback(self.check_for_manipulation, payload) + d.addCallback(self.check_for_manipulation, payload, 'random_invalid_field_count') return d
def test_random_big_request_method(self): @@ -103,7 +103,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): payload = randomStr(1024) + ' / HTTP/1.1\n\r'
d = self.sendPayload(payload) - d.addCallback(self.check_for_manipulation, payload) + d.addCallback(self.check_for_manipulation, payload, 'random_big_request_method') return d
def test_random_invalid_version_number(self): @@ -116,5 +116,5 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): payload += '\n\r'
d = self.sendPayload(payload) - d.addCallback(self.check_for_manipulation, payload) + d.addCallback(self.check_for_manipulation, payload, 'random_invalid_version_number') return d diff --git a/ooni/resources.py b/ooni/resources.py index d49e679..d67908c 100644 --- a/ooni/resources.py +++ b/ooni/resources.py @@ -1,5 +1,9 @@ +import os +import gzip import json +import shutil
+from twisted.python.runtime import platform from twisted.python.filepath import FilePath from twisted.internet import defer from twisted.web.client import downloadPage, getPage @@ -66,11 +70,27 @@ def get_out_of_date_resources(current_manifest, new_manifest, # the manifest claims we have a more up to date version. # This happens if an update by country_code happened and a new # country code is now required. + if filename.endswith(".gz"): + filename = filename[:-3] if not _resources.child(pre_path).child(filename).exists(): paths_to_update.append(info)
return paths_to_update, paths_to_delete
+def gunzip(file_path): + tmp_location = FilePath(file_path).temporarySibling() + in_file = gzip.open(file_path) + with tmp_location.open('w') as out_file: + shutil.copyfileobj(in_file, out_file) + in_file.close() + rename(tmp_location.path, file_path) + +def rename(src, dst): + # Best effort atomic renaming + if platform.isWindows() and os.path.exists(dst): + os.unlink(dst) + os.rename(src, dst) + @defer.inlineCallbacks def check_for_update(country_code=None): """ @@ -88,44 +108,48 @@ def check_for_update(country_code=None): current_version = get_current_version() latest_version = yield get_latest_version()
- # We are already at the latest version - if current_version == latest_version: - defer.returnValue(latest_version) - resources_dir = FilePath(config.resources_directory) resources_dir.makedirs(ignoreExistingDirectory=True) current_manifest = resources_dir.child("manifest.json")
- new_manifest = current_manifest.temporarySibling() - new_manifest.alwaysCreate = 0 - - temporary_files.append((current_manifest, new_manifest)) - - try: - yield downloadPage( - get_download_url(latest_version, "manifest.json"), - new_manifest.path - ) - except: - cleanup() - raise UpdateFailure("Failed to download manifest") - - new_manifest_data = json.loads(new_manifest.getContent()) - if current_manifest.exists(): with current_manifest.open("r") as f: - current_manifest_data = json.loads(f) + current_manifest_data = json.load(f) else: current_manifest_data = { "resources": [] }
+ # We should download a newer manifest + if current_version < latest_version: + new_manifest = current_manifest.temporarySibling() + new_manifest.alwaysCreate = 0 + + temporary_files.append((current_manifest, new_manifest)) + + try: + yield downloadPage( + get_download_url(latest_version, "manifest.json"), + new_manifest.path + ) + except: + cleanup() + raise UpdateFailure("Failed to download manifest") + + new_manifest_data = json.loads(new_manifest.getContent()) + else: + new_manifest_data = current_manifest_data + to_update, to_delete = get_out_of_date_resources( current_manifest_data, new_manifest_data, country_code)
try: for resource in to_update: + gzipped = False pre_path, filename = resource["path"].split("/") + if filename.endswith(".gz"): + filename = filename[:-3] + gzipped = True dst_file = resources_dir.child(pre_path).child(filename) dst_file.parent().makedirs(ignoreExistingDirectory=True) src_file = dst_file.temporarySibling() @@ -135,8 +159,9 @@ def check_for_update(country_code=None): # The paths for the download require replacing "/" with "." download_url = get_download_url(latest_version, resource["path"].replace("/", ".")) - print("Downloading {0}".format(download_url)) yield downloadPage(download_url, src_file.path) + if gzipped: + gunzip(src_file.path) except Exception as exc: cleanup() log.exception(exc) @@ -145,7 +170,7 @@ def check_for_update(country_code=None): for dst_file, src_file in temporary_files: log.msg("Moving {0} to {1}".format(src_file.path, dst_file.path)) - src_file.moveTo(dst_file) + rename(src_file.path, dst_file.path)
for resource in to_delete: log.msg("Deleting old resources") diff --git a/ooni/settings.py b/ooni/settings.py index b73e2f2..632dbe4 100644 --- a/ooni/settings.py +++ b/ooni/settings.py @@ -101,6 +101,8 @@ class OConfig(object): else: self.inputs_directory = os.path.join(self.ooni_home, 'inputs')
+ self.scheduler_directory = os.path.join(self.ooni_home, 'scheduler') + if self.advanced.decks_dir: self.decks_directory = self.advanced.decks_dir else: diff --git a/ooni/ui/cli.py b/ooni/ui/cli.py index 57924ec..e8d747c 100644 --- a/ooni/ui/cli.py +++ b/ooni/ui/cli.py @@ -305,7 +305,8 @@ def createDeck(global_options, url=None): return deck
-def runTestWithDirector(director, global_options, url=None, start_tor=True, +def runTestWithDirector(director, global_options, url=None, + start_tor=True, create_input_store=True): deck = createDeck(global_options, url=url)
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py index f9886ac..0a3d1ca 100644 --- a/ooni/ui/web/server.py +++ b/ooni/ui/web/server.py @@ -84,10 +84,7 @@ class WebUIAPI(object): "software_name": "ooniprobe", "asn": config.probe_ip.geodata['asn'], "country_code": config.probe_ip.geodata['countrycode'], - "active_measurements": {}, - "completed_measurements": [], - "director_started": False, - "failures": [] + "director_started": False }
self.status_poller = LongPoller( @@ -103,33 +100,17 @@ class WebUIAPI(object): d = self.director.start()
d.addCallback(self.director_started) - d.addErrback(self.director_startup_failed) d.addBoth(lambda _: self.status_poller.notify())
def handle_director_event(self, event): log.msg("Handling event {0}".format(event.type)) self.director_event_poller.notify(event)
- def add_failure(self, failure): - self.status['failures'].append(str(failure)) - def director_started(self, _): self.status['director_started'] = True self.status["asn"] = config.probe_ip.geodata['asn'] self.status["country_code"] = config.probe_ip.geodata['countrycode']
- def director_startup_failed(self, failure): - self.add_failure(failure) - - def completed_measurement(self, measurement_id): - del self.status['active_measurements'][measurement_id] - self.status['completed_measurements'].append(measurement_id) - - def failed_measurement(self, measurement_id, failure): - log.exception(failure) - del self.status['active_measurements'][measurement_id] - self.add_failure(str(failure)) - @app.handle_errors(NotFound) def not_found(self, request, _): request.redirect('/client/') @@ -188,18 +169,9 @@ class WebUIAPI(object): return self.render_json({"command": "deck-list"}, request)
def run_deck(self, deck): - for task_id in deck.task_ids: - self.status['active_measurements'][task_id] = { - 'test_name': 'foobar', - 'test_start_time': 'some start time' - } - self.status_poller.notify() deck.setup() - d = deck.run(self.director) - d.addCallback(lambda _: - self.completed_measurement(task_id)) - d.addErrback(lambda failure: - self.failed_measurement(task_id, failure)) + # Here there is a dangling deferred + deck.run(self.director)
@app.route('/api/nettest/string:test_name/start', methods=["POST"]) def api_nettest_start(self, request, test_name):