[tor-commits] [ooni-probe/master] Use a Mediator pattern to keep track of which tests have failed

isis at torproject.org isis at torproject.org
Sun Mar 10 01:57:01 UTC 2013


commit e10008eb9d9f1df1d1ff9b3f7c69092e101c7716
Author: Arturo Filastò <art at fuffa.org>
Date:   Sun Jan 13 20:21:02 2013 +0100

    Use a Mediator pattern to keep track of which tests have failed
---
 ooni/managers.py |   47 +++++------------------------------------
 ooni/nettest.py  |   24 ++++++++++++++++++--
 ooni/reporter.py |   16 +++++++++++++-
 ooni/tasks.py    |   61 ++++++++++++++++++++++++++++++++++++++++++++++--------
 4 files changed, 94 insertions(+), 54 deletions(-)

diff --git a/ooni/managers.py b/ooni/managers.py
index dd748ae..3983705 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -63,7 +63,6 @@ class TaskManager(object):
         self._fillSlots()
 
         task.done.callback(result)
-
         self.succeeded(result, task)
 
     def _run(self, task):
@@ -155,50 +154,16 @@ class MeasurementManager(TaskManager):
     def failed(self, failure, measurement):
         self.director.measurementFailed(failure, measurement)
 
-class Report(object):
-    reportEntryManager = None
-
-    def __init__(self, reporters, net_test):
-        """
-        This will instantiate all the reporters and add them to the list of
-        available reporters.
-
-        net_test:
-            is a reference to the net_test to which the report object belongs to.
-        """
-        self.netTest = net_test
-        self.reporters = []
-        for r in reporters:
-            reporter = r()
-            self.reporters.append(reporter)
-
-        self.createReports()
-
-    def createReports(self):
-        """
-        This will create all the reports that need to be created.
-        """
-        for reporter in self.reporters:
-            reporter.createReport()
-
-    def write(self, measurement):
-        """
-        This will write to all the reporters, by waiting on the created
-        callback to fire.
-        """
-        for reporter in self.reporters:
-            @reporter.created.addCallback
-            def cb(result):
-                report_write_task = ReportWrite(reporter, measurement)
-                self.reportEntryManager.schedule(report_write_task)
-
-class ReportEntryManager(object):
 
+class ReportEntryManager(TaskManager):
     director = None
 
-    def succeeded(self, result, measurement):
+    def started(self, task):
         pass
 
-    def failed(self, failure, measurement):
+    def succeeded(self, result, task):
+        pass
+
+    def failed(self, failure, task):
         pass
 
diff --git a/ooni/nettest.py b/ooni/nettest.py
index 01dde72..31f2956 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -1,9 +1,10 @@
 import os
 
+from twisted.internet import defer, reactor
 from twisted.trial.runner import filenameToModule
 from twisted.python import usage, reflect
 
-from ooni.tasks import Measurement
+from ooni.tasks import Measurement, TaskMediator
 from ooni.utils import log, checkForRoot, NotRootError
 
 from inspect import getmembers
@@ -28,6 +29,18 @@ class NetTest(object):
         self.report = report
         self.test_cases = self.loadNetTest(net_test_file)
 
