[tor-commits] [ooni-probe/master] Work with aagbsn to make the TaskManager pass all tests

isis at torproject.org isis at torproject.org
Sun Mar 10 01:57:01 UTC 2013


commit 8fc8056fa1796c3f5d096d98076aa9d0bbbe47d9
Author: Arturo Filastò <art at fuffa.org>
Date:   Sat Jan 12 16:56:47 2013 +0100

    Work with aagbsn to make the TaskManager pass all tests
    
    Tests are now retried until the retry limit has been reached
    Test failures are now properly tracked
---
 ooni/managers.py       |   35 +++++-----
 ooni/tasks.py          |   19 ++++--
 tests/test_managers.py |  174 ++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 205 insertions(+), 23 deletions(-)

diff --git a/ooni/managers.py b/ooni/managers.py
index 26ac3f9..be11473 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -18,14 +18,11 @@ class TaskManager(object):
     failures = []
     concurrency = 10
 
+    completedTasks = 0
+
     _tasks = iter(())
     _active_tasks = []
 
-    def _tasksDone(self):
-        self.tasksDone.callback(None)
-
-        self.tasksDone = defer.Deferred()
-
     def _failed(self, failure, task):
         """
         The has failed to complete, we append it to the end of the task chain
@@ -34,14 +31,16 @@ class TaskManager(object):
         self._active_tasks.remove(task)
         self.failures.append((failure, task))
 
-        if task.failures < self.retries:
+        if task.failures <= self.retries:
             self._tasks = itertools.chain(self._tasks,
                     makeIterable(task))
-
-        self._fillSlots()
+        else:
+            task.done.callback((failure, task))
 
         self.failed(failure, task)
 
+        self._fillSlots()
+
     def _fillSlots(self):
         """
         Called on test completion and schedules measurements to be run for the
@@ -52,10 +51,9 @@ class TaskManager(object):
                 task = self._tasks.next()
                 self._run(task)
             except StopIteration:
-                self._tasksDone()
                 break
 
-    def _suceeded(self, result, task):
+    def _succeeded(self, result, task):
         """
         We have successfully completed a measurement.
         """
@@ -64,7 +62,9 @@ class TaskManager(object):
 
         self._fillSlots()
 
-        self.suceeded(result, task)
+        task.done.callback(result)
+
+        self.succeeded(result, task)
 
     def _run(self, task):
         """
@@ -73,9 +73,9 @@ class TaskManager(object):
         """
         self._active_tasks.append(task)
 
-        d = task.run()
-        d.addCallback(self.succeeded, task)
-        d.addErrback(self.failed, task)
+        d = task.start()
+        d.addCallback(self._succeeded, task)
+        d.addErrback(self._failed, task)
 
     @property
     def failedMeasurements(self):
@@ -93,13 +93,14 @@ class TaskManager(object):
         Takes as argument a single task or a task iterable and appends it to the task
         generator queue.
         """
-
         iterable = makeIterable(task_or_task_iterator)
 
         self._tasks = itertools.chain(self._tasks, iterable)
         self._fillSlots()
 
     def start(self):
+        self.failures = []
+
         self.tasksDone = defer.Deferred()
         self._fillSlots()
 
@@ -135,7 +136,7 @@ class MeasurementsManager(TaskManager):
 
     director = None
 
-    def suceeded(self, result, measurement):
+    def succeeded(self, result, measurement):
         pass
 
     def failed(self, failure, measurement):
@@ -192,7 +193,7 @@ class ReportEntryManager(object):
     def initializeTaskList(self):
         pass
 
-    def suceeded(self, result, measurement):
+    def succeeded(self, result, measurement):
         pass
 
     def failed(self, failure, measurement):
diff --git a/ooni/tasks.py b/ooni/tasks.py
index 909f3e5..55f09bd 100644
--- a/ooni/tasks.py
+++ b/ooni/tasks.py
@@ -1,24 +1,31 @@
+from twisted.internet import defer
+
 class BaseTask(object):
     _timer = None
 
     def __init__(self):
         self.running = False
         self.failures = 0
+        # This is a deferred that gets called when a test has reached its
+        # final status, meaning that either all retries have been attempted or
+        # the test has executed successfully.
+        self.done = defer.Deferred()
 
     def _failed(self, failure):
         self.failures += 1
         self.failed(failure)
-        return
+        return failure
 
-    def _run(self):
+    def _succeeded(self, result):
+        self.succeeded(result)
+        return result
+
+    def start(self):
         d = self.run()
         d.addErrback(self._failed)
         d.addCallback(self._succeeded)
         return d
 
-    def _succeeded(self, result):
-        self.succeeded(result)
-
     def succeeded(self, result):
         """
         Place here the logic to handle a successful execution of the task.
@@ -113,7 +120,7 @@ class Measurement(TaskWithTimeout):
         d.addErrback(self.failure)
         return d
 
-class ReportEntry(TimedOutTask):
+class ReportEntry(TaskWithTimeout):
     def __init__(self, reporter, measurement):
         self.reporter = reporter
         self.measurement = measurement
