[tor-commits] [ooni-probe/master] Refactoring of reporting managers

isis at torproject.org isis at torproject.org
Sun Mar 10 01:57:01 UTC 2013


commit 055993061eb1a69a9e9876091d9b07f894deda32
Author: Arturo Filastò <art at fuffa.org>
Date:   Fri Jan 11 20:56:45 2013 +0100

    Refactoring of reporting managers
    
    Make the dependencies between modules much clearer
    Sketch out a graph that illustrates such dependencies
---
 ooni/managers.py |  281 ++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 209 insertions(+), 72 deletions(-)

diff --git a/ooni/managers.py b/ooni/managers.py
index 3383840..ec47ca7 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -3,7 +3,108 @@ import itertools
 from .ratelimiting import StaticRateLimiter
 from .measurements import Measurement, NetTest
 
-class MeasurementsManager(object):
+class TaskManager(object):
+    retries = 2
+
+    failures = []
+    concurrency = 10
+
+    _tasks = iter()
+    _active_tasks = []
+
+    def _failed(self, failure, task):
+        """
+        The task has failed to complete; we append it to the end of the task
+        chain to be re-run once all the currently scheduled tasks have run.
+        """
+        self._active_tasks.remove(task)
+        self.failures.append((failure, task))
+
+        if task.failures < self.retries:
+            self._tasks = itertools.chain(self._tasks,
+                    iter(task))
+
+        self.fillSlots()
+
+        self.failed(failure, task)
+
+    def _fillSlots(self):
+        """
+        Called on test completion and schedules measurements to be run for the
+        available slots.
+        """
+        for _ in range(self.availableSlots()):
+            try:
+                task = self._tasks.next()
+                self._run(task)
+            except StopIteration:
+                break
+
+    def _suceeded(self, result, task):
+        """
+        We have successfully completed a measurement.
+        """
+        self._active_tasks.remove(task)
+        self.completedTasks += 1
+
+        self.fillSlots()
+
+        self.suceeded(result, task)
+
+    def _run(self, task):
+        """
+        This gets called to add a task to the list of currently active and
+        running tasks.
+        """
+        self._active_tasks.append(task)
+
+        d = task.run()
+        d.addCallback(self.succeeded)
+        d.addCallback(self.failed)
+
+    @property
+    def failedMeasurements(self):
+        return len(self.failures)
+
+    def availableSlots(self):
+        """
+        Returns the number of available slots for running tests.
+        """
+        return self.concurrency - len(self._active_tasks)
+
+    def schedule(self, task_or_task_iterator):
+        """
+        Takes as argument a single task or a task iterable and appends it to the task
+        generator queue.
+        """
+        self._tasks = itertools.chain(self._tasks, task_or_task_iterator)
+
+    def start(self):
+        self.initializeTaskList()
+        self._fillSlots()
+
+    def initializeTaskList(self):
+        """
+        This should contain all the logic that gets run at first start to
+        pre-populate the list of tasks to be run and the tasks currently
+        running.
+        """
+        raise NotImplemented
+
+    def failed(self, failure, task):
+        """
+        This method should be overridden by the subclass and should contain
+        logic for dealing with a failure that is subclass specific.
+
+        The default failure handling logic is to reschedule the task up until
+        we reach the maximum number of retries.
+        """
+        raise NotImplemented
+
+    def succeeded(self, result, task):
+        raise NotImplemented
+
+class MeasurementsManager(TaskManager):
     """
     This is the Measurement Tracker. In here we keep track of active measurements
     and issue new measurements once the active ones have been completed.
@@ -20,113 +121,149 @@ class MeasurementsManager(object):
     failures = []
     concurrency = 10
 
-    _measurements = iter()
-    _active_measurements = []
+    director = None
 
-    def __init__(self, manager, netTests=None):
+    def __init__(self, netTests=None):
         self.netTests = netTests if netTests else []
-        self.manager = manager
 
-    @property
-    def failedMeasurements(self):
-        return len(self.failures)
+    def initializeTaskList(self):
+        for net_test in self.netTests:
+            self.schedule(net_test.generateMeasurements())
 
-    def start(self):
+    def suceeded(self, result, measurement):
+        pass
+
+    def failed(self, failure, measurement):
+        pass
+
+class Report(object):
+    def __init__(self, reporters, net_test):
         """
-        Start running the measurements.
+        This will instantiate all the reporters and add them to the list of
+        available reporters.
+
+        net_test:
+            is a reference to the net_test to which the report object belongs.
         """
-        self.populateMeasurements()
-        self.runMoreMeasurements()
+        self.netTest = net_test
+        self.reporters = []
+        for r in reporters:
+            reporter = r()
+            self.reporters.append(reporter)
+
+        self.createReports()
 
-    def populateMeasurements(self):
+    def createReports(self):
         """
