commit 0467a6ea8c33a35beb315e56de3b2f5bdae605d6 Author: aagbsn aagbsn@extc.org Date: Thu Aug 22 13:00:45 2013 +0200
Link TaskManagers by Least Available Slots
A LinkedTaskManager only has availableSlots if its child TaskManager also has availableSlots. Child LinkedTaskManagers must notify the parent LinkedTaskManager when a task is complete because the task queue is event-driven. --- ooni/director.py | 4 ++++ ooni/managers.py | 28 ++++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py index 2c48bb0..1e3f062 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -71,6 +71,10 @@ class Director(object):
self.reportEntryManager = ReportEntryManager() self.reportEntryManager.director = self + # Link the TaskManager's by least available slots. + self.measurementManager.child = self.reportEntryManager + # Notify the parent when tasks complete # XXX deadlock!? + self.reportEntryManager.parent = self.measurementManager
self.successfulMeasurements = 0 self.failedMeasurements = 0 diff --git a/ooni/managers.py b/ooni/managers.py index ff7c2f2..cc2d067 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -129,7 +129,31 @@ class TaskManager(object): """ raise NotImplemented
-class MeasurementManager(TaskManager): +class LinkedTaskManager(TaskManager): + def __init__(self): + super(LinkedTaskManager, self).__init__() + self.child = None + self.parent = None + + @property + def availableSlots(self): + mySlots = self.concurrency - len(self._active_tasks) + if self.child: + s = self.child.availableSlots + return min(s, mySlots) + return mySlots + + def _succeeded(self, result, task): + super(LinkedTaskManager, self)._succeeded(result, task) + if self.parent: + self.parent._fillSlots() + + def _failed(self, result, task): + super(LinkedTaskManager, self)._failed(result, task) + if self.parent: + self.parent._fillSlots() + +class MeasurementManager(LinkedTaskManager): """ This is the Measurement Tracker. In here we keep track of active measurements and issue new measurements once the active ones have been completed. @@ -155,7 +179,7 @@ class MeasurementManager(TaskManager): def failed(self, failure, measurement): pass
-class ReportEntryManager(TaskManager): +class ReportEntryManager(LinkedTaskManager): def __init__(self): if config.advanced.reporting_retries: self.retries = config.advanced.reporting_retries