[tor-commits] [ooni-probe/master] Write ooniprobe reports in JSON format on disk

art at torproject.org art at torproject.org
Mon Sep 19 12:14:24 UTC 2016


commit 1ec88b611d77048b8281d3358b20883388bd8283
Author: Arturo Filastò <arturo at filasto.net>
Date:   Thu Jul 14 21:30:01 2016 +0200

    Write ooniprobe reports in JSON format on disk
    
    * Implement various web UI API endpoints (deck, nettest, status and measurement routes)
---
 ooni/measurements.py   |  19 +++----
 ooni/reporter.py       | 116 ++++++++++++++++++++++++++++------------
 ooni/settings.py       |   3 +-
 ooni/ui/web/server.py  | 142 +++++++++++++++++++++++++++----------------------
 ooni/ui/web/web.py     |   7 ++-
 ooni/utils/__init__.py |   6 ++-
 6 files changed, 179 insertions(+), 114 deletions(-)

diff --git a/ooni/measurements.py b/ooni/measurements.py
index 5244ea4..976b125 100644
--- a/ooni/measurements.py
+++ b/ooni/measurements.py
@@ -9,26 +9,27 @@ class GenerateResults(object):
         self.input_file = input_file
 
     def process_web_connectivity(self, entry):
-        anomaly = {}
-        anomaly['result'] = False
+        result = {}
+        result['anomaly'] = False
         if entry['test_keys']['blocking'] is not False:
-            anomaly['result'] = True
-        anomaly['url'] = entry['input']
-        return anomaly
+            result['anomaly'] = True
+        result['url'] = entry['input']
+        return result
 
     def output(self, output_file):
         results = {}
         with open(self.input_file) as in_file:
-            for line in in_file:
+            for idx, line in enumerate(in_file):
                 entry = json.loads(line.strip())
                 if entry['test_name'] not in self.supported_tests:
                     raise Exception("Unsupported test")
-                anomaly = getattr(self, 'process_'+entry['test_name'])(entry)
+                result = getattr(self, 'process_'+entry['test_name'])(entry)
+                result['idx'] = idx
                 results['test_name'] = entry['test_name']
                 results['country_code'] = entry['probe_cc']
                 results['asn'] = entry['probe_asn']
-                results['anomalies'] = results.get('anomalies', [])
-                results['anomalies'].append(anomaly)
+                results['results'] = results.get('results', [])
+                results['results'].append(result)
 
         with open(output_file, "w") as fw:
             json.dump(results, fw)
diff --git a/ooni/reporter.py b/ooni/reporter.py
index f76fada..f07b3cf 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -1,5 +1,6 @@
 import uuid
 import yaml
+import json
 import os
 
 from copy import deepcopy
@@ -206,6 +207,56 @@ class YAMLReporter(OReporter):
     def finish(self):
         self._stream.close()
 
+class NJSONReporter(OReporter):
+
+    """
+    report_destination:
+        the destination directory of the report
+
+    """
+
+    def __init__(self, test_details, report_filename):
+        self.report_path = report_filename
+        OReporter.__init__(self, test_details)
+
+    def _writeln(self, line):
+        self._write("%s\n" % line)
+
+    def _write(self, data):
+        if not self._stream:
+            raise errors.ReportNotCreated
+        if self._stream.closed:
+            raise errors.ReportAlreadyClosed
+        s = str(data)
+        assert isinstance(s, type(''))
+        self._stream.write(s)
+        untilConcludes(self._stream.flush)
+
+    def writeReportEntry(self, entry):
+        if isinstance(entry, Measurement):
+            e = deepcopy(entry.testInstance.report)
+        elif isinstance(entry, dict):
+            e = deepcopy(entry)
+        else:
+            raise Exception("Failed to serialise entry")
+        report_entry = {
+            'input': e.pop('input', None),
+            'id': str(uuid.uuid4()),
+            'test_start_time': e.pop('test_start_time', None),
+            'measurement_start_time': e.pop('measurement_start_time', None),
+            'test_runtime': e.pop('test_runtime', None),
+            'test_keys': e
+        }
+        report_entry.update(self.testDetails)
+        self._write(json.dumps(report_entry))
+        self._write("\n")
+
+    def createReport(self):
+        self._stream = open(self.report_path, 'w+')
+
+    def finish(self):
+        self._stream.close()
+
 
 class OONIBReporter(OReporter):
 
