commit 813230cc416338740bf72acb04b97042ebd5261a
Author: Arturo Filastò <art@fuffa.org>
Date:   Tue Jan 15 18:01:45 2013 +0100
Refactoring of the architecture of the taskManager
Minimize coupling, make clearer the chain of responsibilities
---
 ooni/director.py      |  23 ++++++++--
 ooni/managers.py      |  36 +++++---------
 ooni/nettest.py       | 123 +++++++++++++++++++++++++++++++++--------------
 ooni/reporter.py      |  70 +++++++++++++++++++--------
 ooni/tasks.py         |  65 ++-----------------------
 tests/test_nettest.py |   3 +
 6 files changed, 174 insertions(+), 146 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py index 582bb84..41100cf 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -120,13 +120,25 @@ class Director(object):
self.successfulMeasurements += 1
+ return measurement.report.write(measurement) + def measurementFailed(self, failure, measurement): self.totalMeasurementRuntime += measurement.runtime
self.failedMeasurements += 1 self.failures.append((failure, measurement))
- def startTest(self, net_test_file, options): + def reportEntryFailed(self, failure): + # XXX add failure handling logic + return + + def startMeasurements(self, measurements): + self.measurementManager.schedule(measurements) + + def netTestDone(self, net_test): + self.activeNetTests.remove(net_test) + + def startNetTest(self, net_test_file, options): """ Create the Report for the NetTest and start the report NetTest.
@@ -139,12 +151,15 @@ class Director(object): is a dict containing the options to be passed to the chosen net test. """ - report = Report(self.reporters) - report.reportEntryManager = self.reportEntryManager + report = Report(self.reporters, self.reportEntryManager)
net_test = NetTest(net_test_file, options, report) - net_test.measurementManager = self.measurementManager + net_test.director = self + + self.activeNetTests.append(net_test) + self.activeNetTests.append(net_test)
d = net_test.start() + d.addBoth(self.netTestDone) return d
diff --git a/ooni/managers.py b/ooni/managers.py index 3983705..fa59058 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -15,13 +15,12 @@ def makeIterable(item): class TaskManager(object): retries = 2
- failures = [] concurrency = 10
- completedTasks = 0 - - _tasks = iter(()) - _active_tasks = [] + def __init__(self): + self._tasks = iter(()) + self._active_tasks = [] + self.failures = []
def _failed(self, failure, task): """ @@ -35,7 +34,8 @@ class TaskManager(object): self._tasks = itertools.chain(self._tasks, makeIterable(task)) else: - task.done.callback((failure, task)) + # This fires the errback when the task is done but has failed. + task.done.callback(failure)
self.failed(failure, task)
@@ -58,11 +58,11 @@ class TaskManager(object): We have successfully completed a measurement. """ self._active_tasks.remove(task) - self.completedTasks += 1
self._fillSlots()
- task.done.callback(result) + # Fires the done deferred when the task has completed + task.done.callback(task) self.succeeded(result, task)
def _run(self, task): @@ -105,12 +105,6 @@ class TaskManager(object):
self._fillSlots()
- def started(self, task): - """ - This hook will get called every time a task has been started. - """ - pass - def failed(self, failure, task): """ This hoook is called every time a task has failed. @@ -138,28 +132,24 @@ class MeasurementManager(TaskManager): NetTest on the contrary is aware of the typology of measurements that it is dispatching as they are logically grouped by test file. """ + # XXX tweak these values retries = 2 - - failures = [] concurrency = 10
director = None
- def started(self, measurement): - self.director.measurementStarted(measurement) - def succeeded(self, result, measurement): self.director.measurementSucceeded(measurement)
def failed(self, failure, measurement): self.director.measurementFailed(failure, measurement)
- class ReportEntryManager(TaskManager): - director = None + # XXX tweak these values + retries = 3 + concurrency = 20
- def started(self, task): - pass + director = None
def succeeded(self, result, task): pass diff --git a/ooni/nettest.py b/ooni/nettest.py index d6cadc3..9146924 100644 --- a/ooni/nettest.py +++ b/ooni/nettest.py @@ -4,7 +4,7 @@ from twisted.internet import defer, reactor from twisted.trial.runner import filenameToModule from twisted.python import usage, reflect
-from ooni.tasks import Measurement, TaskMediator +from ooni.tasks import Measurement from ooni.utils import log, checkForRoot, NotRootError
from inspect import getmembers @@ -13,8 +13,53 @@ from StringIO import StringIO class NoTestCasesFound(Exception): pass
+class NetTestState(object): + def __init__(self, allTasksDone): + """ + This keeps track of the state of a running NetTests case. + + Args: + allTasksDone is a deferred that will get fired once all the NetTest + cases have reached a final done state. + """ + self.doneTasks = 0 + self.tasks = 0 + + self.completedScheduling = False + self.allTasksDone = allTasksDone + + def created(self): + self.tasks += 1 + + def checkAllTasksDone(self): + if self.completedScheduling and \ + self.doneTasks == self.tasks: + self.allTasksDone.callback(self.doneTasks) + + def taskDone(self, result): + """ + This is called every time a task has finished running. + """ + self.doneTasks += 1 + self.checkAllTasksDone() + + def allTasksScheduled(self): + """ + This should be called once all the tasks that need to run have been + scheduled. + + XXX this is ghetto. + The reason for which we are calling allTasksDone inside of the + allTasksScheduled method is called after all tasks are done, then we + will run into a race condition. The race is that we don't end up + checking that all the tasks are complete because no task is to be + scheduled. + """ + self.completedScheduling = True + self.checkAllTasksDone() + class NetTest(object): - measurementManager = None + director = None method_prefix = 'test'
def __init__(self, net_test_file, options, report): @@ -29,17 +74,12 @@ class NetTest(object): self.report = report self.test_cases = self.loadNetTest(net_test_file)
- self.allMeasurementsDone = defer.Deferred() - self.allReportsDone = defer.Deferred() - - # This should fire when all the measurements have been completed and + # This will fire when all the measurements have been completed and # all the reports are done. Done means that they have either completed # successfully or all the possible retries have been reached. - self.done = defer.DeferredList([self.allMeasurementsDone, - self.allReportsDone]) + self.done = defer.Deferred()
- # XXX Fire the done when also all the reporting tasks have been completed. - # self.done = self.allMeasurementsDone + self.state = NetTestState(self.done)
def start(self): """ @@ -47,10 +87,42 @@ class NetTest(object): Start tests and generate measurements. """ self.setUpNetTestCases() - self.measurementManager.schedule(self.generateMeasurements()) - + self.director.startMeasurements(self.generateMeasurements()) return self.done
+ def doneReport(self, result): + """ + This will get called every time a report is done and therefore a + measurement is done. + + The state for the NetTest is informed of the fact that another task has + reached the done state. + """ + self.state.taskDone() + return result + + def generateMeasurements(self): + """ + This is a generator that yields measurements and registers the + callbacks for when a measurement is successful or has failed. + """ + for test_class, test_method in self.test_cases: + for test_input in test_class.inputs: + measurement = Measurement(test_class, test_method, test_input) + + measurement.done.addCallback(self.director.measurementSucceeded) + measurement.done.addErrback(self.director.measurementFailed) + + measurement.done.addCallback(self.report.write) + measurement.done.addErrback(self.director.reportEntryFailed) + + measurement.done.addBoth(self.doneReport) + + self.state.taskCreated() + yield measurement + + self.state.allTasksScheduled() + def loadNetTest(self, net_test_file): """ Creates all the necessary test_cases (a list of tuples containing the @@ -124,33 +196,6 @@ class NetTest(object): pass return test_cases
- def succeeded(self, measurement): - """ - This gets called when a measurement has succeeded. - """ - self.report.write(measurement) - - def generateMeasurements(self): - """ - This is a generator that yields measurements and sets their timeout - value and their netTest attribute. - """ - task_mediator = TaskMediator(self.allMeasurementsDone) - for test_class, test_method in self.test_cases: - for test_input in test_class.inputs: - measurement = Measurement(test_class, test_method, - test_input, self, task_mediator) - measurement.netTest = self - yield measurement - task_mediator.allTasksScheduled() - - @task_mediator.allTasksDone.addCallback - def done(result): - """ - Once all the MeasurementsTasks have been completed all the report - tasks will have been scheduled. - """ - self.report.report_mediator.allTasksScheduled()
def setUpNetTestCases(self): """ diff --git a/ooni/reporter.py b/ooni/reporter.py index d24edcc..80595f9 100644 --- a/ooni/reporter.py +++ b/ooni/reporter.py @@ -31,7 +31,7 @@ from ooni.utils.net import BodyReceiver, StringProducer, userAgents
from ooni import config
-from ooni.tasks import ReportEntry, TaskMediator +from ooni.tasks import ReportEntry
def createPacketReport(packet_list): """ @@ -158,6 +158,8 @@ def getTestDetails(options): return test_details
class OReporter(object): + created = defer.Deferred() + def __init__(self, cmd_line_options): self.cmd_line_options = dict(cmd_line_options)
@@ -380,15 +382,19 @@ class ReportClosed(Exception): pass
class Report(object): - reportEntryManager = None - - def __init__(self, reporters): + def __init__(self, reporters, reportEntryManager): """ - This will instantiate all the reporters and add them to the list of - available reporters. + This is an abstraction layer on top of all the configured reporters. + + It allows to lazily write to the reporters that are to be used. + + Args: + + reporters: + a list of :class:ooni.reporter.OReporter
- net_test: - is a reference to the net_test to which the report object belongs to. + reportEntryManager: + an instance of :class:ooni.tasks.ReportEntryManager """ self.reporters = [] for r in reporters: @@ -398,13 +404,14 @@ class Report(object): self.createReports()
self.done = defer.Deferred() - self.done.addCallback(self.finish) + self.done.addCallback(self.close)
- self.report_mediator = TaskMediator(self.done) + self.reportEntryManager = reportEntryManager
- def createReports(self): + def open(self): """ - This will create all the reports that need to be created. + This will create all the reports that need to be created and fires the + created callback of the reporter whose report got created. """ for reporter in self.reporters: d = defer.maybeDeferred(reporter.createReport) @@ -413,20 +420,41 @@ class Report(object): def write(self, measurement): """ This is a lazy call that will write to all the reporters by waiting on - the created callback to fire. + them to be created.
- The report_write_task is created before we attach the callback so that - the report mediator is aware of the total number of created reportEntry - tasks. + Will return a deferred that will fire once the report for the specified + measurement have been written to all the reporters. + + Args: + + measurement: + an instance of :class:ooni.tasks.Measurement + + Returns: + a deferred list that will fire once all the report entries have + been written. """ + dl = [] for reporter in self.reporters: - report_write_task = ReportEntry(reporter, measurement, - self.report_mediator) - @reporter.created.addCallback - def cb(result): + def writeReportEntry(result): + report_write_task = ReportEntry(reporter, measurement) self.reportEntryManager.schedule(report_write_task) + return report_write_task.done
- def finish(self, result): + d = reporter.created.addBoth(writeReportEntry) + dl.append(d) + + return defer.DeferredList(dl) + + def close(self, _): + """ + Close the report by calling its finish method. + + Returns: + a :class:twisted.internet.defer.DeferredList that will fire when + all the reports have been closed. + + """ dl = [] for reporter in self.reporters: d = defer.maybeDeferred(reporter.finish) diff --git a/ooni/tasks.py b/ooni/tasks.py index f2a9bae..28aaca4 100644 --- a/ooni/tasks.py +++ b/ooni/tasks.py @@ -5,7 +5,7 @@ from twisted.internet import defer, reactor class BaseTask(object): _timer = None
- def __init__(self, mediator=None): + def __init__(self): """ If you want to schedule a task multiple times, remember to create fresh instances of it. @@ -19,10 +19,8 @@ class BaseTask(object): # This is a deferred that gets called when a test has reached it's # final status, this means: all retries have been attempted or the test # has successfully executed. + # Such deferred will be called on completion by the TaskManager. self.done = defer.Deferred() - if mediator: - mediator.created() - self.done.addCallback(mediator.taskDone)
def _failed(self, failure): self.failures += 1 @@ -97,8 +95,7 @@ class TaskWithTimeout(BaseTask): pass
class Measurement(TaskWithTimeout): - def __init__(self, test_class, test_method, test_input, net_test, - mediator): + def __init__(self, test_class, test_method, test_input): """ test_class: is the class, subclass of NetTestCase, of the test to be run @@ -121,9 +118,7 @@ class Measurement(TaskWithTimeout): self.test_instance.setUp() self.test = getattr(self.test_instance, test_method)
- self.netTest = net_test - - TaskWithTimeout.__init__(self, mediator) + TaskWithTimeout.__init__(self)
def succeeded(self, result): return self.netTest.succeeded(self) @@ -139,60 +134,12 @@ class Measurement(TaskWithTimeout): return d
class ReportEntry(TaskWithTimeout): - def __init__(self, reporter, measurement, task_mediator): + def __init__(self, reporter, measurement): self.reporter = reporter self.measurement = measurement
- TaskWithTimeout.__init__(self, task_mediator) + TaskWithTimeout.__init__(self)
def run(self): return self.reporter.writeReportEntry(self.measurement)
- -class TaskMediator(object): - def __init__(self, allTasksDone): - """ - This implements a Mediator/Observer pattern to keep track of when Tasks - that are logically linked together have all reached a final done stage. - - Args: - allTasksDone is a deferred that will get fired once all the tasks - have been completed. - """ - self.doneTasks = 0 - self.tasks = 0 - - self.completedScheduling = False - self.allTasksDone = allTasksDone - - def created(self): - self.tasks += 1 - - def checkAllTasksDone(self): - if self.completedScheduling and \ - self.doneTasks == self.tasks: - self.allTasksDone.callback(self.doneTasks) - - def taskDone(self, result): - """ - This is called every time a task has finished running. - """ - self.doneTasks += 1 - self.checkAllTasksDone() - - def allTasksScheduled(self): - """ - This should be called once all the tasks that need to run have been - scheduled. - - XXX this is ghetto. - The reason for which we are calling allTasksDone inside of the - allTasksScheduled method is called after all tasks are done, then we - will run into a race condition. The race is that we don't end up - checking that all the tasks are complete because no task is to be - scheduled. - """ - self.completedScheduling = True - self.checkAllTasksDone() - - diff --git a/tests/test_nettest.py b/tests/test_nettest.py index 3c7bdf6..a029fae 100644 --- a/tests/test_nettest.py +++ b/tests/test_nettest.py @@ -10,6 +10,8 @@ from ooni.nettest import FailureToLoadNetTest from ooni.tasks import BaseTask from ooni.utils import NotRootError
+from ooni.director import Director + from ooni.managers import TaskManager
from tests.mocks import MockMeasurement, MockMeasurementFailOnce @@ -223,6 +225,7 @@ class TestNetTest(unittest.TestCase): net_test = NetTest(StringIO(net_test_string_with_file), dummyOptionsWithFile, MockReporter()) net_test.measurementManager = MockMeasurementManager() + net_test.director = Director()
d = net_test.start() @d.addCallback