commit f621794d11c3dda37edbccf82a56db1025e348c7 Author: Arturo Filastò art@fuffa.org Date: Sat Jan 12 14:58:11 2013 +0100
Work on TaskManager and write tests for it
Changes to the API of the MeasurementsManager and Report classes --- ooni/managers.py | 47 +++++++++++++++++++++++------------------------ tests/test_manager.py | 5 +++++ 2 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py index f6d4c74..26ac3f9 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -18,9 +18,14 @@ class TaskManager(object): failures = [] concurrency = 10
- _tasks = iter() + _tasks = iter(()) _active_tasks = []
+ def _tasksDone(self): + self.tasksDone.callback(None) + + self.tasksDone = defer.Deferred() + def _failed(self, failure, task): """ The has failed to complete, we append it to the end of the task chain @@ -31,9 +36,9 @@ class TaskManager(object):
if task.failures < self.retries: self._tasks = itertools.chain(self._tasks, - iter(task)) + makeIterable(task))
- self.fillSlots() + self._fillSlots()
self.failed(failure, task)
@@ -42,11 +47,12 @@ class TaskManager(object): Called on test completion and schedules measurements to be run for the available slots. """ - for _ in range(self.availableSlots()): + for _ in range(self.availableSlots): try: task = self._tasks.next() self._run(task) except StopIteration: + self._tasksDone() break
def _suceeded(self, result, task): @@ -56,7 +62,7 @@ class TaskManager(object): self._active_tasks.remove(task) self.completedTasks += 1
- self.fillSlots() + self._fillSlots()
self.suceeded(result, task)
@@ -68,13 +74,14 @@ class TaskManager(object): self._active_tasks.append(task)
d = task.run() - d.addCallback(self.succeeded) - d.addCallback(self.failed) + d.addCallback(self.succeeded, task) + d.addErrback(self.failed, task)
@property def failedMeasurements(self): return len(self.failures)
+ @property def availableSlots(self): """ Returns the number of available slots for running tests. @@ -86,19 +93,15 @@ class TaskManager(object): Takes as argument a single task or a task iterable and appends it to the task generator queue. """ - self._tasks = itertools.chain(self._tasks, task_or_task_iterator)
- def start(self): - self.initializeTaskList() + iterable = makeIterable(task_or_task_iterator) + + self._tasks = itertools.chain(self._tasks, iterable) self._fillSlots()
- def initializeTaskList(self): - """ - This should contain all the logic that gets run at first start to - pre-populate the list of tasks to be run and the tasks currently - running. - """ - raise NotImplemented + def start(self): + self.tasksDone = defer.Deferred() + self._fillSlots()
def failed(self, failure, task): """ @@ -132,13 +135,6 @@ class MeasurementsManager(TaskManager):
director = None
- def __init__(self, netTests=None): - self.netTests = netTests if netTests else [] - - def initializeTaskList(self): - for net_test in self.netTests: - self.schedule(net_test.generateMeasurements()) - def suceeded(self, result, measurement): pass
@@ -146,6 +142,8 @@ class MeasurementsManager(TaskManager): pass
class Report(object): + reportEntryManager = None + def __init__(self, reporters, net_test): """ This will instantiate all the reporters and add them to the list of @@ -178,6 +176,7 @@ class Report(object): @reporter.created.addCallback def cb(result): report_write_task = ReportWrite(reporter, measurement) + self.reportEntryManager.schedule(report_write_task)
class ReportEntryManager(object):
diff --git a/tests/test_manager.py b/tests/test_manager.py index 0ed77b0..f5ac052 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -65,6 +65,11 @@ class TestNetTest(unittest.TestCase): self.assertEqual([(DummyTestCase, 'test_a'), (DummyTestCase, 'test_b')], net_test.test_cases)
+ def test_net_test_timeout(self): + """Instantiate a test and verify that the timeout works properly when we call it.""" + net_test = NetTest(net_test_file, dummyInputs, dummyOptions) + # Where net_test_file is a test that will take longer than the timeout. + class TestMeasurementsTracker(unittest.TestCase): def setUp(self): self.mock_mt = MeasurementsTracker(DummyManager())