+        self.allMeasurementsDone = defer.Deferred()
+        self.allReportsDone = defer.Deferred()
+
+        # This should fire when all the measurements have been completed and
+        # all the reports are done. Done means that they have either completed
+        # successfully or all the possible retries have been reached.
+        # self.done = defer.DeferredList([self.allMeasurementsDone,
+        #     self.allReportsDone])
+
+        # XXX Fire the done when also all the reporting tasks have been completed.
+        self.done = self.allMeasurementsDone
+
     def start(self):
         """
         Set up tests and start running.
@@ -36,6 +49,8 @@ class NetTest(object):
         self.setUpNetTestCases()
         self.measurementManager.schedule(self.generateMeasurements())
 
+        return self.done
+
     def loadNetTest(self, net_test_file):
         """
         Creates all the necessary test_cases (a list of tuples containing the
@@ -111,7 +126,7 @@ class NetTest(object):
 
     def succeeded(self, measurement):
         """
-        This gets called when a measurement has failed.
+        This gets called when a measurement has succeeded.
         """
         self.report.write(measurement)
 
@@ -120,12 +135,15 @@ class NetTest(object):
         This is a generator that yields measurements and sets their timeout
         value and their netTest attribute.
         """
+
+        task_mediator = TaskMediator(self.allMeasurementsDone)
         for test_class, test_method in self.test_cases:
             for test_input in test_class.inputs:
                 measurement = Measurement(test_class, test_method,
-                        test_input, self)
+                        test_input, self, task_mediator)
                 measurement.netTest = self
                 yield measurement
+        task_mediator.allTasksScheduled()
 
     def setUpNetTestCases(self):
         """
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 637131c..b3a7244 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -31,6 +31,8 @@ from ooni.utils.net import BodyReceiver, StringProducer, userAgents
 
 from ooni import config
 
+from ooni.tasks import ReportEntry, TaskMediator
+
 def createPacketReport(packet_list):
     """
     Takes as input a packet a list.
@@ -391,6 +393,11 @@ class Report(object):
 
         self.createReports()
 
+        self.done = defer.Deferred()
+        self.done.addCallback(self.finish)
+
+        self.report_mediator = TaskMediator(self.done)
+
     def createReports(self):
         """
         This will create all the reports that need to be created.
@@ -406,6 +413,13 @@ class Report(object):
         for reporter in self.reporters:
             @reporter.created.addCallback
             def cb(result):
-                report_write_task = ReportWrite(reporter, measurement)
+                report_write_task = ReportEntry(reporter, measurement,
+                        self.report_mediator)
                 self.reportEntryManager.schedule(report_write_task)
 
+    def finish(self):
+        for reporter in self.reporters:
+            d = defer.maybeDeferred(reporter.finish)
+            dl.append(d)
+        return defer.DeferredList(dl)
+
diff --git a/ooni/tasks.py b/ooni/tasks.py
index 981f5e1..3fdf083 100644
--- a/ooni/tasks.py
+++ b/ooni/tasks.py
@@ -5,7 +5,11 @@ from twisted.internet import defer, reactor
 class BaseTask(object):
     _timer = None
 
-    def __init__(self):
+    def __init__(self, mediator=None):
+        """
+        If you want to schedule a task multiple times, remember to create fresh
+        instances of it.
+        """
         self.running = False
         self.failures = 0
 
@@ -16,6 +20,9 @@ class BaseTask(object):
         # final status, this means: all retries have been attempted or the test
         # has successfully executed.
         self.done = defer.Deferred()
+        if mediator:
+            mediator.created()
+            self.done.addCallback(mediator.taskDone)
 
     def _failed(self, failure):
         self.failures += 1
@@ -90,7 +97,8 @@ class TaskWithTimeout(BaseTask):
         pass
 
 class Measurement(TaskWithTimeout):
-    def __init__(self, test_class, test_method, test_input, net_test):
+    def __init__(self, test_class, test_method, test_input, net_test,
+            mediator):
         """
         test_class:
             is the class, subclass of NetTestCase, of the test to be run
@@ -115,24 +123,59 @@ class Measurement(TaskWithTimeout):
 
         self.netTest = net_test
 
-    def succeeded(self):
-        self.net_test.succeeded(self)
+        TaskWithTimeout.__init__(self, mediator)
 
-    def failed(self):
+    def succeeded(self, result):
+        return self.netTest.succeeded(self)
+
+    def failed(self, failure):
         pass
 
     def timedOut(self):
-        self.net_test.timedOut()
+        self.netTest.timedOut()
 
     def run(self):
-        return defer.maybeDeferred(self.test)
+        d = defer.maybeDeferred(self.test)
+        return d
 
 class ReportEntry(TaskWithTimeout):
-    def __init__(self, reporter, measurement):
+    def __init__(self, reporter, measurement, task_mediator):
         self.reporter = reporter
         self.measurement = measurement
-        TaskWithTimeout.__init__(self)
+
+        TaskWithTimeout.__init__(self, task_mediator)
 
     def run(self):
         return self.reporter.writeReportEntry(self.measurement)
 
+
+class TaskMediator(object):
+    def __init__(self, allTasksDone):
+        """
+        This implements a Mediator/Observer pattern to keep track of when Tasks
+        that are logically linked together have all reached a final done stage.
+
+        Args:
+            allTasksDone is a deferred that will get fired once all the tasks
+            have been completed.
+        """
+        self.doneTasks = 0
+        self.tasks = 0
+
+        self.completedScheduling = False
+
+        self.allTasksDone = allTasksDone
+
+    def created(self):
+        self.tasks += 1
+
+    def taskDone(self, result):
+        self.doneTasks += 1
+        if self.completedScheduling and \
+                self.doneTasks == self.tasks:
+            self.allTasksDone.callback(None)
+
+    def allTasksScheduled(self):
+        self.completedScheduling = True
+
+





More information about the tor-commits mailing list