@@ -219,25 +270,20 @@ class OONIBReporter(OReporter):
     def serializeEntry(self, entry, serialisation_format="yaml"):
         if serialisation_format == "json":
             if isinstance(entry, Measurement):
-                report_entry = {
-                    'input': entry.testInstance.report.pop('input', None),
-                    'id': str(uuid.uuid4()),
-                    'test_start_time': entry.testInstance.report.pop('test_start_time', None),
-                    'measurement_start_time': entry.testInstance.report.pop('measurement_start_time', None),
-                    'test_runtime': entry.testInstance.report.pop('test_runtime', None),
-                    'test_keys': entry.testInstance.report
-                }
+                e = deepcopy(entry.testInstance.report)
+
             elif isinstance(entry, dict):
-                report_entry = {
-                    'input': entry.pop('input', None),
-                    'id': str(uuid.uuid4()),
-                    'test_start_time': entry.pop('test_start_time', None),
-                    'measurement_start_time': entry.pop('measurement_start_time', None),
-                    'test_runtime': entry.pop('test_runtime', None),
-                    'test_keys': entry
-                }
+                e = deepcopy(entry)
             else:
                 raise Exception("Failed to serialise entry")
+            report_entry = {
+                'input': e.pop('input', None),
+                'id': str(uuid.uuid4()),
+                'test_start_time': e.pop('test_start_time', None),
+                'measurement_start_time': e.pop('measurement_start_time', None),
+                'test_runtime': e.pop('test_runtime', None),
+                'test_keys': e
+            }
             report_entry.update(self.testDetails)
             return report_entry
         else:
