commit ff8678565142978388e2019759d5717a90ba7982
Author: Leonid Evdokimov <leon@darkk.net.ru>
Date:   Mon Apr 3 19:37:22 2017 +0300
Smear load generated by scheduler
* Smear load generated by scheduler, see #630
It runs ASAP after installation. It survives reboots in a predictable way: it avoids double runs and does not skip a scheduler cycle on restart. It supports non-24/7 operation, as it targets running an @daily task within a 2.4-hour window; a probe that was down will therefore likely run the task on boot, and boot time is random enough for load-smearing purposes.
* Schedule decks right after initialisation, see #630
This is required because refresh_deck_list() does not schedule anything unless `config.is_initialized()` is true.
Another possible way to implement this behavior is to postpone the RefreshDeckList task run by checking config.is_initialized() in should_run().
---
 ooni/agent/scheduler.py      |  17 +++++--
 ooni/contrib/croniter.py     |   6 +--
 ooni/tests/test_scheduler.py | 104 ++++++++++++++++++++++++++++++++-----------
 ooni/ui/web/server.py        |   2 +-
 4 files changed, 96 insertions(+), 33 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py index 8de3b303..32edf0af 100644 --- a/ooni/agent/scheduler.py +++ b/ooni/agent/scheduler.py @@ -1,8 +1,9 @@ import os import errno +import random
