[tor-commits] [nyx/master] Make deduplication a constant time operation

atagar at torproject.org
Fri Sep 22 22:02:09 UTC 2017


commit 4acc68d5318776e601d0e06b600f2817ea4d7043
Author: Damian Johnson <atagar at torproject.org>
Date:   Fri Sep 22 12:45:11 2017 -0700

    Make deduplication a constant time operation
    
    At just the INFO runlevel, CPU usage on my relay jumps from 13% to 20%,
    and this is entirely due to deduplication. We previously made this far
    more efficient (linear, down from O(n^2)), but it's easy to finish the
    job and drop it to a constant time operation.
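
    The gist of the change: each entry now derives a dedup_key once at
    construction, so spotting a prior duplicate becomes a dict lookup
    instead of a scan over existing entries. A minimal sketch of the idea
    (illustrative only, not the patch itself; the real nyx/log.py below
    also handles locking, duplicate bookkeeping, and GROUP_BY_DAY):

        import collections

        Entry = collections.namedtuple('Entry', ['timestamp', 'runlevel', 'message', 'dedup_key'])


        def make_entry(timestamp, runlevel, message):
          # nyx's real key also folds in the day and strips volatile parts
          # such as 'runtime: ...'; keying on runlevel and message is
          # enough to show the idea
          return Entry(timestamp, runlevel, message, '%s:%s' % (runlevel, message))


        class DedupGroup(object):
          def __init__(self, max_size):
            self._max_size = max_size
            self._entries = []  # newest first
            self._dedup_map = {}  # dedup key => most recent entry

          def add(self, entry):
            duplicate = self._dedup_map.get(entry.dedup_key)  # O(1) rather than O(n)
            self._entries.insert(0, entry)
            self._dedup_map[entry.dedup_key] = entry

            while len(self._entries) > self._max_size:
              self.pop()

            return duplicate

          def pop(self):
            last_entry = self._entries.pop()

            # only forget the key if the evicted entry still owns the
            # mapping; a newer duplicate would have overwritten it in add()
            if self._dedup_map.get(last_entry.dedup_key) is last_entry:
              del self._dedup_map[last_entry.dedup_key]


        group = DedupGroup(100)
        assert group.add(make_entry(1333738434, 'INFO', 'Opening log file.')) is None
        assert group.add(make_entry(1333738457, 'INFO', 'Opening log file.')) is not None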
---
 nyx/log.py               | 55 ++++++++++++++++++++++--------------------------
 nyx/panel/log.py         |  4 ++--
 test/log/log_entry.py    | 24 +++++++++++----------
 test/log/log_group.py    | 15 ++++++++++---
 web/changelog/index.html |  2 +-
 5 files changed, 53 insertions(+), 47 deletions(-)

diff --git a/nyx/log.py b/nyx/log.py
index 0452891..f1a4ff0 100644
--- a/nyx/log.py
+++ b/nyx/log.py
@@ -56,6 +56,7 @@ except ImportError:
 TOR_RUNLEVELS = ['DEBUG', 'INFO', 'NOTICE', 'WARN', 'ERR']
 NYX_RUNLEVELS = ['NYX_DEBUG', 'NYX_INFO', 'NYX_NOTICE', 'NYX_WARNING', 'NYX_ERROR']
 TIMEZONE_OFFSET = time.altzone if time.localtime()[8] else time.timezone
+GROUP_BY_DAY = True
 
 
 def day_count(timestamp):
@@ -218,23 +219,15 @@ class LogGroup(object):
   and supports deduplication.
   """
 
-  def __init__(self, max_size, group_by_day = False):
+  def __init__(self, max_size):
     self._max_size = max_size
-    self._group_by_day = group_by_day
     self._entries = []
+    self._dedup_map = {}  # dedup key => most recent entry
     self._lock = threading.RLock()
 
   def add(self, entry):
     with self._lock:
-      duplicate = None
-      our_day = entry.day_count()
-
-      for existing_entry in self._entries:
-        if self._group_by_day and our_day != existing_entry.day_count():
-          break
-        elif entry.is_duplicate_of(existing_entry):
-          duplicate = existing_entry
-          break
+      duplicate = self._dedup_map.get(entry.dedup_key, None)
 
       if duplicate:
         if not duplicate.duplicates:
@@ -245,6 +238,7 @@ class LogGroup(object):
         entry.duplicates.insert(0, entry)
 
       self._entries.insert(0, entry)
+      self._dedup_map[entry.dedup_key] = entry
 
       while len(self._entries) > self._max_size:
         self.pop()
@@ -259,9 +253,12 @@ class LogGroup(object):
       if last_entry.is_duplicate:
         last_entry.duplicates.pop()
 
+      if self._dedup_map.get(last_entry.dedup_key, None) == last_entry:
+        del self._dedup_map[last_entry.dedup_key]
+
   def clone(self):
     with self._lock:
-      copy = LogGroup(self._max_size, self._group_by_day)
+      copy = LogGroup(self._max_size)
       copy._entries = [entry.clone() for entry in self._entries]
       return copy
 
@@ -287,6 +284,7 @@ class LogEntry(object):
   :var str message: event's message
   :var str display_message: message annotated with our time and runlevel
 
+  :var str dedup_key: key that can be used for deduplication
   :var bool is_duplicate: true if this matches other messages in the group and
     isn't the first
   :var list duplicates: messages that are identical to this one
@@ -303,37 +301,34 @@ class LogEntry(object):
     self.is_duplicate = False
     self.duplicates = None
 
-  @lru_cache()
-  def is_duplicate_of(self, entry):
-    """
-    Checks if we are a duplicate of the given message or not.
+    if GROUP_BY_DAY:
+      self.dedup_key = '%s:%s:%s' % (self.type, self.day_count(), self._message_dedup_key())
+    else:
+      self.dedup_key = '%s:%s' % (self.type, self._message_dedup_key())
 
-    :returns: **True** if the given log message is a duplicate of us and **False** otherwise
+  def _message_dedup_key(self):
     """
    Provides a key we can use to deduplicate the message portion of our entry.
 
-    if self.type != entry.type:
-      return False
-    elif self.message == entry.message:
-      return True
+    :returns: **str** key for deduplication purposes
+    """
 
-    if self.type == 'NYX_DEBUG' and 'runtime:' in self.message and 'runtime:' in entry.message:
+    if self.type == 'NYX_DEBUG' and 'runtime:' in self.message:
       # most nyx debug messages show runtimes so try matching without that
-
-      if self.message[:self.message.find('runtime:')] == entry.message[:self.message.find('runtime:')]:
-        return True
+      return self.message[:self.message.find('runtime:')]
 
     for common_msg in _common_log_messages().get(self.type, []):
       # if it starts with an asterisk then check the whole message rather
       # than just the start
 
       if common_msg[0] == '*':
-        if common_msg[1:] in self.message and common_msg[1:] in entry.message:
-          return True
+        if common_msg[1:] in self.message:
+          return common_msg
       else:
-        if self.message.startswith(common_msg) and entry.message.startswith(common_msg):
-          return True
+        if self.message.startswith(common_msg):
+          return common_msg
 
-    return False
+    return self.message
 
   def day_count(self):
     """
diff --git a/nyx/panel/log.py b/nyx/panel/log.py
index d364470..4a6afe3 100644
--- a/nyx/panel/log.py
+++ b/nyx/panel/log.py
@@ -77,7 +77,7 @@ class LogPanel(nyx.panel.DaemonPanel):
       logged_events = ['NOTICE', 'WARN', 'ERR', 'NYX_NOTICE', 'NYX_WARNING', 'NYX_ERROR']
       log.warn("Your --log argument had the following events tor doesn't recognize: %s" % ', '.join(invalid_events))
 
-    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'], group_by_day = True)
+    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'])
     self._event_log_paused = None
     self._event_types = nyx.log.listen_for_events(self._register_tor_event, logged_events)
     self._log_file = nyx.log.LogFileOutput(CONFIG['write_logs_to'])
@@ -152,7 +152,7 @@ class LogPanel(nyx.panel.DaemonPanel):
     Clears the contents of the event log.
     """
 
-    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'], group_by_day = True)
+    self._event_log = nyx.log.LogGroup(CONFIG['max_log_size'])
     self.redraw()
 
   def save_snapshot(self, path):
diff --git a/test/log/log_entry.py b/test/log/log_entry.py
index ffac7d3..a5b6158 100644
--- a/test/log/log_entry.py
+++ b/test/log/log_entry.py
@@ -1,27 +1,29 @@
 import unittest
 
+import nyx.log
+
 from nyx.log import LogEntry
 
 
 class TestLogEntry(unittest.TestCase):
-  def test_deduplication_matches_identical_messages(self):
-    # Simple case is that we match the same message but different timestamp.
-
-    entry = LogEntry(1333738434, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
-    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
+  def setUp(self):
+    nyx.log.GROUP_BY_DAY = False
 
-    # ... but we shouldn't match if the runlevel differs.
+  def tearDown(self):
+    nyx.log.GROUP_BY_DAY = True
 
-    self.assertFalse(entry.is_duplicate_of(LogEntry(1333738457, 'DEBUG', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')))
+  def test_dedup_key_by_messages(self):
+    entry = LogEntry(1333738434, 'INFO', 'tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"')
+    self.assertEqual('INFO:tor_lockfile_lock(): Locking "/home/atagar/.tor/lock"', entry.dedup_key)
 
-  def test_deduplication_matches_based_on_prefix(self):
+  def test_dedup_key_by_prefix(self):
     # matches using a prefix specified in dedup.cfg
 
     entry = LogEntry(1333738434, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0007)')
-    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'NYX_DEBUG', 'GETCONF MyFamily (runtime: 0.0015)')))
+    self.assertEqual('NYX_DEBUG:GETCONF MyFamily (', entry.dedup_key)
 
-  def test_deduplication_matches_with_wildcard(self):
+  def test_dedup_key_with_wildcard(self):
     # matches using a wildcard specified in dedup.cfg
 
     entry = LogEntry(1333738434, 'NOTICE', 'Bootstrapped 72%: Loading relay descriptors.')
-    self.assertTrue(entry.is_duplicate_of(LogEntry(1333738457, 'NOTICE', 'Bootstrapped 55%: Loading relay descriptors.')))
+    self.assertEqual('NOTICE:*Loading relay descriptors.', entry.dedup_key)
diff --git a/test/log/log_group.py b/test/log/log_group.py
index 9a10d13..da2ae9e 100644
--- a/test/log/log_group.py
+++ b/test/log/log_group.py
@@ -1,10 +1,18 @@
 import os
 import unittest
 
-from nyx.log import LogGroup, LogEntry, read_tor_log
+import nyx.log
+
+from nyx.log import LogGroup, LogEntry
 
 
 class TestLogGroup(unittest.TestCase):
+  def setUp(self):
+    nyx.log.GROUP_BY_DAY = False
+
+  def tearDown(self):
+    nyx.log.GROUP_BY_DAY = True
+
   def test_maintains_certain_size(self):
     group = LogGroup(5)
     self.assertEqual(0, len(group))
@@ -88,10 +96,11 @@ class TestLogGroup(unittest.TestCase):
     self.assertEqual([False, False, True, True, False], [e.is_duplicate for e in group_items])
 
   def test_deduplication_with_daybreaks(self):
-    group = LogGroup(100, group_by_day = True)
+    nyx.log.GROUP_BY_DAY = True
+    group = LogGroup(100)
     test_log_path = os.path.join(os.path.dirname(__file__), 'data', 'daybreak_deduplication')
 
-    for entry in reversed(list(read_tor_log(test_log_path))):
+    for entry in reversed(list(nyx.log.read_tor_log(test_log_path))):
       group.add(entry)
 
     # Entries should consist of two days of results...
diff --git a/web/changelog/index.html b/web/changelog/index.html
index 686ec0e..163f742 100644
--- a/web/changelog/index.html
+++ b/web/changelog/index.html
@@ -81,7 +81,7 @@
 
         <li><span class="component">Logging</span>
           <ul>
-            <li>Order of magnitude more performant log deduplication</li>
+            <li>Reduced log deduplication from an O(n^2) operation to constant time</li>
           </ul>
         </li>
 


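A note on the new GROUP_BY_DAY toggle: when enabled (the default) the
entry's day count is folded into the key, so an identical message from a
different day is not treated as a duplicate. A hypothetical illustration
of the two key shapes, reusing the wildcard case from the tests (the day
number is made up):

    # GROUP_BY_DAY = False
    'NOTICE:*Loading relay descriptors.'

    # GROUP_BY_DAY = True, for an entry whose day_count() is 15436
    'NOTICE:15436:*Loading relay descriptors.'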
