[tor-commits] [ooni-probe/master] Feature/test progress (#719)

art at torproject.org
Fri Sep 22 18:41:06 UTC 2017


commit b8d48b0887aa4ef8ba4ec4d26baa24515472c586
Author: Arturo Filastò <arturo at filasto.net>
Date:   Fri Jan 27 10:59:20 2017 +0000

    Feature/test progress (#719)
    
    * Add support for showing the test progress
    
    * Add progress to API
    
    * Expose anomaly in the top-level measurement listing
    
    * Differentiate between danger and warning anomalies
---
 ooni/deck/deck.py     |  5 +++--
 ooni/director.py      |  7 ++++++-
 ooni/measurements.py  | 25 ++++++++++++++++++++---
 ooni/nettest.py       | 56 ++++++++++++++++++++++++++++++++++++++++++++++++---
 ooni/ui/web/server.py | 10 ++++++++-
 5 files changed, 93 insertions(+), 10 deletions(-)
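
Taken together, these changes surface two new pieces of state per measurement: a persistent anomaly flag and, while a test is running, a live progress percentage. An illustrative listing entry (a sketch only; it shows just the fields visible in the hunks below, with made-up values):

    # Illustrative entry from the measurement listing after this change.
    {
        # ... other fields elided ...
        "id": "20170127T105920Z",   # hypothetical measurement ID
        "running": True,
        "stale": False,
        "size": 4096,
        "deck_id": "none",
        "anomaly": False,           # True once the "anomaly" marker file exists
        "progress": 42.0,           # added by the web API for running tests
    }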

diff --git a/ooni/deck/deck.py b/ooni/deck/deck.py
index cb52bb22..2060b6ec 100644
--- a/ooni/deck/deck.py
+++ b/ooni/deck/deck.py
@@ -37,13 +37,13 @@ def options_to_args(options):
     for k, v in options.items():
         if v is None:
             continue
-        if v == False:
+        if v is False:
             continue
         if (len(k)) == 1:
             args.append('-'+k)
         else:
             args.append('--'+k)
-        if v == True:
+        if v is True:
             continue
         args.append(v)
     return args
@@ -218,6 +218,7 @@ class NGDeck(object):
             generate_summary(
                 measurement_dir.child("measurements.njson").path,
                 measurement_dir.child("summary.json").path,
+                measurement_dir.child("anomaly").path,
                 deck_id=self.id
             )
             measurement_dir.child("running.pid").remove()
diff --git a/ooni/director.py b/ooni/director.py
index f7566f5f..0b949db6 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -72,6 +72,8 @@ class Director(object):
         self.activeNetTests = []
         self.activeDecks = []
 
+        self.activeMeasurements = {}
+
         self.measurementManager = MeasurementManager()
         self.measurementManager.director = self
 
@@ -307,7 +309,6 @@ class Director(object):
             log.error("Completed deck {0} is not actually running".format(
                 deck_id))
 
-
     def isDeckRunning(self, deck_id, from_schedule):
         """
         :param deck_id: the ID of the deck to check if it's running
@@ -358,11 +359,15 @@ class Director(object):
         yield net_test.initialize()
         try:
             self.activeNetTests.append(net_test)
+            if measurement_id:
+                self.activeMeasurements[measurement_id] = net_test
             self.measurementManager.schedule(net_test.generateMeasurements())
 
             yield net_test.done
             yield report.close()
         finally:
+            if measurement_id:
+                del self.activeMeasurements[measurement_id]
             self.netTestDone(net_test)
 
     def start_sniffing(self, test_details):
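
director.activeMeasurements maps a measurement ID to its live NetTest for exactly the duration of the run: registered before scheduling, removed in the finally block even on failure. That is what lets the web UI ask for progress later. A hypothetical lookup helper built on it (not part of the commit):

    def measurement_progress(director, measurement_id):
        """Return live progress (0-100), or None when not running."""
        net_test = director.activeMeasurements.get(measurement_id)
        if net_test is None:
            return None
        return net_test.completionPercentage * 100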
diff --git a/ooni/measurements.py b/ooni/measurements.py
index 2a4440ad..4188f66f 100644
--- a/ooni/measurements.py
+++ b/ooni/measurements.py
@@ -23,6 +23,10 @@ class MeasurementTypes():
         result['anomaly'] = False
         if entry['test_keys']['blocking'] is not False:
             result['anomaly'] = True
+            if entry['test_keys']['blocking'] is None:
+                result['anomaly_type'] = 'warning'
+            else:
+                result['anomaly_type'] = 'danger'
         result['url'] = entry['input']
         return result
 
@@ -32,6 +36,7 @@ class MeasurementTypes():
         result['anomaly'] = False
         if entry['test_keys']['connection'] != "success":
             result['anomaly'] = True
+            result['anomaly_type'] = 'danger'
         result['url'] = entry['input']
         return result
 
@@ -48,12 +53,15 @@ class MeasurementTypes():
             )
         )
         result['anomaly'] = anomaly
+        if anomaly is True:
+            result['anomaly_type'] = 'danger'
         result['url'] = entry['input']
         return result
 
 
-def generate_summary(input_file, output_file, deck_id='none'):
+def generate_summary(input_file, output_file, anomaly_file, deck_id='none'):
     results = {}
+    anomaly = False
     with open(input_file) as in_file:
         for idx, line in enumerate(in_file):
             entry = json.loads(line.strip())
@@ -61,6 +69,8 @@ def generate_summary(input_file, output_file, deck_id='none'):
             if entry['test_name'] in MeasurementTypes.supported_tests:
                 result = getattr(MeasurementTypes, entry['test_name'])(entry)
             result['idx'] = idx
+            if result.get('anomaly', None) is True:
+                anomaly = True
             if not result.get('url', None):
                 result['url'] = entry['input']
             results['test_name'] = entry['test_name']
@@ -73,6 +83,8 @@ def generate_summary(input_file, output_file, deck_id='none'):
 
     with open(output_file, "w") as fw:
         json.dump(results, fw)
+    if anomaly is True:
+        open(anomaly_file, 'w').close()
     return results
 
 
@@ -91,6 +103,7 @@ def get_measurement(measurement_id, compute_size=False):
     completed = True
     keep = False
     stale = False
+    anomaly = False
     if measurement.child("measurements.njson.progress").exists():
         completed = False
         try:
@@ -106,6 +119,9 @@ def get_measurement(measurement_id, compute_size=False):
     if measurement.child("keep").exists():
         keep = True
 
+    if measurement.child("anomaly").exists():
+        anomaly = True
+
     if compute_size is True:
         size = directory_usage(measurement.path)
 
@@ -125,7 +141,8 @@ def get_measurement(measurement_id, compute_size=False):
         "running": running,
         "stale": stale,
         "size": size,
-        "deck_id": deck_id
+        "deck_id": deck_id,
+        "anomaly": anomaly
     }
 
 
@@ -142,11 +159,13 @@ def get_summary(measurement_id):
         return defer.fail(MeasurementInProgress)
 
     summary = measurement.child("summary.json")
+    anomaly = measurement.child("anomaly")
     if not summary.exists():
         return deferToThread(
             generate_summary,
             measurement.child("measurements.njson").path,
-            summary.path
+            summary.path,
+            anomaly.path
         )
 
     with summary.open("r") as f:
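
Two conventions are worth noting here. First, a blocking value of None (the test could not decide) is classed as a 'warning' anomaly, while any concrete blocking verdict is 'danger'. Second, generate_summary records "any anomaly seen" by touching an empty anomaly file next to the summary, so get_measurement can answer with a cheap existence check instead of reparsing the njson. A standalone sketch of the marker convention (paths and names are hypothetical):

    import os

    def write_anomaly_marker(measurement_dir, results):
        # Touch an empty "anomaly" file if any parsed entry was anomalous.
        if any(r.get('anomaly') is True for r in results):
            open(os.path.join(measurement_dir, 'anomaly'), 'w').close()

    def has_anomaly(measurement_dir):
        # Existence of the marker is the whole answer.
        return os.path.exists(os.path.join(measurement_dir, 'anomaly'))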
diff --git a/ooni/nettest.py b/ooni/nettest.py
index 5ff54858..edfd9cd7 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -1,7 +1,8 @@
 import os
 import re
-import time
 import sys
+import copy
+import time
 
 from twisted.internet import defer
 from twisted.python.filepath import FilePath
@@ -503,6 +504,10 @@ class NetTest(object):
         self.testDetails = test_details
         self.testCases = test_cases
 
+        self._startTime = 0
+        self._totalInputs = 0
+        self._completedInputs = 0
+
         self.summary = {}
 
         # This will fire when all the measurements have been completed and
@@ -533,6 +538,27 @@ class NetTest(object):
         if self.testDetails["report_id"]:
             log.msg("Report ID: %s" % self.testDetails["report_id"])
 
+    @property
+    def completionRate(self):
+        return float(self._completedInputs) / (time.time() - self._startTime)
+
+    @property
+    def completionPercentage(self):
+        if self._totalInputs == 0:
+            return 0.0
+        # Never return 100%
+        if self._completedInputs >= self._totalInputs:
+            return 0.99
+        return float(self._completedInputs) / float(self._totalInputs)
+
+    @property
+    def completionEta(self):
+        remaining_inputs = self._totalInputs - self._completedInputs
+        # We adjust for negative values
+        if remaining_inputs <= 0:
+            return 1
+        return (remaining_inputs / self.completionRate) * 1.5 # fudge factor, in seconds
+
     def doneReport(self, report_results):
         """
         This will get called every time a report is done and therefore a
@@ -541,6 +567,14 @@ class NetTest(object):
         The state for the NetTest is informed of the fact that another task has
         reached the done state.
         """
+        self._completedInputs += 1
+        log.msg("")
+        log.msg("Status")
+        log.msg("------")
+        log.msg("%d completed %d remaining" % (self._completedInputs,
+                                               self._totalInputs))
+        log.msg("%0.1f%% (ETA: %ds)" % (self.completionPercentage * 100,
+                                        self.completionEta))
         self.state.taskDone()
 
         return report_results
@@ -574,11 +608,17 @@ class NetTest(object):
 
     @defer.inlineCallbacks
     def initialize(self):
-        for test_class, _ in self.testCases:
+        for test_class, test_cases in self.testCases:
             # Initialize Input Processor
+            test_instance = test_class()
             test_class.inputs = yield defer.maybeDeferred(
-                test_class().getInputProcessor
+                test_instance.getInputProcessor
             )
+            for _ in test_cases:
+                if test_instance._totalInputs is not None:
+                    self._totalInputs += test_instance._totalInputs
+                else:
+                    self._totalInputs += 1
 
             # Run the setupClass method
             yield defer.maybeDeferred(
@@ -593,6 +633,7 @@ class NetTest(object):
         FIXME: If this generator throws exception TaskManager scheduler is
         irreversibly damaged.
         """
+        self._startTime = time.time()
 
         for test_class, test_methods in self.testCases:
             # load a singular input processor for all instances
@@ -730,6 +771,8 @@ class NetTestCase(object):
 
     localOptions = {}
 
+    _totalInputs = None
+
     @classmethod
     def setUpClass(cls):
         """
@@ -853,9 +896,16 @@ class NetTestCase(object):
             inputProcessor.
         """
         if self.inputFileSpecified:
+            if self._totalInputs is None:
+                self._totalInputs = 0
             self.inputFilename = self.localOptions[self.inputFile[0]]
+            for _ in self.inputProcessor(self.inputFilename):
+                self._totalInputs += 1
             return self.inputProcessor(self.inputFilename)
 
+        if isinstance(self.inputs, list):
+            self._totalInputs = len(self.inputs)
+
         if self.inputs:
             return self.inputs
 
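The progress arithmetic in NetTest works out to: rate = completed inputs per elapsed second, percentage = completed/total capped at 99% (so the UI never claims 100% before the report is closed), and ETA = remaining inputs divided by the rate, padded by a 1.5 fudge factor. A standalone numeric sketch with made-up values:

    import time

    start = time.time() - 120.0   # pretend the test started 2 minutes ago
    completed, total = 40, 100

    rate = completed / (time.time() - start)    # ~0.33 inputs/second
    pct = min(completed / float(total), 0.99)   # capped below 100%
    eta = (total - completed) / rate * 1.5      # ~270s with the fudge factor

    print("%0.1f%% (ETA: %ds)" % (pct * 100, eta))
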
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index a79c1e19..37b93dba 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -472,7 +472,7 @@ class WebUIAPI(object):
                 400, 'Missing required option: "{}"'.format(option_name)
             )
 
-        except usage.UsageError:
+        except usage.UsageError as ue:
             raise WebUIError(
                 400, 'Error in parsing options'
             )
@@ -528,6 +528,14 @@ class WebUIAPI(object):
     @requires_true(attrs=['_is_initialized'])
     def api_measurement_list(self, request):
         measurements = list_measurements(order='desc')
+        for measurement in measurements:
+            if measurement['running'] is False:
+                continue
+            try:
+                net_test = self.director.activeMeasurements[measurement['id']]
+                measurement['progress'] = net_test.completionPercentage * 100
+            except KeyError:
+                log.err("Did not find measurement with ID %s" % measurement['id'])
         return self.render_json({"measurements": measurements}, request)
 
     @app.route('/api/measurement/<string:measurement_id>', methods=["GET"])

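The listing handler only decorates entries that are still running, and a missing activeMeasurements entry is logged rather than allowed to break the whole response. A hypothetical standalone version of that decoration step:

    # "active" stands in for director.activeMeasurements (id -> NetTest).
    def attach_progress(measurements, active):
        for m in measurements:
            if m['running'] is False:
                continue
            net_test = active.get(m['id'])
            if net_test is None:
                continue  # the real handler logs this case instead
            m['progress'] = net_test.completionPercentage * 100
        return measurements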