commit ff8678565142978388e2019759d5717a90ba7982
Author: Leonid Evdokimov <leon(a)darkk.net.ru>
Date: Mon Apr 3 19:37:22 2017 +0300
Smear load generated by scheduler
* Smear load generated by scheduler, see #630
It runs ASAP after installation. It survives a reboot in a predictable
way: it avoids double runs and does not skip a scheduler cycle on restart.
It supports non-24/7 operation, as it targets running a @daily task within
a 2.4-hour window, so a probe that was down will likely run a task
on boot, and boot time is random enough for load-smearing purposes.
* Schedule decks right after initialisation, see #630
That's required because refresh_deck_list() does not schedule anything if
`config` is not `is_initialized()`.
Another possible way to implement this behavior is to postpone the
RefreshDeckList.task run by checking config.is_initialized() in
should_run().
---
ooni/agent/scheduler.py | 17 +++++--
ooni/contrib/croniter.py | 6 +--
ooni/tests/test_scheduler.py | 104 ++++++++++++++++++++++++++++++++-----------
ooni/ui/web/server.py | 2 +-
4 files changed, 96 insertions(+), 33 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 8de3b303..32edf0af 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -1,8 +1,9 @@
import os
import errno
+import random
from hashlib import md5
-from datetime import datetime
+from datetime import datetime, timedelta
from twisted.application import service
from twisted.internet import defer, reactor
@@ -87,24 +88,32 @@ class ScheduledTask(object):
assert self.identifier is not None, "self.identifier must be set"
assert self.schedule is not None, "self.schedule must be set"
+ # XXX: both _last_run_lock and _smear_coef require that there is single
+ # instance of the ScheduledTask of each type identified by `identifier`.
self._last_run = FilePath(scheduler_directory).child(self.identifier)
self._last_run_lock = FileSystemlockAndMutex(
FilePath(scheduler_directory).child(self.identifier + ".lock").path
)
+ self._smear_coef = random.random()
def cancel(self):
"""
Cancel a currently running task.
If it is locked, then release the lock.
"""
- if self._last_run_lock.locked:
- self._last_run_lock.release()
-
+ if not self._last_run_lock.locked:
+ # _last_run_lock.release() will throw if we try to release it
+ log.err('BUG: cancelling non-locked task {} without holding lock'.format(self.identifier))
+ return
+ # probably, cancelling the task TAKEN the lock is even worse :-)
+ self._last_run_lock.release()
@property
def should_run(self):
current_time = datetime.utcnow().replace(tzinfo=tz.tzutc())
next_cycle = croniter(self.schedule, self.last_run).get_next(datetime)
+ delta = (croniter(self.schedule, next_cycle).get_next(datetime) - next_cycle).total_seconds()
+ next_cycle = next_cycle + timedelta(seconds=delta * 0.1 * self._smear_coef)
if next_cycle <= current_time:
return True
return False
diff --git a/ooni/contrib/croniter.py b/ooni/contrib/croniter.py
index 653dbbf3..f98daa98 100644
--- a/ooni/contrib/croniter.py
+++ b/ooni/contrib/croniter.py
@@ -157,7 +157,7 @@ class croniter(object):
def get_current(self, ret_type=None):
ret_type = ret_type or self._ret_type
- if ret_type == datetime.datetime:
+ if issubclass(ret_type, datetime.datetime):
return self._timestamp_to_datetime(self.cur)
return self.cur
@@ -219,7 +219,7 @@ class croniter(object):
ret_type = ret_type or self._ret_type
- if ret_type not in (float, datetime.datetime):
+ if not issubclass(ret_type, (float, datetime.datetime)):
raise TypeError("Invalid ret_type, only 'float' or 'datetime' "
"is acceptable.")
@@ -239,7 +239,7 @@ class croniter(object):
result = self._calc(self.cur, expanded, is_prev)
self.cur = result
- if ret_type == datetime.datetime:
+ if issubclass(ret_type, datetime.datetime):
result = self._timestamp_to_datetime(result)
return result
diff --git a/ooni/tests/test_scheduler.py b/ooni/tests/test_scheduler.py
index 56a4bb40..bfd22a5d 100644
--- a/ooni/tests/test_scheduler.py
+++ b/ooni/tests/test_scheduler.py
@@ -274,43 +274,97 @@ class TestSchedulerService(ConfigTestCase):
mock_director = mock.MagicMock()
d = defer.Deferred()
- with mock.patch('ooni.agent.scheduler.deck_store', self.deck_store):
-
- dummy_clock = task.Clock()
+ dummy_clock = task.Clock()
+ class FakeDatetime(datetime):
+ @staticmethod
+ def utcnow():
+ return datetime(2000,1,1, 7,0,0) + timedelta(seconds=dummy_clock.seconds())
+ with mock.patch('ooni.agent.scheduler.deck_store', self.deck_store), \
+ mock.patch('ooni.agent.scheduler.datetime', FakeDatetime):
scheduler_service = SchedulerService(
director=mock_director,
_reactor=dummy_clock
)
scheduler_service.startService()
- dummy_clock.advance(30)
+ dummy_clock.advance(45)
- now_time = datetime.utcnow()
- DT_FRMT = "%Y-%m-%dT%H:%M:%SZ"
+ # these tasks were run before clock was pumped
+ for t in scheduler_service._scheduled_tasks:
+ self.assertIn(t.schedule, ('@daily', '@hourly'))
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ self.assertEqual(in_file.read(), '2000-01-01T07:00:00Z')
+ # that's leaping clock, it leads to immediate scheduling
+ dummy_clock.advance(24 * 60 * 60)
for t in scheduler_service._scheduled_tasks:
- with open(os.path.join(self.scheduler_directory,
- t.identifier)) as in_file:
- dstr = datetime.strptime(in_file.read(),
- DT_FRMT).strftime("%Y-%m-%dT%H")
- self.assertEqual(dstr, now_time.strftime("%Y-%m-%dT%H"))
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
- dummy_clock.advance(30)
- dummy_clock.advance(30)
- dummy_clock.advance(30)
- dummy_clock.advance(30)
- dummy_clock.advance(30)
- dummy_clock.advance(30)
- # Here we pretend they ran yesterday so to re-trigger the daily
- # tasks
+ # nothing happens during an hour
+ dummy_clock.advance(60 * 60 - 46)
+ self.assertEqual(FakeDatetime.utcnow(), datetime(2000,1,2, 7,59,59))
for t in scheduler_service._scheduled_tasks:
- with open(os.path.join(self.scheduler_directory,
- t.identifier), 'w') as out_file:
- yesterday = (now_time - timedelta(days=1,
- hours=2)).strftime(DT_FRMT)
- out_file.write(yesterday)
- dummy_clock.advance(30)
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
+
+ # that's ticking clock, it smears the load a bit
+ dummy_clock.pump([1] * 1800)
+ zero, hourly, daily = 0, 0, 0
+ for t in scheduler_service._scheduled_tasks:
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ if t.schedule == '@daily':
+ daily += 1
+ self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
+ elif t.schedule == '@hourly':
+ hourly += 1
+ # `:[03]0Z` is caused by scheduler resolution & ticking one second a time
+ last_run = in_file.read()
+ self.assertRegexpMatches(last_run, '^2000-01-02T08:0.:[03]0Z$')
+ if last_run == '2000-01-02T08:00:00Z':
+ zero += 1
+ self.assertGreater(hourly, 0)
+ self.assertGreater(daily, 0)
+ self.assertLess(zero, hourly)
+ self.assertLessEqual(zero, 1) # should ALMOST never happen
+
+ # leaping to the end of the day
+ dummy_clock.advance((datetime(2000,1,2, 23,59,59) - FakeDatetime.utcnow()).total_seconds())
+ for t in scheduler_service._scheduled_tasks:
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ if t.schedule == '@daily':
+ self.assertEqual(in_file.read(), '2000-01-02T07:00:45Z')
+ elif t.schedule == '@hourly':
+ self.assertEqual(in_file.read(), '2000-01-02T23:59:59Z')
+
+ # save ~30% of the testcase runtime while ticking through six hours
+ for t in scheduler_service._scheduled_tasks[:]:
+ if t.schedule == '@hourly':
+ scheduler_service.unschedule(t)
+
+ # ticking through six hours
+ dummy_clock.pump([random.uniform(0, 120) for i in xrange(6*60)])
+ for t in scheduler_service._scheduled_tasks:
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ # randomized clock kills 30s resolution of the scheduler
+ self.assertRegexpMatches(in_file.read(), '^2000-01-03T0[012]:..:..Z$')
+ self.assertGreater(FakeDatetime.utcnow(), datetime(2000,1,3, 5,0,0)) # should be ~6:00
+
+ # verify, that double-run does not happen even in case of reseeding (reboot/restart)
+ dummy_clock.advance((datetime(2000,1,3, 23,59,59) - FakeDatetime.utcnow()).total_seconds())
+ launches = {}
+ while FakeDatetime.utcnow() < datetime(2000,1,4, 6,0,0):
+ for t in scheduler_service._scheduled_tasks:
+ with open(os.path.join(self.scheduler_directory, t.identifier)) as in_file:
+ launches.setdefault(t.identifier, set())
+ launches[t.identifier].add(in_file.read())
+ self.assertLessEqual(t._smear_coef, 1.0)
+ t._smear_coef = random.random()
+ dummy_clock.advance(random.uniform(0, 120))
+ self.assertEqual(len(launches), len(scheduler_service._scheduled_tasks))
+ self.assertEqual({k: len(v) for k, v in launches.iteritems()}, dict.fromkeys(launches.iterkeys(), 2))
# We check that the run method of the deck was called twice
+ # NB: That does NOT check that @daily task was called exactly twice
self.mock_deck.run.assert_has_calls([
mock.call(mock_director, from_schedule=True), mock.call(mock_director, from_schedule=True)
])
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index fc197a12..9111078b 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -340,8 +340,8 @@ class WebUIAPI(object):
except DeckNotFound:
raise WebUIError(404, 'Deck not found')
- self.scheduler.refresh_deck_list()
config.set_initialized()
+ self.scheduler.refresh_deck_list()
self._is_initialized = True