@@ -468,7 +514,7 @@ class Report(object):
 
     def __init__(self, test_details, report_filename,
                  reportEntryManager, collector_client=None,
-                 no_yamloo=False):
+                 no_njson=False):
         """
         This is an abstraction layer on top of all the configured reporters.
 
@@ -499,9 +545,9 @@ class Report(object):
 
         self.report_log = OONIBReportLog()
 
-        self.yaml_reporter = None
+        self.njson_reporter = None
         self.oonib_reporter = None
-        self.no_yamloo = no_yamloo
+        self.no_njson = no_njson
 
         self.done = defer.Deferred()
         self.reportEntryManager = reportEntryManager
@@ -509,7 +555,7 @@ class Report(object):
     def generateReportFilename(self):
         report_filename = generate_filename(self.test_details,
                                             prefix='report',
-                                            extension='yamloo')
+                                            extension='njson')
         report_path = os.path.join('.', report_filename)
         return os.path.abspath(report_path)
 
@@ -543,12 +589,12 @@ class Report(object):
                                                 self.collector_client)
             self.test_details['report_id'] = yield self.open_oonib_reporter()
 
-        if not self.no_yamloo:
-            self.yaml_reporter = YAMLReporter(self.test_details,
-                                              self.report_filename)
+        if not self.no_njson:
+            self.njson_reporter = NJSONReporter(self.test_details,
+                                                self.report_filename)
             if not self.oonib_reporter:
                 yield self.report_log.not_created(self.report_filename)
-            yield defer.maybeDeferred(self.yaml_reporter.createReport)
+            yield defer.maybeDeferred(self.njson_reporter.createReport)
 
         defer.returnValue(self.reportId)
 
@@ -570,7 +616,7 @@ class Report(object):
         d = defer.Deferred()
         deferreds = []
 
-        def yaml_report_failed(failure):
+        def njson_report_failed(failure):
             d.errback(failure)
 
         def oonib_report_failed(failure):
@@ -580,11 +626,11 @@ class Report(object):
             if not d.called:
                 d.callback(None)
 
-        if self.yaml_reporter:
-            write_yaml_report = ReportEntry(self.yaml_reporter, measurement)
-            self.reportEntryManager.schedule(write_yaml_report)
-            write_yaml_report.done.addErrback(yaml_report_failed)
-            deferreds.append(write_yaml_report.done)
+        if self.njson_reporter:
+            write_njson_report = ReportEntry(self.njson_reporter, measurement)
+            self.reportEntryManager.schedule(write_njson_report)
+            write_njson_report.done.addErrback(njson_report_failed)
+            deferreds.append(write_njson_report.done)
 
         if self.oonib_reporter:
             write_oonib_report = ReportEntry(self.oonib_reporter, measurement)
@@ -609,7 +655,7 @@ class Report(object):
         d = defer.Deferred()
         deferreds = []
 
-        def yaml_report_failed(failure):
+        def njson_report_failed(failure):
             d.errback(failure)
 
         def oonib_report_closed(result):
@@ -623,10 +669,10 @@ class Report(object):
             if not d.called:
                 d.callback(None)
 
-        if self.yaml_reporter:
-            close_yaml = defer.maybeDeferred(self.yaml_reporter.finish)
-            close_yaml.addErrback(yaml_report_failed)
-            deferreds.append(close_yaml)
+        if self.njson_reporter:
+            close_njson = defer.maybeDeferred(self.njson_reporter.finish)
+            close_njson.addErrback(njson_report_failed)
+            deferreds.append(close_njson)
 
         if self.oonib_reporter:
             close_oonib = self.oonib_reporter.finish()
diff --git a/ooni/settings.py b/ooni/settings.py
index ffbf68e..5245451 100644
--- a/ooni/settings.py
+++ b/ooni/settings.py
@@ -106,7 +106,8 @@ class OConfig(object):
         else:
             self.decks_directory = os.path.join(self.ooni_home, 'decks')
 
-        self.reports_directory = os.path.join(self.ooni_home, 'reports')
+        self.measurements_directory = os.path.join(self.ooni_home,
+                                                   'measurements')
         self.resources_directory = os.path.join(self.data_directory,
                                                 "resources")
         if self.advanced.report_log_file:
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index 5be581c..ccc5d87 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -3,34 +3,20 @@ from __future__ import print_function
 import os
 import json
 
+from twisted.internet import defer
 from twisted.python import usage
 from twisted.python.filepath import FilePath, InsecurePath
 from twisted.web import static
 
 from klein import Klein
+from werkzeug.exceptions import NotFound
 
-from ooni.settings import config
 from ooni import errors
+from ooni.deck import Deck
+from ooni.settings import config
 from ooni.nettest import NetTestLoader
 from ooni.measurements import GenerateResults
-
-class RouteNotFound(Exception):
-    def __init__(self, path, method):
-        self._path = path
-        self._method = method
-
-    def __repr__(self):
-        return "<RouteNotFound {0} {1}>".format(self._path,
-                                                self._method)
-
-def _resolvePath(request):
-    path = b''
-    if request.postpath:
-        path = b'/'.join(request.postpath)
-
-        if not path.startswith(b'/'):
-            path = b'/' + path
-    return path
+from ooni.utils import generate_filename
 
 def rpath(*path):
     context = os.path.abspath(os.path.dirname(__file__))
@@ -48,6 +34,9 @@ def getNetTestLoader(test_options, test_file):
         """
     options = []
     for k, v in test_options.items():
+        if v is None:
+            print("Skipping %s because none" % k)
+            continue
         options.append('--'+k)
         options.append(v)
 
@@ -61,6 +50,11 @@ class WebUIAPI(object):
     def __init__(self, config, director):
         self.director = director
         self.config = config
+        self.active_measurements = {}
+
+    @app.handle_errors(NotFound)
+    def not_found(self, request, _):
+        request.redirect('/client/')
 
     def render_json(self, obj, request):
         json_string = json.dumps(obj) + "\n"
@@ -68,28 +62,43 @@ class WebUIAPI(object):
         request.setHeader('Content-Length', len(json_string))
         return json_string
 
-    @app.route('/api/decks/generate', methods=["GET"])
-    def generate_decks(self, request):
+    @app.route('/api/deck/generate', methods=["GET"])
+    def api_deck_generate(self, request):
         return self.render_json({"generate": "deck"}, request)
 
