commit 8fc8056fa1796c3f5d096d98076aa9d0bbbe47d9 Author: Arturo Filastò art@fuffa.org Date: Sat Jan 12 16:56:47 2013 +0100
Work with aagbsn to make the TaskManager pass all tests
Tests will retry to run until the retry limit has been reached Test failures are properly kept track of --- ooni/managers.py | 35 +++++----- ooni/tasks.py | 19 ++++-- tests/test_managers.py | 174 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 205 insertions(+), 23 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py index 26ac3f9..be11473 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -18,14 +18,11 @@ class TaskManager(object): failures = [] concurrency = 10
+ completedTasks = 0 + _tasks = iter(()) _active_tasks = []
- def _tasksDone(self): - self.tasksDone.callback(None) - - self.tasksDone = defer.Deferred() - def _failed(self, failure, task): """ The has failed to complete, we append it to the end of the task chain @@ -34,14 +31,16 @@ class TaskManager(object): self._active_tasks.remove(task) self.failures.append((failure, task))
- if task.failures < self.retries: + if task.failures <= self.retries: self._tasks = itertools.chain(self._tasks, makeIterable(task)) - - self._fillSlots() + else: + task.done.callback((failure, task))
self.failed(failure, task)
+ self._fillSlots() + def _fillSlots(self): """ Called on test completion and schedules measurements to be run for the @@ -52,10 +51,9 @@ class TaskManager(object): task = self._tasks.next() self._run(task) except StopIteration: - self._tasksDone() break
- def _suceeded(self, result, task): + def _succeeded(self, result, task): """ We have successfully completed a measurement. """ @@ -64,7 +62,9 @@ class TaskManager(object):
self._fillSlots()
- self.suceeded(result, task) + task.done.callback(result) + + self.succeeded(result, task)
def _run(self, task): """ @@ -73,9 +73,9 @@ class TaskManager(object): """ self._active_tasks.append(task)
- d = task.run() - d.addCallback(self.succeeded, task) - d.addErrback(self.failed, task) + d = task.start() + d.addCallback(self._succeeded, task) + d.addErrback(self._failed, task)
@property def failedMeasurements(self): @@ -93,13 +93,14 @@ class TaskManager(object): Takes as argument a single task or a task iterable and appends it to the task generator queue. """ - iterable = makeIterable(task_or_task_iterator)
self._tasks = itertools.chain(self._tasks, iterable) self._fillSlots()
def start(self): + self.failures = [] + self.tasksDone = defer.Deferred() self._fillSlots()
@@ -135,7 +136,7 @@ class MeasurementsManager(TaskManager):
director = None
- def suceeded(self, result, measurement): + def succeeded(self, result, measurement): pass
def failed(self, failure, measurement): @@ -192,7 +193,7 @@ class ReportEntryManager(object): def initializeTaskList(self): pass
- def suceeded(self, result, measurement): + def succeeded(self, result, measurement): pass
def failed(self, failure, measurement): diff --git a/ooni/tasks.py b/ooni/tasks.py index 909f3e5..55f09bd 100644 --- a/ooni/tasks.py +++ b/ooni/tasks.py @@ -1,24 +1,31 @@ +from twisted.internet import defer + class BaseTask(object): _timer = None
def __init__(self): self.running = False self.failures = 0 + # This is a deferred that gets called when a test has reached it's + # final status, this means: all retries have been attempted or the test + # has successfully executed. + self.done = defer.Deferred()
def _failed(self, failure): self.failures += 1 self.failed(failure) - return + return failure
- def _run(self): + def _succeeded(self, result): + self.succeeded(result) + return result + + def start(self): d = self.run() d.addErrback(self._failed) d.addCallback(self._succeeded) return d
- def _succeeded(self, result): - self.succeeded(result) - def succeeded(self, result): """ Place here the logic to handle a successful execution of the task. @@ -113,7 +120,7 @@ class Measurement(TaskWithTimeout): d.addErrback(self.failure) return d
-class ReportEntry(TimedOutTask): +class ReportEntry(TaskWithTimeout): def __init__(self, reporter, measurement): self.reporter = reporter self.measurement = measurement diff --git a/tests/test_managers.py b/tests/test_managers.py new file mode 100644 index 0000000..59eba5f --- /dev/null +++ b/tests/test_managers.py @@ -0,0 +1,174 @@ +from twisted.trial import unittest +from twisted.python import failure +from twisted.internet import defer + +from ooni.tasks import BaseTask +from ooni.managers import TaskManager + + +mockFailure = failure.Failure(Exception('mock')) + +class MockSuccessTask(BaseTask): + def run(self): + return defer.succeed(42) + +class MockFailTask(BaseTask): + def run(self): + return defer.fail(mockFailure) + +class MockFailOnceTask(BaseTask): + def run(self): + if self.failures >= 1: + return defer.succeed(42) + else: + return defer.fail(mockFailure) + +class MockTaskManager(TaskManager): + def __init__(self): + self.successes = [] + + def failed(self, failure, task): + # print "TASK" + # print task + # print "FAILURES (%s)" % task.failures + # print failure + pass + + def succeeded(self, result, task): + self.successes.append((result, task)) + +class TestTaskManager(unittest.TestCase): + def setUp(self): + self.taskManager = MockTaskManager() + self.taskManager.concurrency = 10 + self.taskManager.retries = 2 + + self.taskManager.start() + + def tearDown(self): + pass + + def test_schedule_successful_one_task(self): + mock_task = MockSuccessTask() + self.taskManager.schedule(mock_task) + + @mock_task.done.addCallback + def done(res): + self.assertEqual(self.taskManager.successes, + [(42, mock_task)]) + return mock_task.done + + def test_schedule_failing_one_task(self): + mock_task = MockFailTask() + self.taskManager.schedule(mock_task) + + @mock_task.done.addCallback + def done(failure): + self.assertEqual(len(self.taskManager.failures), 3) + + self.assertEqual(failure, (mockFailure, mock_task)) + + return mock_task.done + + def test_schedule_successful_ten_tasks(self): + all_done = [] + for x in range(10): + mock_task = MockSuccessTask() + all_done.append(mock_task.done) + self.taskManager.schedule(mock_task) + + d = defer.DeferredList(all_done) + @d.addCallback + def done(res): + for task_result, task_instance in self.taskManager.successes: + self.assertEqual(task_result, 42) + self.assertIsInstance(task_instance, MockSuccessTask) + + return d + + def test_schedule_failing_ten_tasks(self): + all_done = [] + for x in range(10): + mock_task = MockFailTask() + all_done.append(mock_task.done) + self.taskManager.schedule(mock_task) + + d = defer.DeferredList(all_done) + @d.addCallback + def done(res): + # 10*2 because 2 is the number of retries + self.assertEqual(len(self.taskManager.failures), 10*3) + for task_result, task_instance in self.taskManager.failures: + self.assertEqual(task_result, mockFailure) + self.assertIsInstance(task_instance, MockFailTask) + + return d + + def test_schedule_successful_27_tasks(self): + all_done = [] + for x in range(27): + mock_task = MockSuccessTask() + all_done.append(mock_task.done) + self.taskManager.schedule(mock_task) + + d = defer.DeferredList(all_done) + @d.addCallback + def done(res): + for task_result, task_instance in self.taskManager.successes: + self.assertEqual(task_result, 42) + self.assertIsInstance(task_instance, MockSuccessTask) + + return d + + def test_schedule_failing_27_tasks(self): + all_done = [] + for x in range(27): + mock_task = MockFailTask() + all_done.append(mock_task.done) + self.taskManager.schedule(mock_task) + + d = defer.DeferredList(all_done) + @d.addCallback + def done(res): + # 10*2 because 2 is the number of retries + self.assertEqual(len(self.taskManager.failures), 27*3) + for task_result, task_instance in self.taskManager.failures: + self.assertEqual(task_result, mockFailure) + self.assertIsInstance(task_instance, MockFailTask) + + return d + + + def test_task_retry_and_succeed(self): + mock_task = MockFailOnceTask() + self.taskManager.schedule(mock_task) + + @mock_task.done.addCallback + def done(res): + self.assertEqual(len(self.taskManager.failures), 1) + + self.assertEqual(self.taskManager.failures, + [(mockFailure, mock_task)]) + self.assertEqual(self.taskManager.successes, + [(42, mock_task)]) + + return mock_task.done + + def test_task_retry_and_succeed_56_tasks(self): + all_done = [] + for x in range(56): + mock_task = MockFailOnceTask() + all_done.append(mock_task.done) + self.taskManager.schedule(mock_task) + + d = defer.DeferredList(all_done) + @d.addCallback + def done(res): + self.assertEqual(len(self.taskManager.failures), 56) + + for task_result, task_instance in self.taskManager.successes: + self.assertEqual(task_result, 42) + self.assertIsInstance(task_instance, MockFailOnceTask) + + return d +
tor-commits@lists.torproject.org