commit c71375eed5db6dd1e11881c7045c2392c68aa42c Author: Arturo Filastò <arturo@filasto.net> Date: Fri Jul 29 00:20:45 2016 +0200
Implement tasks and add new APIs
* Add new API endpoints for deleting and keeping reports
* Implement tasks for uploading reports that have not been (fully) submitted
* Implement tasks for cleaning up old reports --- ooni/agent/agent.py | 11 +++++++ ooni/agent/scheduler.py | 69 ++++++++++++++++++++++++++++++++++++------- ooni/deck/deck.py | 2 +- ooni/measurements.py | 67 +++++++++++++++++++++++++++++++++++++++++ ooni/results.py | 40 ------------------------- ooni/scripts/oonireport.py | 8 +++++ ooni/ui/web/client/index.html | 2 +- ooni/ui/web/server.py | 40 +++++++++++++++++++++++-- ooni/utils/__init__.py | 6 ++-- 9 files changed, 188 insertions(+), 57 deletions(-)
diff --git a/ooni/agent/agent.py b/ooni/agent/agent.py index a1394f0..f5322ed 100644 --- a/ooni/agent/agent.py +++ b/ooni/agent/agent.py @@ -1,6 +1,7 @@ from twisted.application import service from ooni.director import Director from ooni.settings import config +from ooni.utils import log
from ooni.ui.web.web import WebUIService from ooni.agent.scheduler import SchedulerService @@ -19,3 +20,13 @@ class AgentService(service.MultiService):
self.scheduler_service = SchedulerService(director) self.scheduler_service.setServiceParent(self) + + def startService(self): + log.start() + + service.MultiService.startService(self) + + def stopService(self): + d = service.MultiService.stopService(self) + d.addCallback(lambda _: log.stop()) + return d diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py index 1288d6c..167a14a 100644 --- a/ooni/agent/scheduler.py +++ b/ooni/agent/scheduler.py @@ -4,12 +4,17 @@ from twisted.application import service from twisted.internet import task, defer from twisted.python.filepath import FilePath
+from ooni.scripts import oonireport from ooni import resources -from ooni.utils import log +from ooni.utils import log, SHORT_DATE from ooni.deck.store import input_store from ooni.settings import config from ooni.contrib import croniter from ooni.geoip import probe_ip +from ooni.measurements import list_measurements + +class DidNotRun(Exception): + pass
class ScheduledTask(object): _time_format = "%Y-%m-%dT%H:%M:%SZ" @@ -58,7 +63,7 @@ class ScheduledTask(object): yield self._last_run_lock.deferUntilLocked() if not self.should_run: self._last_run_lock.unlock() - defer.returnValue(None) + raise DidNotRun try: yield self.task() self._update_last_run() @@ -68,7 +73,7 @@ class ScheduledTask(object): self._last_run_lock.unlock()
class UpdateInputsAndResources(ScheduledTask): - identifier = "ooni-update-inputs" + identifier = "update-inputs" schedule = "@daily"
@defer.inlineCallbacks @@ -78,13 +83,50 @@ class UpdateInputsAndResources(ScheduledTask): yield resources.check_for_update(probe_ip.geodata['countrycode']) yield input_store.update(probe_ip.geodata['countrycode'])
-class CleanupInProgressReports(ScheduledTask): - identifier = 'ooni-cleanup-reports' + +class UploadReports(ScheduledTask): + """ + This task is used to submit to the collector reports that have not been + submitted and those that have been partially uploaded. + """ + identifier = 'upload-reports' + schedule = '@hourly' + + @defer.inlineCallbacks + def task(self): + yield oonireport.upload_all(upload_incomplete=True) + + +class DeleteOldReports(ScheduledTask): + """ + This task is used to delete reports that are older than a week. + """ + identifier = 'delete-old-reports' schedule = '@daily'
-class UploadMissingReports(ScheduledTask): - identifier = 'ooni-cleanup-reports' - schedule = '@weekly' + def task(self): + measurement_path = FilePath(config.measurements_directory) + for measurement in list_measurements(): + if measurement['keep'] is True: + continue + delta = datetime.utcnow() - \ + datetime.strptime(measurement['test_start_time'], + SHORT_DATE) + if delta.days >= 7: + log.debug("Deleting old report {0}".format(measurement["id"])) + measurement_path.child(measurement['id']).remove() + +class SendHeartBeat(ScheduledTask): + """ + This task is used to send a heartbeat that the probe is still alive and + well. + """ + identifier = 'send-heartbeat' + schedule = '@hourly' + + def task(self): + # XXX implement this + pass
# Order mattters SYSTEM_TASKS = [ @@ -122,8 +164,12 @@ class SchedulerService(service.MultiService): def schedule(self, task): self._scheduled_tasks.append(task)
+ def _task_did_not_run(self, failure, task): + failure.trap(DidNotRun) + log.debug("Did not run {0}".format(task.identifier)) + def _task_failed(self, failure, task): - log.debug("Failed to run {0}".format(task.identifier)) + log.err("Failed to run {0}".format(task.identifier)) log.exception(failure)
def _task_success(self, result, task): @@ -137,13 +183,16 @@ class SchedulerService(service.MultiService): for task in self._scheduled_tasks: log.debug("Running task {0}".format(task.identifier)) d = task.run() - d.addErrback(self._task_failed, task) d.addCallback(self._task_success, task) + d.addErrback(self._task_did_not_run, task) + d.addErrback(self._task_failed, task)
def startService(self): service.MultiService.startService(self)
self.schedule(UpdateInputsAndResources()) + self.schedule(UploadReports()) + self.schedule(DeleteOldReports())
self._looping_call.start(self.interval)
diff --git a/ooni/deck/deck.py b/ooni/deck/deck.py index c9b3fc5..868bfb8 100644 --- a/ooni/deck/deck.py +++ b/ooni/deck/deck.py @@ -13,7 +13,7 @@ from ooni.deck.legacy import convert_legacy_deck from ooni.deck.store import input_store from ooni.geoip import probe_ip from ooni.nettest import NetTestLoader, nettest_to_path -from ooni.results import generate_summary +from ooni.measurements import generate_summary from ooni.settings import config from ooni.utils import log, generate_filename
diff --git a/ooni/measurements.py b/ooni/measurements.py new file mode 100644 index 0000000..f830a99 --- /dev/null +++ b/ooni/measurements.py @@ -0,0 +1,67 @@ +import json +from twisted.python.filepath import FilePath +from ooni.settings import config + +class Process(): + supported_tests = [ + "web_connectivity" + ] + @staticmethod + def web_connectivity(entry): + result = {} + result['anomaly'] = False + if entry['test_keys']['blocking'] is not False: + result['anomaly'] = True + result['url'] = entry['input'] + return result + +def generate_summary(input_file, output_file): + results = {} + with open(input_file) as in_file: + for idx, line in enumerate(in_file): + entry = json.loads(line.strip()) + result = {} + if entry['test_name'] in Process.supported_tests: + result = getattr(Process, entry['test_name'])(entry) + result['idx'] = idx + results['test_name'] = entry['test_name'] + results['test_start_time'] = entry['test_start_time'] + results['country_code'] = entry['probe_cc'] + results['asn'] = entry['probe_asn'] + results['results'] = results.get('results', []) + results['results'].append(result) + + with open(output_file, "w") as fw: + json.dump(results, fw) + + +def list_measurements(): + measurements = [] + measurement_path = FilePath(config.measurements_directory) + for measurement_id in measurement_path.listdir(): + measurement = measurement_path.child(measurement_id) + completed = True + keep = False + if measurement.child("measurements.njson.progress").exists(): + completed = False + if measurement.child("keep").exists(): + keep = True + test_start_time, country_code, asn, test_name = \ + measurement_id.split("-")[:4] + measurements.append({ + "test_name": test_name, + "country_code": country_code, + "asn": asn, + "test_start_time": test_start_time, + "id": measurement_id, + "completed": completed, + "keep": keep + }) + return measurements + +if __name__ == "__main__": + import sys + if len(sys.argv) != 3: + print("Usage: {0} [input_file] 
[output_file]".format(sys.argv[0])) + sys.exit(1) + generate_summary(sys.argv[1], sys.argv[2]) diff --git a/ooni/results.py b/ooni/results.py deleted file mode 100644 index 39477a1..0000000 --- a/ooni/results.py +++ /dev/null @@ -1,40 +0,0 @@ -import json - -class Process(): - supported_tests = [ - "web_connectivity" - ] - @staticmethod - def web_connectivity(entry): - result = {} - result['anomaly'] = False - if entry['test_keys']['blocking'] is not False: - result['anomaly'] = True - result['url'] = entry['input'] - return result - -def generate_summary(input_file, output_file): - results = {} - with open(input_file) as in_file: - for idx, line in enumerate(in_file): - entry = json.loads(line.strip()) - result = {} - if entry['test_name'] in Process.supported_tests: - result = getattr(Process, entry['test_name'])(entry) - result['idx'] = idx - results['test_name'] = entry['test_name'] - results['test_start_time'] = entry['test_start_time'] - results['country_code'] = entry['probe_cc'] - results['asn'] = entry['probe_asn'] - results['results'] = results.get('results', []) - results['results'].append(result) - - with open(output_file, "w") as fw: - json.dump(results, fw) - -if __name__ == "__main__": - import sys - if len(sys.argv) != 3: - print("Usage: {0} [input_file] [output_file]".format(sys.argv[0])) - sys.exit(1) - generate_summary(sys.argv[1], sys.argv[2]) diff --git a/ooni/scripts/oonireport.py b/ooni/scripts/oonireport.py index f59a843..13d8473 100644 --- a/ooni/scripts/oonireport.py +++ b/ooni/scripts/oonireport.py @@ -122,6 +122,14 @@ def upload_all(collector=None, bouncer=None, upload_incomplete=False): except Exception as exc: log.exception(exc)
+ if upload_incomplete: + reports_to_upload = yield oonib_report_log.get_incomplete() + for report_file, value in reports_to_upload: + try: + yield upload(report_file, collector, bouncer, + value['measurement_id']) + except Exception as exc: + log.exception(exc)
def print_report(report_file, value): print("* %s" % report_file) diff --git a/ooni/ui/web/client/index.html b/ooni/ui/web/client/index.html index c6d83b7..7ba33fb 100644 --- a/ooni/ui/web/client/index.html +++ b/ooni/ui/web/client/index.html @@ -13,5 +13,5 @@ <app> Loading... </app> - <script type="text/javascript" src="app.bundle.js?16bac0b4c21c5b120b04"></script></body> + <script type="text/javascript" src="app.bundle.js?afba2f26969b4c8f00ec"></script></body> </html> diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py index a6fd315..9d98e62 100644 --- a/ooni/ui/web/server.py +++ b/ooni/ui/web/server.py @@ -20,7 +20,7 @@ from ooni.deck import NGDeck from ooni.settings import config from ooni.utils import log from ooni.director import DirectorEvent -from ooni.results import generate_summary +from ooni.measurements import generate_summary from ooni.geoip import probe_ip
config.advanced.debug = True @@ -64,7 +64,8 @@ def xsrf_protect(check=True): if (token_cookie != instance._xsrf_token and instance._enable_xsrf_protection): request.addCookie(u'XSRF-TOKEN', - instance._xsrf_token) + instance._xsrf_token, + path=u'/') if should_check and token_cookie != token_header: raise WebUIError(404, "Invalid XSRF token") return f(instance, request, *a, **kw) @@ -353,6 +354,41 @@ class WebUIAPI(object):
return self.render_json(r, request)
+ @app.route('/api/measurement/<string:measurement_id>', methods=["DELETE"]) + @xsrf_protect(check=True) + def api_measurement_delete(self, request, measurement_id): + try: + measurement_dir = self.measurement_path.child(measurement_id) + except InsecurePath: + raise WebUIError(500, "invalid measurement id") + + if measurement_dir.child("measurements.njson.progress").exists(): + raise WebUIError(400, "measurement in progress") + + try: + measurement_dir.remove() + except Exception: + raise WebUIError(400, "Failed to delete report") + + return self.render_json({"result": "ok"}, request) + + @app.route('/api/measurement/<string:measurement_id>/keep', methods=["POST"]) + @xsrf_protect(check=True) + def api_measurement_keep(self, request, measurement_id): + try: + measurement_dir = self.measurement_path.child(measurement_id) + except InsecurePath: + raise WebUIError(500, "invalid measurement id") + + if measurement_dir.child("measurements.njson.progress").exists(): + raise WebUIError(400, "measurement in progress") + + keep_file = measurement_dir.child("keep") + with keep_file.open("w+"): + pass + + return self.render_json({"result": "ok"}, request) + @app.route('/api/measurement/<string:measurement_id>/<int:idx>', methods=["GET"]) @xsrf_protect(check=False) diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py index f8ee953..35d419d 100644 --- a/ooni/utils/__init__.py +++ b/ooni/utils/__init__.py @@ -92,6 +92,9 @@ def randomStr(length, num=True): chars += string.digits return ''.join(random.choice(chars) for x in range(length))
+LONG_DATE = "%Y-%m-%d %H:%M:%S" +SHORT_DATE = "%Y%m%dT%H%M%SZ" + def generate_filename(test_details, prefix=None, extension=None): """ Returns a filename for every test execution. @@ -99,9 +102,6 @@ def generate_filename(test_details, prefix=None, extension=None): It's used to assure that all files of a certain test have a common basename but different extension. """ - LONG_DATE = "%Y-%m-%d %H:%M:%S" - SHORT_DATE = "%Y%m%dT%H%M%SZ" - kwargs = {} filename_format = "" if prefix is not None:
tor-commits@lists.torproject.org