-    @app.route('/api/decks/<string:deck_name>/start', methods=["POST"])
-    def start_deck(self, request, deck_name):
+    @app.route('/api/deck/<string:deck_name>/start', methods=["POST"])
+    def api_deck_start(self, request, deck_name):
         return self.render_json({"start": deck_name}, request)
 
-    @app.route('/api/decks/<string:deck_name>/stop', methods=["POST"])
-    def stop_deck(self, request, deck_name):
-        return self.render_json({"stop": deck_name}, request)
-
-    @app.route('/api/decks/<string:deck_name>', methods=["GET"])
-    def deck_status(self, request, deck_name):
-        return self.render_json({"status": deck_name}, request)
-
-    @app.route('/api/decks', methods=["GET"])
-    def deck_list(self, request):
+    @app.route('/api/deck', methods=["GET"])
+    def api_deck_list(self, request):
         return self.render_json({"command": "deck-list"}, request)
 
-    @app.route('/api/net-tests/<string:test_name>/start', methods=["POST"])
-    def test_start(self, request, test_name):
+    @defer.inlineCallbacks
+    def run_deck(self, deck):
+        yield deck.setup()
+        measurement_ids = []
+        for net_test_loader in deck.netTestLoaders:
+            # XXX synchronize this with startNetTest
+            test_details = net_test_loader.getTestDetails()
+            measurement_id = generate_filename(test_details)
+
+            measurement_dir = os.path.join(
+                config.measurements_directory,
+                measurement_id
+            )
+            os.mkdir(measurement_dir)
+            report_filename = os.path.join(measurement_dir,
+                                           "measurements.njson")
+            measurement_ids.append(measurement_id)
+            self.active_measurements[measurement_id] = {
+                'test_name': test_details['test_name'],
+                'test_start_time': test_details['test_start_time']
+            }
+            self.director.startNetTest(net_test_loader, report_filename)
+
+    @app.route('/api/nettest/<string:test_name>/start', methods=["POST"])
+    def api_nettest_start(self, request, test_name):
         try:
             net_test = self.director.netTests[test_name]
         except KeyError:
@@ -99,21 +108,19 @@ class WebUIAPI(object):
                 'error_message': 'Could not find the specified test'
             }, request)
         try:
-            test_options = json.load(request.content.read())
+            test_options = json.load(request.content)
         except ValueError:
             return self.render_json({
                 'error_code': 500,
                 'error_message': 'Invalid JSON message recevied'
             }, request)
 
+        deck = Deck(no_collector=True) # XXX remove no_collector
         net_test_loader = getNetTestLoader(test_options, net_test['path'])
         try:
-            net_test_loader.checkOptions()
-            # XXX we actually want to generate the report_filename in a smart
-            # way so that we can know where it is located and learn the results
-            # of the measurement.
-            report_filename = None
-            self.director.startNetTest(net_test_loader, report_filename)
+            deck.insert(net_test_loader)
+            self.run_deck(deck)
+
         except errors.MissingRequiredOption, option_name:
             request.setResponseCode(500)
             return self.render_json({
@@ -134,25 +141,18 @@ class WebUIAPI(object):
                 'error_message': 'Insufficient priviledges'
             }, request)
 
-        return self.render_json({"deck": "list"}, request)
-
-    @app.route('/api/net-tests/<string:test_name>/start', methods=["POST"])
-    def test_stop(self, request, test_name):
-        return self.render_json({
-            "command": "test-stop",
-            "test-name": test_name
-        }, request)
-
-    @app.route('/api/net-tests/<string:test_name>', methods=["GET"])
-    def test_status(self, request, test_name):
-        return self.render_json({"command": "test-stop"}, request)
+        return self.render_json({"status": "started"}, request)
 
-    @app.route('/api/net-tests', methods=["GET"])
-    def test_list(self, request):
+    @app.route('/api/nettest', methods=["GET"])
+    def api_nettest_list(self, request):
         return self.render_json(self.director.netTests, request)
 
