[tor-commits] [ooni-probe/master] Work on TaskManager and write tests for it

isis at torproject.org isis at torproject.org
Sun Mar 10 01:57:01 UTC 2013


commit f621794d11c3dda37edbccf82a56db1025e348c7
Author: Arturo Filastò <art at fuffa.org>
Date:   Sat Jan 12 14:58:11 2013 +0100

    Work on TaskManager and write tests for it
    
    Changes to the API of the MeasurementManager and ReportManager
---
 ooni/managers.py      |   47 +++++++++++++++++++++++------------------------
 tests/test_manager.py |    5 +++++
 2 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/ooni/managers.py b/ooni/managers.py
index f6d4c74..26ac3f9 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -18,9 +18,14 @@ class TaskManager(object):
     failures = []
     concurrency = 10
 
-    _tasks = iter()
+    _tasks = iter(())
     _active_tasks = []
 
+    def _tasksDone(self):
+        self.tasksDone.callback(None)
+
+        self.tasksDone = defer.Deferred()
+
     def _failed(self, failure, task):
         """
         The has failed to complete, we append it to the end of the task chain
@@ -31,9 +36,9 @@ class TaskManager(object):
 
         if task.failures < self.retries:
             self._tasks = itertools.chain(self._tasks,
-                    iter(task))
+                    makeIterable(task))
 
-        self.fillSlots()
+        self._fillSlots()
 
         self.failed(failure, task)
 
@@ -42,11 +47,12 @@ class TaskManager(object):
         Called on test completion and schedules measurements to be run for the
         available slots.
         """
-        for _ in range(self.availableSlots()):
+        for _ in range(self.availableSlots):
             try:
                 task = self._tasks.next()
                 self._run(task)
             except StopIteration:
+                self._tasksDone()
                 break
 
     def _suceeded(self, result, task):
@@ -56,7 +62,7 @@ class TaskManager(object):
         self._active_tasks.remove(task)
         self.completedTasks += 1
 
-        self.fillSlots()
+        self._fillSlots()
 
         self.suceeded(result, task)
 
@@ -68,13 +74,14 @@ class TaskManager(object):
         self._active_tasks.append(task)
 
         d = task.run()
-        d.addCallback(self.succeeded)
-        d.addCallback(self.failed)
+        d.addCallback(self.succeeded, task)
+        d.addErrback(self.failed, task)
 
     @property
     def failedMeasurements(self):
         return len(self.failures)
 
+    @property
     def availableSlots(self):
         """
         Returns the number of available slots for running tests.
@@ -86,19 +93,15 @@ class TaskManager(object):
         Takes as argument a single task or a task iterable and appends it to the task
         generator queue.
         """
-        self._tasks = itertools.chain(self._tasks, task_or_task_iterator)
 
-    def start(self):
-        self.initializeTaskList()
+        iterable = makeIterable(task_or_task_iterator)
+
+        self._tasks = itertools.chain(self._tasks, iterable)
         self._fillSlots()
 
-    def initializeTaskList(self):
-        """
-        This should contain all the logic that gets run at first start to
-        pre-populate the list of tasks to be run and the tasks currently
-        running.
-        """
-        raise NotImplemented
+    def start(self):
+        self.tasksDone = defer.Deferred()
+        self._fillSlots()
 
     def failed(self, failure, task):
         """
@@ -132,13 +135,6 @@ class MeasurementsManager(TaskManager):
 
     director = None
 
-    def __init__(self, netTests=None):
-        self.netTests = netTests if netTests else []
-
-    def initializeTaskList(self):
-        for net_test in self.netTests:
-            self.schedule(net_test.generateMeasurements())
-
     def suceeded(self, result, measurement):
         pass
 
@@ -146,6 +142,8 @@ class MeasurementsManager(TaskManager):
         pass
 
 class Report(object):
+    reportEntryManager = None
+
     def __init__(self, reporters, net_test):
         """
         This will instantiate all the reporters and add them to the list of
@@ -178,6 +176,7 @@ class Report(object):
             @reporter.created.addCallback
             def cb(result):
                 report_write_task = ReportWrite(reporter, measurement)
+                self.reportEntryManager.schedule(report_write_task)
 
 class ReportEntryManager(object):
 
diff --git a/tests/test_manager.py b/tests/test_manager.py
index 0ed77b0..f5ac052 100644
--- a/tests/test_manager.py
+++ b/tests/test_manager.py
@@ -65,6 +65,11 @@ class TestNetTest(unittest.TestCase):
         self.assertEqual([(DummyTestCase, 'test_a'), (DummyTestCase,
             'test_b')], net_test.test_cases)
 
+    def test_net_test_timeout(self):
+        """Instantiate a test and verify that the timeout works properly when we call it."""
+        net_test = NetTest(net_test_file, dummyInputs, dummyOptions)
+        # Where net_test_file is a test that will take longer than 
+
 class TestMeasurementsTracker(unittest.TestCase):
     def setUp(self):
         self.mock_mt = MeasurementsTracker(DummyManager())





More information about the tor-commits mailing list