commit 4acc68d5318776e601d0e06b600f2817ea4d7043
Author: Damian Johnson <atagar@torproject.org>
Date:   Fri Sep 22 12:45:11 2017 -0700

    Make deduplication a constant time operation

    At only INFO runlevel cpu usage on my relay jumps from 13% to 20%. This
    is entirely coming from deduplication. We previously made this far more
    efficient (linear rather than O(n^2)), but it's pretty easy to finish
    dropping it down to a constant time operation.
---
 nyx/log.py               | 55 ++++++++++++++++++++++--------------------------
 nyx/panel/log.py         |  4 ++--
 test/log/log_entry.py    | 24 +++++++++++----------
 test/log/log_group.py    | 15 ++++++++++---
 web/changelog/index.html |  2 +-
 5 files changed, 53 insertions(+), 47 deletions(-)
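In short: LogGroup.add() used to scan prior entries looking for a duplicate, so every logged message cost a pass over the log. The change below instead keeps a dict mapping each entry's dedup_key to the most recent entry with that key, making duplicate detection a single hash lookup. The following is a minimal sketch of that idea, not the nyx code itself: Entry and Group are hypothetical stand-ins, and the real LogGroup in the diff also handles locking and is_duplicate bookkeeping.

    # Hypothetical stand-ins for nyx.log.LogEntry and nyx.log.LogGroup, kept
    # just detailed enough to show the constant time deduplication.

    class Entry(object):
      def __init__(self, dedup_key, message):
        self.dedup_key = dedup_key  # nyx derives this from runlevel, day, and dedup.cfg rules
        self.message = message
        self.duplicates = None

    class Group(object):
      def __init__(self, max_size):
        self._max_size = max_size
        self._entries = []
        self._dedup_map = {}  # dedup key => most recent entry with that key

      def add(self, entry):
        duplicate = self._dedup_map.get(entry.dedup_key, None)  # O(1), no scan of older entries

        if duplicate:
          if not duplicate.duplicates:
            duplicate.duplicates = [duplicate]  # first duplicate references itself too

          entry.duplicates = duplicate.duplicates
          entry.duplicates.insert(0, entry)

        self._entries.insert(0, entry)
        self._dedup_map[entry.dedup_key] = entry

        while len(self._entries) > self._max_size:
          evicted = self._entries.pop()

          # Drop the index entry only if it still points at what we evicted,
          # otherwise a newer entry with the same key would lose its mapping.

          if self._dedup_map.get(evicted.dedup_key, None) is evicted:
            del self._dedup_map[evicted.dedup_key]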
diff --git a/nyx/log.py b/nyx/log.py
index 0452891..f1a4ff0 100644
--- a/nyx/log.py
+++ b/nyx/log.py
@@ -56,6 +56,7 @@ except ImportError:
 TOR_RUNLEVELS = ['DEBUG', 'INFO', 'NOTICE', 'WARN', 'ERR']
 NYX_RUNLEVELS = ['NYX_DEBUG', 'NYX_INFO', 'NYX_NOTICE', 'NYX_WARNING', 'NYX_ERROR']
 TIMEZONE_OFFSET = time.altzone if time.localtime()[8] else time.timezone
+GROUP_BY_DAY = True
 
 
 def day_count(timestamp):
@@ -218,23 +219,15 @@ class LogGroup(object):
   and supports deduplication.
   """
 
-  def __init__(self, max_size, group_by_day = False):
+  def __init__(self, max_size):
     self._max_size = max_size
-    self._group_by_day = group_by_day
     self._entries = []
+    self._dedup_map = {}  # dedup key => most recent entry
     self._lock = threading.RLock()
 
   def add(self, entry):
     with self._lock:
-      duplicate = None
-      our_day = entry.day_count()
-
-      for existing_entry in self._entries:
-        if self._group_by_day and our_day != existing_entry.day_count():
-          break
-        elif entry.is_duplicate_of(existing_entry):
-          duplicate = existing_entry
-          break
+      duplicate = self._dedup_map.get(entry.dedup_key, None)
 
       if duplicate:
         if not duplicate.duplicates:
@@ -245,6 +238,7 @@ class LogGroup(object):
         entry.duplicates.insert(0, entry)
 
       self._entries.insert(0, entry)
+      self._dedup_map[entry.dedup_key] = entry
 
       while len(self._entries) > self._max_size:
         self.pop()
@@ -259,9 +253,12 @@ class LogGroup(object):
       if last_entry.is_duplicate:
         last_entry.duplicates.pop()
 
+      if self._dedup_map.get(last_entry.dedup_key, None) == last_entry:
+        del self._dedup_map[last_entry.dedup_key]
+
   def clone(self):
     with self._lock:
-      copy = LogGroup(self._max_size, self._group_by_day)
+      copy = LogGroup(self._max_size)
       copy._entries = [entry.clone() for entry in self._entries]
       return copy
@@ -287,6 +284,7 @@ class LogEntry(object):
   :var str message: event's message
   :var str display_message: message annotated with our time and runlevel
+  :var str dedup_key: key that can be used for deduplication
   :var bool is_duplicate: true if this matches other messages in the group
     and isn't the first
   :var list duplicates: messages that are identical to this one
@@ -303,37 +301,34 @@
     self.is_duplicate = False
     self.duplicates = None
 
-  @lru_cache()
-  def is_duplicate_of(self, entry):
-    """
-    Checks if we are a duplicate of the given message or not.
+    if GROUP_BY_DAY:
+      self.dedup_key = '%s:%s:%s' % (self.type, self.day_count(), self._message_dedup_key())
+    else:
+      self.dedup_key = '%s:%s' % (self.type, self._message_dedup_key())
 
-    :returns: **True** if the given log message is a duplicate of us and **False** otherwise
+  def _message_dedup_key(self):
     """
+    Provides key we can use for deduplication for the message portion of our
+    entry.
 
-    if self.type != entry.type:
-      return False
-    elif self.message == entry.message:
-      return True
+    :returns: **str** key for deduplication purposes
+    """
 
-    if self.type == 'NYX_DEBUG' and 'runtime:' in self.message and 'runtime:' in entry.message:
+    if self.type == 'NYX_DEBUG' and 'runtime:' in self.message:
       # most nyx debug messages show runtimes so try matching without that
-
-      if self.message[:self.message.find('runtime:')] == entry.message[:self.message.find('runtime:')]:
-        return True
+      return self.message[:self.message.find('runtime:')]
 
     for common_msg in _common_log_messages().get(self.type, []):
       # if it starts with an asterisk then check the whole message rather
       # than just the start
 
       if common_msg[0] == '*':
-        if common_msg[1:] in self.message and common_msg[1:] in entry.message:
-          return True
+        if common_msg[1:] in self.message:
+          return common_msg
       else:
-        if self.message.startswith(common_msg) and entry.message.startswith(common_msg):
-          return True
+        if self.message.startswith(common_msg):
+          return common_msg
 
-    return False
+    return self.message
 
   def day_count(self):
     """
diff --git a/nyx/panel/log.py b/nyx/panel/log.py
index d364470..4a6afe3 100644
--- a/nyx/panel/log.py
+++ b/nyx/panel/log.py
@@ -77,7 +77,7 @@ class LogPanel(nyx.panel.DaemonPanel):
       logged_events = ['NOTICE', 'WARN', 'ERR', 'NYX_NOTICE', 'NYX_WARNING', 'NYX_ERROR']
       log.warn("Your --log argument had the following events tor doesn't recognize: %s" % ', '.join(invalid_events))
 
-    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'], group_by_day = True)
+    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'])
     self._event_log_paused = None
     self._event_types = nyx.log.listen_for_events(self._register_tor_event, logged_events)
     self._log_file = nyx.log.LogFileOutput(CONFIG['write_logs_to'])
@@ -152,7 +152,7 @@ class LogPanel(nyx.panel.DaemonPanel):
     Clears the contents of the event log.
     """
 
-    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'], group_by_day = True)
+    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'])
     self.redraw()
 
   def save_snapshot(self, path):
diff --git a/test/log/log_entry.py b/test/log/log_entry.py
index ffac7d3..a5b6158 100644
--- a/test/log/log_entry.py
+++ b/test/log/log_entry.py
@@ -1,27 +1,29 @@
 import unittest
 
+import nyx.log
+
 from nyx.log import LogEntry
 
 
 class TestLogEntry(unittest.TestCase):
-  def test_deduplication_matches_identical_messages(self):
-    # Simple case is that we match the same message but different timestamp.
-
-    entry = LogEntry(1333738434, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
-    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
+  def setUp(self):
+    nyx.log.GROUP_BY_DAY = False
 
-    # ... but we shouldn't match if the runlevel differs.
+  def tearDown(self):
+    nyx.log.GROUP_BY_DAY = True
 
-    self.assertFalse(entry.is_duplicate_of(LogEntry(1333738457, 'DEBUG', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
+  def test_dedup_key_by_messages(self):
+    entry = LogEntry(1333738434, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
+    self.assertEqual('INFO:tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"', entry.dedup_key)
 
-  def test_deduplication_matches_based_on_prefix(self):
+  def test_dedup_key_by_prefix(self):
     # matches using a prefix specified in dedup.cfg
 
     entry = LogEntry(1333738434, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0007)')
-    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0015)')))
+    self.assertEqual('NYX_DEBUG:GETCONF MyFamily (', entry.dedup_key)
 
-  def test_deduplication_matches_with_wildcard(self):
+  def test_dedup_key_with_wildcard(self):
     # matches using a wildcard specified in dedup.cfg
 
     entry = LogEntry(1333738434, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
-    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'NOTICE', 'Bootstrapped 55%: Loading relay descriptors.')))
+    self.assertEqual('NOTICE:*Loading relay descriptors.', entry.dedup_key)
diff --git a/test/log/log_group.py b/test/log/log_group.py
index 9a10d13..da2ae9e 100644
--- a/test/log/log_group.py
+++ b/test/log/log_group.py
@@ -1,10 +1,18 @@
 import os
 import unittest
 
-from nyx.log import LogGroup, LogEntry, read_tor_log
+import nyx.log
+
+from nyx.log import LogGroup, LogEntry
 
 
 class TestLogGroup(unittest.TestCase):
+  def setUp(self):
+    nyx.log.GROUP_BY_DAY = False
+
+  def tearDown(self):
+    nyx.log.GROUP_BY_DAY = True
+
   def test_maintains_certain_size(self):
     group = LogGroup(5)
     self.assertEqual(0, len(group))
@@ -88,10 +96,11 @@ class TestLogGroup(unittest.TestCase):
     self.assertEqual([False, False, True, True, False], [e.is_duplicate for e in group_items])
 
   def test_deduplication_with_daybreaks(self):
-    group = LogGroup(100, group_by_day = True)
+    nyx.log.GROUP_BY_DAY = True
+    group = LogGroup(100)
     test_log_path = os.path.join(os.path.dirname(__file__), 'data', 'daybreak_deduplication')
 
-    for entry in reversed(list(read_tor_log(test_log_path))):
+    for entry in reversed(list(nyx.log.read_tor_log(test_log_path))):
       group.add(entry)
 
     # Entries should consist of two days of results...
diff --git a/web/changelog/index.html b/web/changelog/index.html
index 686ec0e..163f742 100644
--- a/web/changelog/index.html
+++ b/web/changelog/index.html
@@ -81,7 +81,7 @@
       <li><span class="component">Logging</span>
         <ul>
-          <li>Order of magnitude more performant log deduplication</li>
+          <li>Reduced log deduplication from an O(n^2) operation to constant time</li>
         </ul>
       </li>
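As a quick usage note tying back to the sketch at the top of this commit: the dedup key below is the real wildcard key asserted in test/log/log_entry.py, while Group and Entry remain the hypothetical stand-ins from that sketch.

    group = Group(max_size = 5)

    first = Entry('NOTICE:*Loading relay descriptors.', 'Bootstrapped 72%: Loading relay descriptors.')
    second = Entry('NOTICE:*Loading relay descriptors.', 'Bootstrapped 55%: Loading relay descriptors.')

    group.add(first)
    group.add(second)

    print(second.duplicates == [second, first])  # True, found with a single dict lookup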