-        Take all the setup netTests and create the measurements iterator from
-        them.
+        This will create all the reports that need to be created.
         """
-        for net_test in self.netTests:
-            self._measurements = itertools.chain(self._measurements,
-                    net_test.generateMeasurements())
+        for reporter in self.reporters:
+            reporter.createReport()
 
-    def availableSlots(self):
+    def write(self, measurement):
         """
-        Returns the number of available slots for running tests.
+        This will write to all the reporters, by waiting on the created
+        callback to fire.
         """
-        return self.concurrency - len(self._active_measurements)
+        for reporter in self.reporters:
+            @reporter.created.addCallback
+            def cb(result):
+                report_write_task = ReportWrite(reporter, measurement)
 
-    def schedule(self, measurement):
-        self._active_measurements.append(measurement)
+class ReportEntryManager(object):
 
-        d = measurement.run()
-        d.addCallback(self.done)
-        d.addCallback(self.failed)
-        return d
+    director = None
 
-    def fillSlots(self):
-        """
-        Called on test completion and schedules measurements to be run for the
-        available slots.
-        """
-        for _ in range(self.availableSlots()):
-            try:
-                measurement = self._measurements.next()
-                self.schedule(measurement)
-            except StopIteration:
-                break
+    def __init__(self, manager, netTests=None):
+        self.netTests = netTests if netTests else []
+        self.manager = manager
 
-    def done(self, result, measurement):
-        """
-        We have successfully completed a measurement.
-        """
-        self._active_measurements.remove(measurement)
-        self.completedMeasurements += 1
+    def addNetTest(self, netTest):
+        self.netTests.append(netTest)
 
-        self.fillSlots()
+    def initializeTaskList(self):
+        pass
+
+    def suceeded(self, result, measurement):
+        pass
 
     def failed(self, failure, measurement):
-        """
-        The measurement has failed to complete.
-        """
-        self._active_measurements.remove(measurement)
-        self.failures.append((failure, measurement))
+        pass
 
-        if measurement.failures < self.retries:
-            self._measurements = itertools.chain(self._measurements,
-                    iter(measurement))
+class Director(object):
+    """
+    Singleton object responsible for coordinating the Measurements Manager and the
+    Reporting Manager.
 
-        self.fillSlots()
+    What this all looks like is as follows:
 
-class OManager(object):
-    """
-    Singleton object responsible for managing the Measurements Tracker and the
-    Reporting Tracker.
-    """
+    +------------------------------------------------+
+    |                   Director                     |<--+
+    +------------------------------------------------+   |
+        ^                                ^               |
+        |        Measurement             |               |
+    +---------+  [---------]    +--------------------+   |
+    |         |                 | MeasurementManager |   |
+    | NetTest |  [---------]    +--------------------+   |
+    |         |                 | [----------------] |   |
+    +---------+  [---------]    | [----------------] |   |
+        |                       | [----------------] |   |
+        |                       +--------------------+   |
+        v                                                |
+    +---------+   ReportEntry                            |
+    |         |   [---------]    +--------------------+  |
+    |  Report |                  | ReportEntryManager |  |
+    |         |   [---------]    +--------------------+  |
+    +---------+                  | [----------------] |  |
+                  [---------]    | [----------------] |--
+                                 | [----------------] |
+                                 +--------------------+
 
+    [------------] are Tasks
+
+    +------+
+    |      |  are TaskManagers
+    +------+
+    |      |
+    +------+
+
+    +------+
+    |      |  are general purpose objects
+    +------+
+
+    """
     _scheduledTests = 0
 
-    def __init__(self, reporters=[]):
+    def __init__(self):
         self.reporters = reporters
 
         self.netTests = []
 
         self.measurementsManager = MeasurementsManager(manager=self,
                 netTests=self.netTests)
-        self.measurementsManager.manager = self
+        self.measurementsManager.director = self
 
-        self.reportingManager
+        self.reportingManager = ReportingEntryManager()
+        self.reportingManager.director = self
 
-    def writeReport(self, measurement):
+    def startTest(self, net_test_file, inputs, options):
         """
-        Write to all the configured reporters.
+        Create the Report for the NetTest and start the report NetTest.
         """
-        for reporter in self.reporters:
-            reporter.write(measurement)
-
-    def writeFailure(self, measurement, failure):
-        pass
+        report = Report()
+        net_test = NetTest(net_test_file, inputs, options, report)
+        net_test.director = self
 
-    def addNetTest(self, net_test):
+    def measurementTimedOut(self, measurement):
         """
-        This is called to add a NetTest to the list of running network tests.
+        This gets called every time a measurement times out, independently of
+        whether it gets re-scheduled or not.
         """
-        self.netTests.append(net_test)
+        pass
+
+    def measurementFailed(self, measurement, failure):
+        pass
+
+    def writeFailure(self, measurement, failure):
+        pass
 





More information about the tor-commits mailing list