commit 003337ee7d241e18637ad5d67099d06cb0b4e9e8 Author: juga0 juga@riseup.net Date: Sat Mar 14 18:27:44 2020 +0000
chg: timestamps: Add module to manage datetime sequences --- sbws/util/timestamps.py | 89 ++++++++++++++++++++++++++++++++++++++ tests/unit/util/test_timestamps.py | 44 +++++++++++++++++++ 2 files changed, 133 insertions(+)
diff --git a/sbws/util/timestamps.py b/sbws/util/timestamps.py new file mode 100644 index 0000000..393b30b --- /dev/null +++ b/sbws/util/timestamps.py @@ -0,0 +1,89 @@ +"""Util classes to manipulate sequences of datetime timestamps. + +Optionally update also a state file. + +""" +# Workarounds to store datetimes for objects because they are not compossed +# by other objects nor stored in a database with a creation datetime. +import collections +from datetime import datetime, timedelta +import logging + +from sbws.util.timestamp import is_old + +log = logging.getLogger(__name__) + + +class DateTimeSeq(collections.deque): + """Store and manage a datetime sequence and optionally a state file.""" + + def __init__(self, iterable=[], maxlen=None, state=None, state_key=None): + self._maxlen = maxlen + self._items = collections.deque(iterable, maxlen) + self._state = state + self._state_key = state_key + + def _remove_old(self): + self._items = collections.deque( + filter(lambda x: not is_old(x), self._items), maxlen=self._maxlen + ) + + def update(self, dt=None): + self._remove_old() + self._items.append(dt or datetime.utcnow().replace(microsecond=0)) + if self._state is not None and self._state_key: + self._state[self._state_key] = list(self._items) + return list(self._items) + + def last(self): + if len(self._items) > 0: + return self._items[-1] + return datetime.utcnow().replace(microsecond=0) - timedelta(hour=1) + + def list(self): + return list(self._items) + + def __len__(self): + return len(self._items) + + +class DateTimeIntSeq(collections.deque): + """ + Store and manage a sequence of lists composed of a datetime and an int. + + Optionally store and manage an state file. + """ + + def __init__(self, iterable=[], maxlen=None, state=None, state_key=None): + self._maxlen = maxlen + self._items = collections.deque(iterable, maxlen) + self._state = state + self._state_key = state_key + + def _remove_old(self): + self._items = collections.deque( + filter(lambda x: not is_old(x[0]), self._items), + maxlen=self._maxlen, + ) + + def update(self, dt=None, number=0): + self._remove_old() + # Because json serializes tuples to lists, use list instead of tuple + # to facilitate comparisons. + self._items.append( + [dt or datetime.utcnow().replace(microsecond=0), number] + ) + if self._state is not None and self._state_key: + self._state[self._state_key] = list(self._items) + return list(self._items) + + def last(self): + if len(self._items) > 0: + return self._items[-1] + return datetime.utcnow().replace(microsecond=0) - timedelta(hour=1) + + def list(self): + return list(self._items) + + def __len__(self): + return sum(map(lambda x: x[1], self._items)) diff --git a/tests/unit/util/test_timestamps.py b/tests/unit/util/test_timestamps.py new file mode 100644 index 0000000..aef0501 --- /dev/null +++ b/tests/unit/util/test_timestamps.py @@ -0,0 +1,44 @@ +"""timestamps.py unit tests.""" + +from datetime import datetime, timedelta + +from sbws.util.state import State +from sbws.util.timestamps import ( + DateTimeSeq, + DateTimeIntSeq, +) + + +def test_update_datetime_seq(conf): + now = datetime.utcnow().replace(microsecond=0) + state = State(conf["paths"]["state_fpath"]) + # Create a list of 6 datetimes that started 6 days in the past. + dts = [now - timedelta(days=x) for x in range(6, 0, -1)] + dt_seq = DateTimeSeq( + dts, state=state, state_key="recent_measurement_attempt" + ) + new_dts = dt_seq.update() + # The updated list will not contain the 2 first (left) datetimes and it + # will have one last timestamp (right). + assert new_dts[:-1] == dts[2:] + assert 5 == state.count("recent_measurement_attempt") + assert 5 == len(dt_seq) + + +def test_update_datetime_int_seq(conf): + now = datetime.utcnow().replace(microsecond=0) + state = State(conf["paths"]["state_fpath"]) + # Create a list of 6 datetimes that started 6 days in the past. + dts = [[now - timedelta(days=x), 2] for x in range(6, 0, -1)] + dt_seq = DateTimeIntSeq( + dts, state=state, state_key="recent_measurement_attempt" + ) + new_dts = dt_seq.update() + # The updated list will not contain the 2 first (left) tuples and it + # will have one last tuple (right). + # The last tuple have 0 as the integer, instead of 2, so the count will be + # 2 * 4 = 8 + assert new_dts[:-1] == dts[2:] + assert 8 == state.count("recent_measurement_attempt") + # And `len` should return the same. + assert 8 == len(dt_seq)