from hashlib import md5 -from datetime import datetime +from datetime import datetime, timedelta
from twisted.application import service from twisted.internet import defer, reactor @@ -87,24 +88,32 @@ class ScheduledTask(object): assert self.identifier is not None, "self.identifier must be set" assert self.schedule is not None, "self.schedule must be set"
+ # XXX: both _last_run_lock and _smear_coef require that there is single + # instance of the ScheduledTask of each type identified by `identifier`. self._last_run = FilePath(scheduler_directory).child(self.identifier) self._last_run_lock = FileSystemlockAndMutex( FilePath(scheduler_directory).child(self.identifier + ".lock").path ) + self._smear_coef = random.random()
def cancel(self): """ Cancel a currently running task. If it is locked, then release the lock. """ - if self._last_run_lock.locked: - self._last_run_lock.release() - + if not self._last_run_lock.locked: + # _last_run_lock.release() will throw if we try to release it + log.err('BUG: cancelling non-locked task {} without holding lock'.format(self.identifier)) + return + # probably, cancelling the task TAKEN the lock is even worse :-) + self._last_run_lock.release()
@property def should_run(self): current_time = datetime.utcnow().replace(tzinfo=tz.tzutc()) next_cycle = croniter(self.schedule, self.last_run).get_next(datetime) + delta = (croniter(self.schedule, next_cycle).get_next(datetime) - next_cycle).total_seconds() + next_cycle = next_cycle + timedelta(seconds=delta * 0.1 * self._smear_coef) if next_cycle <= current_time: return True return False diff --git a/ooni/contrib/croniter.py b/ooni/contrib/croniter.py index 653dbbf3..f98daa98 100644 --- a/ooni/contrib/croniter.py +++ b/ooni/contrib/croniter.py @@ -157,7 +157,7 @@ class croniter(object):
def get_current(self, ret_type=None): ret_type = ret_type or self._ret_type - if ret_type == datetime.datetime: + if issubclass(ret_type, datetime.datetime): return self._timestamp_to_datetime(self.cur) return self.cur
@@ -219,7 +219,7 @@ class croniter(object):
ret_type = ret_type or self._ret_type
- if ret_type not in (float, datetime.datetime): + if not issubclass(ret_type, (float, datetime.datetime)): raise TypeError("Invalid ret_type, only 'float' or 'datetime' " "is acceptable.")
@@ -239,7 +239,7 @@ class croniter(object): result = self._calc(self.cur, expanded, is_prev) self.cur = result
- if ret_type == datetime.datetime: + if issubclass(ret_type, datetime.datetime): result = self._timestamp_to_datetime(result)
return result diff --git a/ooni/tests/test_scheduler.py b/ooni/tests/test_scheduler.py index 56a4bb40..bfd22a5d 100644 --- a/ooni/tests/test_scheduler.py +++ b/ooni/tests/test_scheduler.py @@ -274,43 +274,97 @@ class TestSchedulerService(ConfigTestCase):
mock_director = mock.MagicMock() d = defer.Deferred() - with mock.patch('ooni.agent.scheduler.deck_store', self.deck_store): - - dummy_clock = task.Clock() + dummy_clock = task.Clock() + class FakeDatetime(datetime): + @staticmethod + def utcnow(): + return datetime(2000,1,1, 7,0,0) + timedelta(seconds=dummy_clock.seconds()) + with mock.patch('ooni.agent.scheduler.deck_store', self.deck_store), \ + mock.patch('ooni.agent.scheduler.datetime', FakeDatetime): scheduler_service = SchedulerService( director=mock_director, _reactor=dummy_clock ) scheduler_service.startService() - dummy_clock.advance(30) + dummy_clock.advance(45)
- now_time = datetime.utcnow() - DT_FRMT = "%Y-%m-%dT%H:%M:%SZ" + # these tasks were run before clock was pumped + for t in scheduler_service._scheduled_tasks: + self.assertIn(t.schedule, ('@daily', '@hourly')) + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + self.assertEqual(in_file.read(), '2000-01-01T07:00:00Z')
+ # that's leaping clock, it leads to immediate scheduling + dummy_clock.advance(24 * 60 * 60) for t in scheduler_service._scheduled_tasks: - with open(os.path.join(self.scheduler_directory, - t.identifier)) as in_file: - dstr = datetime.strptime(in_file.read(), - DT_FRMT).strftime("%Y-%m-%dT%H") - self.assertEqual(dstr, now_time.strftime("%Y-%m-%dT%H")) + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
- dummy_clock.advance(30) - dummy_clock.advance(30) - dummy_clock.advance(30) - dummy_clock.advance(30) - dummy_clock.advance(30) - dummy_clock.advance(30) - # Here we pretend they ran yesterday so to re-trigger the daily - # tasks + # nothing happens during an hour + dummy_clock.advance(60 * 60 - 46) + self.assertEqual(FakeDatetime.utcnow(), datetime(2000,1,2, 7,59,59)) for t in scheduler_service._scheduled_tasks: - with open(os.path.join(self.scheduler_directory, - t.identifier), 'w') as out_file: - yesterday = (now_time - timedelta(days=1, - hours=2)).strftime(DT_FRMT) - out_file.write(yesterday) - dummy_clock.advance(30) + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z') + + # that's ticking clock, it smears the load a bit + dummy_clock.pump([1] * 1800) + zero, hourly, daily = 0, 0, 0 + for t in scheduler_service._scheduled_tasks: + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + if t.schedule == '@daily': + daily += 1 + self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z') + elif t.schedule == '@hourly': + hourly += 1 + # `:[03]0Z` is caused by scheduler resolution & ticking one second a time + last_run = in_file.read() + self.assertRegexpMatches(last_run, '^2000-01-02T08:0.:[03]0Z$') + if last_run == '2000-01-02T08:00:00Z': + zero += 1 + self.assertGreater(hourly, 0) + self.assertGreater(daily, 0) + self.assertLess(zero, hourly) + self.assertLessEqual(zero, 1) # should ALMOST never happen + + # leaping to the end of the day + dummy_clock.advance((datetime(2000,1,2, 23,59,59) - FakeDatetime.utcnow()).total_seconds()) + for t in scheduler_service._scheduled_tasks: + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + if t.schedule == '@daily': + self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z') + elif t.schedule == '@hourly': + self.assertEqual(in_file.read(), '2000-01-02T23:59:59Z') + + # save ~30% of the 
testcase runtime while ticking through six hours + for t in scheduler_service._scheduled_tasks[:]: + if t.schedule == '@hourly': + scheduler_service.unschedule(t) + + # ticking through six hours + dummy_clock.pump([random.uniform(0, 120) for i in xrange(6*60)]) + for t in scheduler_service._scheduled_tasks: + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + # randomized clock kills 30s resolution of the scheduler + self.assertRegexpMatches(in_file.read(), '^2000-01-03T0[012]:..:..Z$') + self.assertGreater(FakeDatetime.utcnow(), datetime(2000,1,3, 5,0,0)) # should be ~6:00 + + # verify, that double-run does not happen even in case of reseeding (reboot/restart) + dummy_clock.advance((datetime(2000,1,3, 23,59,59) - FakeDatetime.utcnow()).total_seconds()) + launches = {} + while FakeDatetime.utcnow() < datetime(2000,1,4, 6,0,0): + for t in scheduler_service._scheduled_tasks: + with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file: + launches.setdefault(t.identifier, set()) + launches[t.identifier].add(in_file.read()) + self.assertLessEqual(t._smear_coef, 1.0) + t._smear_coef = random.random() + dummy_clock.advance(random.uniform(0, 120)) + self.assertEqual(len(launches), len(scheduler_service._scheduled_tasks)) + self.assertEqual({k: len(v) for k, v in launches.iteritems()}, dict.fromkeys(launches.iterkeys(), 2))
# We check that the run method of the deck was called twice + # NB: That does NOT check that @daily task was called exactly twice self.mock_deck.run.assert_has_calls([ mock.call(mock_director, from_schedule=True), mock.call(mock_director, from_schedule=True) ]) diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py index fc197a12..9111078b 100644 --- a/ooni/ui/web/server.py +++ b/ooni/ui/web/server.py @@ -340,8 +340,8 @@ class WebUIAPI(object): except DeckNotFound: raise WebUIError(404, 'Deck not found')
- self.scheduler.refresh_deck_list() config.set_initialized() + self.scheduler.refresh_deck_list()
self._is_initialized = True
tor-commits@lists.torproject.org