diff --git a/tests/test_managers.py b/tests/test_managers.py
new file mode 100644
index 0000000..59eba5f
--- /dev/null
+++ b/tests/test_managers.py
@@ -0,0 +1,174 @@
+from twisted.trial import unittest
+from twisted.python import failure
+from twisted.internet import defer
+
+from ooni.tasks import BaseTask
+from ooni.managers import TaskManager
+
+
+mockFailure = failure.Failure(Exception('mock'))
+
+class MockSuccessTask(BaseTask):
+    def run(self):
+        return defer.succeed(42)
+
+class MockFailTask(BaseTask):
+    def run(self):
+        return defer.fail(mockFailure)
+
+class MockFailOnceTask(BaseTask):
+    def run(self):
+        if self.failures >= 1:
+            return defer.succeed(42)
+        else:
+            return defer.fail(mockFailure)
+
+class MockTaskManager(TaskManager):
+    def __init__(self):
+        self.successes = []
+
+    def failed(self, failure, task):
+        # print "TASK"
+        # print task
+        # print "FAILURES (%s)" % task.failures
+        # print failure
+        pass
+
+    def succeeded(self, result, task):
+        self.successes.append((result, task))
+
+class TestTaskManager(unittest.TestCase):
+    def setUp(self):
+        self.taskManager = MockTaskManager()
+        self.taskManager.concurrency = 10
+        self.taskManager.retries = 2
+
+        self.taskManager.start()
+
+    def tearDown(self):
+        pass
+
+    def test_schedule_successful_one_task(self):
+        mock_task = MockSuccessTask()
+        self.taskManager.schedule(mock_task)
+
+        @mock_task.done.addCallback
+        def done(res):
+            self.assertEqual(self.taskManager.successes,
+                    [(42, mock_task)])
+        return mock_task.done
+
+    def test_schedule_failing_one_task(self):
+        mock_task = MockFailTask()
+        self.taskManager.schedule(mock_task)
+
+        @mock_task.done.addCallback
+        def done(failure):
+            self.assertEqual(len(self.taskManager.failures), 3)
+
+            self.assertEqual(failure, (mockFailure, mock_task))
+
+        return mock_task.done
+
+    def test_schedule_successful_ten_tasks(self):
+        all_done = []
+        for x in range(10):
+            mock_task = MockSuccessTask()
+            all_done.append(mock_task.done)
+            self.taskManager.schedule(mock_task)
+
+        d = defer.DeferredList(all_done)
+        @d.addCallback
+        def done(res):
+            for task_result, task_instance in self.taskManager.successes:
+                self.assertEqual(task_result, 42)
+                self.assertIsInstance(task_instance, MockSuccessTask)
+
+        return d
+
+    def test_schedule_failing_ten_tasks(self):
+        all_done = []
+        for x in range(10):
+            mock_task = MockFailTask()
+            all_done.append(mock_task.done)
+            self.taskManager.schedule(mock_task)
+
+        d = defer.DeferredList(all_done)
+        @d.addCallback
+        def done(res):
+            # 10*3: each task fails its initial run plus its 2 retries
+            self.assertEqual(len(self.taskManager.failures), 10*3)
+            for task_result, task_instance in self.taskManager.failures:
+                self.assertEqual(task_result, mockFailure)
+                self.assertIsInstance(task_instance, MockFailTask)
+
+        return d
+
+    def test_schedule_successful_27_tasks(self):
+        all_done = []
+        for x in range(27):
+            mock_task = MockSuccessTask()
+            all_done.append(mock_task.done)
+            self.taskManager.schedule(mock_task)
+
+        d = defer.DeferredList(all_done)
+        @d.addCallback
+        def done(res):
+            for task_result, task_instance in self.taskManager.successes:
+                self.assertEqual(task_result, 42)
+                self.assertIsInstance(task_instance, MockSuccessTask)
+
+        return d
+
+    def test_schedule_failing_27_tasks(self):
+        all_done = []
+        for x in range(27):
+            mock_task = MockFailTask()
+            all_done.append(mock_task.done)
+            self.taskManager.schedule(mock_task)
+
+        d = defer.DeferredList(all_done)
+        @d.addCallback
+        def done(res):
+            # 27*3: each task fails its initial run plus its 2 retries
+            self.assertEqual(len(self.taskManager.failures), 27*3)
+            for task_result, task_instance in self.taskManager.failures:
+                self.assertEqual(task_result, mockFailure)
+                self.assertIsInstance(task_instance, MockFailTask)
+
+        return d
+
+
+    def test_task_retry_and_succeed(self):
+        mock_task = MockFailOnceTask()
+        self.taskManager.schedule(mock_task)
+
+        @mock_task.done.addCallback
+        def done(res):
+            self.assertEqual(len(self.taskManager.failures), 1)
+
+            self.assertEqual(self.taskManager.failures,
+                    [(mockFailure, mock_task)])
+            self.assertEqual(self.taskManager.successes,
+                    [(42, mock_task)])
+
+        return mock_task.done
+
+    def test_task_retry_and_succeed_56_tasks(self):
+        all_done = []
+        for x in range(56):
+            mock_task = MockFailOnceTask()
+            all_done.append(mock_task.done)
+            self.taskManager.schedule(mock_task)
+
+        d = defer.DeferredList(all_done)
+        @d.addCallback
+        def done(res):
+            self.assertEqual(len(self.taskManager.failures), 56)
+
+            for task_result, task_instance in self.taskManager.successes:
+                self.assertEqual(task_result, 42)
+                self.assertIsInstance(task_instance, MockFailOnceTask)
+
+        return d
+





More information about the tor-commits mailing list