commit 4865ade8dc39d924e6a14e00c9b5a9e741f50abc Author: Arturo Filastò arturo@filasto.net Date: Mon Sep 12 14:24:41 2016 +0200
Add more unittests for the scheduler
* Make the logic for triggering scheduled deck tasks more robust --- ooni/agent/scheduler.py | 49 ++++++++++++++++++++++++++++++++++---------- ooni/tests/test_scheduler.py | 28 ++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 12 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py index 98b395b..4002369 100644 --- a/ooni/agent/scheduler.py +++ b/ooni/agent/scheduler.py @@ -1,6 +1,7 @@ import os import errno
+from hashlib import md5 from datetime import datetime
from twisted.application import service @@ -41,6 +42,10 @@ class FileSystemlockAndMutex(object): self._fs_lock.unlock() self._mutex.release()
+ @property + def locked(self): + return self._mutex.locked or self._fs_lock.locked + # We use this date to indicate that the scheduled task has never run. # Easter egg, try to see what is special about this date :)? CANARY_DATE = datetime(1957, 8, 4) @@ -68,6 +73,9 @@ class ScheduledTask(object): FilePath(scheduler_directory).child(self.identifier + ".lock").path )
+ def cancel(self): + self._last_run_lock.release() + @property def should_run(self): current_time = datetime.utcnow() @@ -233,7 +241,8 @@ class RunDeck(ScheduledTask): def __init__(self, director, deck_id, schedule): self.deck_id = deck_id self.director = director - identifier = 'run-deck-' + deck_id + # We use as identifier also the schedule time + identifier = 'run-deck-' + deck_id + '-' + md5(schedule).hexdigest() super(RunDeck, self).__init__(schedule, identifier)
@defer.inlineCallbacks @@ -316,21 +325,39 @@ class SchedulerService(service.MultiService): def schedule(self, task): self._scheduled_tasks.append(task)
- def refresh_deck_list(self): - # Deletes all the RunDeck tasks and reschedules only the ones that - # are enabled. - for scheduled_task in self._scheduled_tasks[:]: - if isinstance(scheduled_task, RunDeck): - self._scheduled_tasks.remove(scheduled_task) + def unschedule(self, task): + # We first cancel the task so the run lock is deleted + task.cancel() + self._scheduled_tasks.remove(task)
- if not config.is_initialized(): - # Disable scheduling measurements if we are not initialized. - return + def refresh_deck_list(self):
+ to_enable = [] for deck_id, deck in deck_store.list_enabled(): if deck.schedule is None: continue - self.schedule(RunDeck(self.director, deck_id, deck.schedule)) + to_enable.append((deck_id, deck.schedule)) + + # If we are not initialized we should not enable anything + if not config.is_initialized(): + to_enable = [] + + for scheduled_task in self._scheduled_tasks[:]: + if not isinstance(scheduled_task, RunDeck): + continue + + info = (scheduled_task.deck_id, scheduled_task.schedule) + if info in to_enable: + # If the task is already scheduled there is no need to + # enable it. + to_enable.remove(info) + else: + # If one of the tasks that is scheduled is no longer in the + # scheduled tasks. We should disable it. + self.unschedule(scheduled_task) + + for deck_id, schedule in to_enable: + self.schedule(RunDeck(self.director, deck_id, schedule))
def _task_did_not_run(self, failure, task): failure.trap(DidNotRun) diff --git a/ooni/tests/test_scheduler.py b/ooni/tests/test_scheduler.py index cb90a30..1350dde 100644 --- a/ooni/tests/test_scheduler.py +++ b/ooni/tests/test_scheduler.py @@ -1,11 +1,12 @@ import os import shutil +import random import tempfile
from twisted.internet import defer from twisted.trial import unittest
-from ooni.agent.scheduler import ScheduledTask, DidNotRun +from ooni.agent.scheduler import ScheduledTask, DidNotRun, FileSystemlockAndMutex
class TestScheduler(unittest.TestCase): def test_scheduled_task(self): @@ -49,3 +50,28 @@ class TestScheduler(unittest.TestCase):
self.assertEqual(dummy_st.should_run, False) shutil.rmtree(scheduler_directory) + + + @defer.inlineCallbacks + def test_filesystem_lock_and_mutex(self): + lock_dir = tempfile.mkdtemp() + lock_path = os.path.join(lock_dir, 'lock') + + lock = FileSystemlockAndMutex(lock_path) + + lock_count = 100 + unlock_count = 0 + dl = [] + for i in range(lock_count): + dl.append(lock.acquire()) + if random.choice([0, 1]) == 0: + unlock_count += 1 + lock.release() + + for i in range(lock_count - unlock_count): + lock.release() + + yield defer.DeferredList(dl) + self.assertFalse(lock.locked) + + shutil.rmtree(lock_dir)
tor-commits@lists.torproject.org