commit 055993061eb1a69a9e9876091d9b07f894deda32 Author: Arturo Filastò art@fuffa.org Date: Fri Jan 11 20:56:45 2013 +0100
Refactoring of reporting managers
Make the dependencies between modules much clearer. Sketch out a graph that illustrates such dependencies. --- ooni/managers.py | 281 ++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 209 insertions(+), 72 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py index 3383840..ec47ca7 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -3,7 +3,108 @@ import itertools from .ratelimiting import StaticRateLimiter from .measurements import Measurement, NetTest
-class MeasurementsManager(object): +class TaskManager(object): + retries = 2 + + failures = [] + concurrency = 10 + + _tasks = iter() + _active_tasks = [] + + def _failed(self, failure, task): + """ + The has failed to complete, we append it to the end of the task chain + to be re-run once all the currently scheduled tasks have run. + """ + self._active_tasks.remove(task) + self.failures.append((failure, task)) + + if task.failures < self.retries: + self._tasks = itertools.chain(self._tasks, + iter(task)) + + self.fillSlots() + + self.failed(failure, task) + + def _fillSlots(self): + """ + Called on test completion and schedules measurements to be run for the + available slots. + """ + for _ in range(self.availableSlots()): + try: + task = self._tasks.next() + self._run(task) + except StopIteration: + break + + def _suceeded(self, result, task): + """ + We have successfully completed a measurement. + """ + self._active_tasks.remove(task) + self.completedTasks += 1 + + self.fillSlots() + + self.suceeded(result, task) + + def _run(self, task): + """ + This gets called to add a task to the list of currently active and + running tasks. + """ + self._active_tasks.append(task) + + d = task.run() + d.addCallback(self.succeeded) + d.addCallback(self.failed) + + @property + def failedMeasurements(self): + return len(self.failures) + + def availableSlots(self): + """ + Returns the number of available slots for running tests. + """ + return self.concurrency - len(self._active_tasks) + + def schedule(self, task_or_task_iterator): + """ + Takes as argument a single task or a task iterable and appends it to the task + generator queue. + """ + self._tasks = itertools.chain(self._tasks, task_or_task_iterator) + + def start(self): + self.initializeTaskList() + self._fillSlots() + + def initializeTaskList(self): + """ + This should contain all the logic that gets run at first start to + pre-populate the list of tasks to be run and the tasks currently + running. 
+ """ + raise NotImplemented + + def failed(self, failure, task): + """ + This method should be overriden by the subclass and should contains + logic for dealing with a failure that is subclass specific. + + The default failure handling logic is to reschedule the task up until + we reach the maximum number of retries. + """ + raise NotImplemented + + def succeeded(self, result, task): + raise NotImplemented + +class MeasurementsManager(TaskManager): """ This is the Measurement Tracker. In here we keep track of active measurements and issue new measurements once the active ones have been completed. @@ -20,113 +121,149 @@ class MeasurementsManager(object): failures = [] concurrency = 10
- _measurements = iter() - _active_measurements = [] + director = None
- def __init__(self, manager, netTests=None): + def __init__(self, netTests=None): self.netTests = netTests if netTests else [] - self.manager = manager
- @property - def failedMeasurements(self): - return len(self.failures) + def initializeTaskList(self): + for net_test in self.netTests: + self.schedule(net_test.generateMeasurements())
- def start(self): + def suceeded(self, result, measurement): + pass + + def failed(self, failure, measurement): + pass + +class Report(object): + def __init__(self, reporters, net_test): """ - Start running the measurements. + This will instantiate all the reporters and add them to the list of + available reporters. + + net_test: + is a reference to the net_test to which the report object belongs to. """ - self.populateMeasurements() - self.runMoreMeasurements() + self.netTest = net_test + self.reporters = [] + for r in reporters: + reporter = r() + self.reporters.append(reporter) + + self.createReports()
- def populateMeasurements(self): + def createReports(self): """ - Take all the setup netTests and create the measurements iterator from - them. + This will create all the reports that need to be created. """ - for net_test in self.netTests: - self._measurements = itertools.chain(self._measurements, - net_test.generateMeasurements()) + for reporter in self.reporters: + reporter.createReport()
- def availableSlots(self): + def write(self, measurement): """ - Returns the number of available slots for running tests. + This will write to all the reporters, by waiting on the created + callback to fire. """ - return self.concurrency - len(self._active_measurements) + for reporter in self.reporters: + @reporter.created.addCallback + def cb(result): + report_write_task = ReportWrite(reporter, measurement)
- def schedule(self, measurement): - self._active_measurements.append(measurement) +class ReportEntryManager(object):
- d = measurement.run() - d.addCallback(self.done) - d.addCallback(self.failed) - return d + director = None
- def fillSlots(self): - """ - Called on test completion and schedules measurements to be run for the - available slots. - """ - for _ in range(self.availableSlots()): - try: - measurement = self._measurements.next() - self.schedule(measurement) - except StopIteration: - break + def __init__(self, manager, netTests=None): + self.netTests = netTests if netTests else [] + self.manager = manager
- def done(self, result, measurement): - """ - We have successfully completed a measurement. - """ - self._active_measurements.remove(measurement) - self.completedMeasurements += 1 + def addNetTest(self, netTest): + self.netTests.append(netTest)
- self.fillSlots() + def initializeTaskList(self): + pass + + def suceeded(self, result, measurement): + pass
def failed(self, failure, measurement): - """ - The measurement has failed to complete. - """ - self._active_measurements.remove(measurement) - self.failures.append((failure, measurement)) + pass
- if measurement.failures < self.retries: - self._measurements = itertools.chain(self._measurements, - iter(measurement)) +class Director(object): + """ + Singleton object responsible for coordinating the Measurements Manager and the + Reporting Manager.
- self.fillSlots() + How this all looks like is as follows:
-class OManager(object): - """ - Singleton object responsible for managing the Measurements Tracker and the - Reporting Tracker. - """ + +------------------------------------------------+ + | Director |<--+ + +------------------------------------------------+ | + ^ ^ | + | Measurement | | + +---------+ [---------] +--------------------+ | + | | | MeasurementManager | | + | NetTest | [---------] +--------------------+ | + | | | [----------------] | | + +---------+ [---------] | [----------------] | | + | | [----------------] | | + | +--------------------+ | + v | + +---------+ ReportEntry | + | | [---------] +--------------------+ | + | Report | | ReportEntryManager | | + | | [---------] +--------------------+ | + +---------+ | [----------------] | | + [---------] | [----------------] |-- + | [----------------] | + +--------------------+
+ [------------] are Tasks + + +------+ + | | are TaskManagers + +------+ + | | + +------+ + + +------+ + | | are general purpose objects + +------+ + + """ _scheduledTests = 0
- def __init__(self, reporters=[]): + def __init__(self): self.reporters = reporters
self.netTests = []
self.measurementsManager = MeasurementsManager(manager=self, netTests=self.netTests) - self.measurementsManager.manager = self + self.measurementsManager.director = self
- self.reportingManager + self.reportingManager = ReportingEntryManager() + self.reportingManager.director = self
- def writeReport(self, measurement): + def startTest(self, net_test_file, inputs, options): """ - Write to all the configured reporters. + Create the Report for the NetTest and start the report NetTest. """ - for reporter in self.reporters: - reporter.write(measurement) - - def writeFailure(self, measurement, failure): - pass + report = Report() + net_test = NetTest(net_test_file, inputs, options, report) + net_test.director = self
- def addNetTest(self, net_test): + def measurementTimedOut(self, measurement): """ - This is called to add a NetTest to the list of running network tests. + This gets called every time a measurement times out independenty from + the fact that it gets re-scheduled or not. """ - self.netTests.append(net_test) + pass + + def measurementFailed(self, measurement, failure): + pass + + def writeFailure(self, measurement, failure): + pass
tor-commits@lists.torproject.org