+    @app.route('/api/status', methods=["GET"])
+    def api_status(self):
+        return self.render_json()
+
     @app.route('/api/measurement', methods=["GET"])
-    def measurement_list(self, request):
+    def api_measurement_list(self, request):
         measurement_ids = os.listdir(os.path.join(config.ooni_home,
                                                   "measurements"))
         measurements = []
@@ -169,8 +169,8 @@ class WebUIAPI(object):
         return self.render_json({"measurements": measurements}, request)
 
     @app.route('/api/measurement/<string:measurement_id>', methods=["GET"])
-    def measurement_summary(self, request, measurement_id):
-        measurement_path = FilePath(config.ooni_home).child("measurements")
+    def api_measurement_summary(self, request, measurement_id):
+        measurement_path = FilePath(config.measurements_directory)
         try:
             measurement_dir = measurement_path.child(measurement_id)
         except InsecurePath:
@@ -189,13 +189,29 @@ class WebUIAPI(object):
 
     @app.route('/api/measurement/<string:measurement_id>/<int:idx>',
                methods=["GET"])
-    def measurement_open(self, request, measurement_id, idx):
-        return self.render_json({"command": "results"}, request)
+    def api_measurement_view(self, request, measurement_id, idx):
+        measurement_path = FilePath(config.measurements_directory)
+        try:
+            measurement_dir = measurement_path.child(measurement_id)
+        except InsecurePath:
+            return self.render_json({"error": "invalid measurement id"})
+        measurements = measurement_dir.child("measurements.njson")
+
+        # XXX maybe implement some caching here
+        with measurements.open("r") as f:
+            r = None
+            for f_idx, line in enumerate(f):
+                if f_idx == idx:
+                    r = json.loads(line)
+                    break
+            if r is None:
+                return self.render_json({"error": "Could not find measurement "
+                                                  "with this idx"}, request)
+        return self.render_json(r, request)
 
     @app.route('/client/', branch=True)
     def static(self, request):
-        path = rpath("build")
-        print(path)
+        path = rpath("client")
         return static.File(path)
 <<<<<<< acda284b56fa3a75acbe7d000fbdefb643839948
 
diff --git a/ooni/ui/web/web.py b/ooni/ui/web/web.py
index f709c18..6c6971c 100644
--- a/ooni/ui/web/web.py
+++ b/ooni/ui/web/web.py
@@ -24,10 +24,9 @@ class WebUIService(service.MultiService):
             root = server.Site(WebUIAPI(config, director).app.resource())
             self._port = reactor.listenTCP(self.portNum, root)
         director = Director()
-        #d = director.start()
-        #d.addCallback(_started)
-        #d.addErrback(self._startupFailed)
-        _started(None)
+        d = director.start()
+        d.addCallback(_started)
+        d.addErrback(self._startupFailed)
 
     def _startupFailed(self, err):
         log.err("Failed to start the director")
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py
index b28aadb..1aaac7b 100644
--- a/ooni/utils/__init__.py
+++ b/ooni/utils/__init__.py
@@ -97,18 +97,20 @@ def generate_filename(test_details, prefix=None, extension=None):
     extension.
     """
     LONG_DATE = "%Y-%m-%d %H:%M:%S"
-    SHORT_DATE = "%Y-%m-%dT%H%M%SZ"
+    SHORT_DATE = "%Y%m%dT%H%M%SZ"
 
     kwargs = {}
     filename_format = ""
     if prefix is not None:
         kwargs["prefix"] = prefix
         filename_format += "{prefix}-"
-    filename_format += "{test_name}-{timestamp}"
+    filename_format += "{timestamp}-{probe_cc}-{probe_asn}-{test_name}"
     if extension is not None:
         kwargs["extension"] = extension
         filename_format += ".{extension}"
     kwargs['test_name']  = test_details['test_name']
+    kwargs['probe_cc']  = test_details['probe_cc']
+    kwargs['probe_asn']  = test_details['probe_asn']
     kwargs['timestamp'] = datetime.strptime(test_details['test_start_time'],
                                             LONG_DATE).strftime(SHORT_DATE)
     return filename_format.format(**kwargs)





More information about the tor-commits mailing list