commit e10008eb9d9f1df1d1ff9b3f7c69092e101c7716 Author: Arturo Filastò art@fuffa.org Date: Sun Jan 13 20:21:02 2013 +0100
Use a Mediator pattern to keep track of which tests have failed --- ooni/managers.py | 47 +++++------------------------------------ ooni/nettest.py | 24 ++++++++++++++++++-- ooni/reporter.py | 16 +++++++++++++- ooni/tasks.py | 61 ++++++++++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 94 insertions(+), 54 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py index dd748ae..3983705 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -63,7 +63,6 @@ class TaskManager(object): self._fillSlots()
task.done.callback(result) - self.succeeded(result, task)
def _run(self, task): @@ -155,50 +154,16 @@ class MeasurementManager(TaskManager): def failed(self, failure, measurement): self.director.measurementFailed(failure, measurement)
-class Report(object): - reportEntryManager = None - - def __init__(self, reporters, net_test): - """ - This will instantiate all the reporters and add them to the list of - available reporters. - - net_test: - is a reference to the net_test to which the report object belongs to. - """ - self.netTest = net_test - self.reporters = [] - for r in reporters: - reporter = r() - self.reporters.append(reporter) - - self.createReports() - - def createReports(self): - """ - This will create all the reports that need to be created. - """ - for reporter in self.reporters: - reporter.createReport() - - def write(self, measurement): - """ - This will write to all the reporters, by waiting on the created - callback to fire. - """ - for reporter in self.reporters: - @reporter.created.addCallback - def cb(result): - report_write_task = ReportWrite(reporter, measurement) - self.reportEntryManager.schedule(report_write_task) - -class ReportEntryManager(object):
+class ReportEntryManager(TaskManager): director = None
- def succeeded(self, result, measurement): + def started(self, task): pass
- def failed(self, failure, measurement): + def succeeded(self, result, task): + pass + + def failed(self, failure, task): pass
diff --git a/ooni/nettest.py b/ooni/nettest.py index 01dde72..31f2956 100644 --- a/ooni/nettest.py +++ b/ooni/nettest.py @@ -1,9 +1,10 @@ import os
+from twisted.internet import defer, reactor from twisted.trial.runner import filenameToModule from twisted.python import usage, reflect
-from ooni.tasks import Measurement +from ooni.tasks import Measurement, TaskMediator from ooni.utils import log, checkForRoot, NotRootError
from inspect import getmembers @@ -28,6 +29,18 @@ class NetTest(object): self.report = report self.test_cases = self.loadNetTest(net_test_file)
+ self.allMeasurementsDone = defer.Deferred() + self.allReportsDone = defer.Deferred() + + # This should fire when all the measurements have been completed and + # all the reports are done. Done means that they have either completed + # successfully or all the possible retries have been reached. + # self.done = defer.DeferredList([self.allMeasurementsDone, + # self.allReportsDone]) + + # XXX Fire the done when also all the reporting tasks have been completed. + self.done = self.allMeasurementsDone + def start(self): """ Set up tests and start running. @@ -36,6 +49,8 @@ class NetTest(object): self.setUpNetTestCases() self.measurementManager.schedule(self.generateMeasurements())
+ return self.done + def loadNetTest(self, net_test_file): """ Creates all the necessary test_cases (a list of tuples containing the @@ -111,7 +126,7 @@ class NetTest(object):
def succeeded(self, measurement): """ - This gets called when a measurement has failed. + This gets called when a measurement has succeeded. """ self.report.write(measurement)
@@ -120,12 +135,15 @@ class NetTest(object): This is a generator that yields measurements and sets their timeout value and their netTest attribute. """ + + task_mediator = TaskMediator(self.allMeasurementsDone) for test_class, test_method in self.test_cases: for test_input in test_class.inputs: measurement = Measurement(test_class, test_method, - test_input, self) + test_input, self, task_mediator) measurement.netTest = self yield measurement + task_mediator.allTasksScheduled()
def setUpNetTestCases(self): """ diff --git a/ooni/reporter.py b/ooni/reporter.py index 637131c..b3a7244 100644 --- a/ooni/reporter.py +++ b/ooni/reporter.py @@ -31,6 +31,8 @@ from ooni.utils.net import BodyReceiver, StringProducer, userAgents
from ooni import config
+from ooni.tasks import ReportEntry, TaskMediator + def createPacketReport(packet_list): """ Takes as input a packet a list. @@ -391,6 +393,11 @@ class Report(object):
self.createReports()
+ self.done = defer.Deferred() + self.done.addCallback(self.finish) + + self.report_mediator = TaskMediator(self.done) + def createReports(self): """ This will create all the reports that need to be created. @@ -406,6 +413,13 @@ class Report(object): for reporter in self.reporters: @reporter.created.addCallback def cb(result): - report_write_task = ReportWrite(reporter, measurement) + report_write_task = ReportEntry(reporter, measurement, + self.report_mediator) self.reportEntryManager.schedule(report_write_task)
+ def finish(self): + for reporter in self.reporters: + d = defer.maybeDeferred(reporter.finish) + dl.append(d) + return defer.DeferredList(dl) + diff --git a/ooni/tasks.py b/ooni/tasks.py index 981f5e1..3fdf083 100644 --- a/ooni/tasks.py +++ b/ooni/tasks.py @@ -5,7 +5,11 @@ from twisted.internet import defer, reactor class BaseTask(object): _timer = None
- def __init__(self): + def __init__(self, mediator=None): + """ + If you want to schedule a task multiple times, remember to create fresh + instances of it. + """ self.running = False self.failures = 0
@@ -16,6 +20,9 @@ class BaseTask(object): # final status, this means: all retries have been attempted or the test # has successfully executed. self.done = defer.Deferred() + if mediator: + mediator.created() + self.done.addCallback(mediator.taskDone)
def _failed(self, failure): self.failures += 1 @@ -90,7 +97,8 @@ class TaskWithTimeout(BaseTask): pass
class Measurement(TaskWithTimeout): - def __init__(self, test_class, test_method, test_input, net_test): + def __init__(self, test_class, test_method, test_input, net_test, + mediator): """ test_class: is the class, subclass of NetTestCase, of the test to be run @@ -115,24 +123,59 @@ class Measurement(TaskWithTimeout):
self.netTest = net_test
- def succeeded(self): - self.net_test.succeeded(self) + TaskWithTimeout.__init__(self, mediator)
- def failed(self): + def succeeded(self, result): + return self.netTest.succeeded(self) + + def failed(self, failure): pass
def timedOut(self): - self.net_test.timedOut() + self.netTest.timedOut()
def run(self): - return defer.maybeDeferred(self.test) + d = defer.maybeDeferred(self.test) + return d
class ReportEntry(TaskWithTimeout): - def __init__(self, reporter, measurement): + def __init__(self, reporter, measurement, task_mediator): self.reporter = reporter self.measurement = measurement - TaskWithTimeout.__init__(self) + + TaskWithTimeout.__init__(self, task_mediator)
def run(self): return self.reporter.writeReportEntry(self.measurement)
class TaskMediator(object):
    """
    Mediator/Observer that keeps track of when a group of logically linked
    Tasks have all reached their final done stage.

    Usage, as seen from the callers in this change:
      * every task calls created() once when it is instantiated;
      * taskDone() is attached as a callback to each task's `done` deferred;
      * the scheduler calls allTasksScheduled() once no more tasks will be
        created.

    allTasksDone is fired exactly once, with None, as soon as scheduling has
    been marked complete and every created task has been reported done,
    regardless of which of the two events happens last.
    """

    def __init__(self, allTasksDone):
        """
        Args:
            allTasksDone is a deferred that will get fired once all the tasks
            have been completed.
        """
        self.doneTasks = 0
        self.tasks = 0

        self.completedScheduling = False

        self.allTasksDone = allTasksDone

        # Guards against firing allTasksDone more than once (e.g. if
        # allTasksScheduled() is called twice).
        self._fired = False

    def _fireIfComplete(self):
        """
        Fire allTasksDone if scheduling is complete and no task is pending.
        """
        if self._fired or not self.completedScheduling:
            return
        if self.doneTasks == self.tasks:
            self._fired = True
            self.allTasksDone.callback(None)

    def created(self):
        """
        Register that one more task belonging to this group exists.
        """
        self.tasks += 1

    def taskDone(self, result):
        """
        Register that one task of the group has reached its final stage.

        result is ignored. This returns None, so when it is used as a
        deferred callback the callbacks added after it receive None.
        """
        self.doneTasks += 1
        self._fireIfComplete()

    def allTasksScheduled(self):
        """
        Mark that no further tasks will be created for this group.

        The counters are re-checked here because the last task may already
        have finished (or no task may have been created at all) by the time
        scheduling completes; without this check allTasksDone would never
        fire in those cases.
        """
        self.completedScheduling = True
        self._fireIfComplete()
tor-commits@lists.torproject.org