[tor-commits] [bridgedb/main] Validate opendkim headers

meskio at torproject.org meskio at torproject.org
Wed Feb 23 15:10:45 UTC 2022


commit 5bdf8a0ea8671d9b5e29f7daa8e74157c72681a8
Author: meskio <meskio at torproject.org>
Date:   Tue Feb 22 11:49:29 2022 +0100

    Validate opendkim headers
    
    OpenDKIM does produce Authentication-Results header with a different
    format. Let's validate it.
    
    Related: tpa/team#40622
---
 bridgedb/distributors/email/dkim.py | 30 ++++++++++++-------
 bridgedb/test/test_email_dkim.py    | 59 ++++++++++++++++++++++++++++---------
 2 files changed, 64 insertions(+), 25 deletions(-)

diff --git a/bridgedb/distributors/email/dkim.py b/bridgedb/distributors/email/dkim.py
index 5cee31d..c8b3378 100644
--- a/bridgedb/distributors/email/dkim.py
+++ b/bridgedb/distributors/email/dkim.py
@@ -34,6 +34,11 @@ from __future__ import unicode_literals
 
 import logging
 
+headers = {
+    "X-DKIM-Authentication-Results": lambda s: s.startswith('pass'),
+    "Authentication-Results": lambda s: 'dkim=pass' in s
+}
+
 
 def checkDKIM(message, rules):
     """Check the DKIM verification results header.
@@ -50,7 +55,7 @@ def checkDKIM(message, rules):
 
     Otherwise, returns ``True``.
 
-    :type message: :api:`twisted.mail.smtp.rfc822.Message`
+    :type message: :api:`email.message.Message`
     :param message: The incoming client request email, including headers.
     :param dict rules: The list of configured ``EMAIL_DOMAIN_RULES`` for the
         canonical domain which the client's email request originated from.
@@ -60,13 +65,16 @@ def checkDKIM(message, rules):
     logging.info("Checking DKIM verification results...")
     logging.debug("Domain has rules: %s" % ', '.join(rules))
 
-    if 'dkim' in rules:
-        # getheader() returns the last of a given kind of header; we want
-        # to get the first, so we use getheaders() instead.
-        dkimHeaders = message.get("X-DKIM-Authentication-Results")
-        dkimHeader = dkimHeaders if dkimHeaders else "<no header>"
-        if not dkimHeader.startswith("pass"):
-            logging.info("Rejecting bad DKIM header on incoming email: %r "
-                         % dkimHeader)
-            return False
-    return True
+    if 'dkim' not in rules:
+        return True
+
+    foundHeader = False
+    for headerName, validHeader in headers.items():
+        for dkimHeader in message.get_all(headerName, []):
+            foundHeader = True
+            if not validHeader(dkimHeader):
+                logging.info("Rejecting bad DKIM header on incoming email: %r "
+                             % dkimHeader)
+                return False
+
+    return foundHeader
diff --git a/bridgedb/test/test_email_dkim.py b/bridgedb/test/test_email_dkim.py
index 679575f..e36657f 100644
--- a/bridgedb/test/test_email_dkim.py
+++ b/bridgedb/test/test_email_dkim.py
@@ -25,21 +25,50 @@ class CheckDKIMTests(unittest.TestCase):
 
     def setUp(self):
         """Create fake email, distributor, and associated context data."""
-        self.goodMessage = """\
+        self.goodMessage = ["""\
 From: user at gmail.com
 To: bridges at localhost
 X-DKIM-Authentication-Results: pass
 Subject: testing
 
 get bridges
-"""
-        self.badMessage = """\
+""",
+"""\
 From: user at gmail.com
 To: bridges at localhost
+Authentication-Results: gmail.com;
+	dkim=pass (1024-bit key; secure) header.d=gmail.com header.i=@gmail.com header.a=rsa-sha256 header.s=squak header.b=ZFZSqaMU;
+	dkim-atps=neutral
 Subject: testing
 
 get bridges
-"""
+"""]
+        self.badMessage = ["""\
+From: user at gmail.com
+To: bridges at localhost
+Subject: testing
+
+get bridges
+""",
+"""\
+From: user at gmail.com
+To: bridges at localhost
+Subject: testing
+Authentication-Results: gmail.com; dkim=none; dkim-atps=neutral
+
+get bridges
+""",
+"""\
+From: user at gmail.com
+To: bridges at localhost
+Subject: testing
+Authentication-Results: gmail.com;
+	dkim=pass (1024-bit key; secure) header.d=gmail.com header.i=@gmail.com header.a=rsa-sha256 header.s=squak header.b=ZFZSqaMU;
+	dkim-atps=neutral
+Authentication-Results: gmail.com; dkim=none; dkim-atps=neutral
+
+get bridges
+"""]
         self.domainRules = {
             'gmail.com': ["ignore_dots", "dkim"],
             'example.com': [],
@@ -51,23 +80,25 @@ get bridges
         return email.message_from_string(messageString)
 
     def test_checkDKIM_good(self):
-        message = self._createMessage(self.goodMessage)
-        result = dkim.checkDKIM(message,
-                                self.domainRules.get("gmail.com"))
-        self.assertTrue(result)
+        for msg in self.goodMessage:
+            message = self._createMessage(msg)
+            result = dkim.checkDKIM(message,
+                                    self.domainRules.get("gmail.com"))
+            self.assertTrue(result, msg="not good dkim for: %s" % msg)
                                          
 
     def test_checkDKIM_bad(self):
-        message = self._createMessage(self.badMessage)
-        result = dkim.checkDKIM(message,
-                                self.domainRules.get("gmail.com"))
-        self.assertIs(result, False)
+        for msg in self.badMessage:
+            message = self._createMessage(msg)
+            result = dkim.checkDKIM(message,
+                                    self.domainRules.get("gmail.com"))
+            self.assertIs(result, False, msg="not expected good dkim for: %s" % msg)
 
     def test_checkDKIM_dunno(self):
         """A ``X-DKIM-Authentication-Results: dunno`` header should return
         False.
         """
-        messageList = self.badMessage.split('\n')
+        messageList = self.badMessage[0].split('\n')
         messageList[2] = "X-DKIM-Authentication-Results: dunno"
         message = self._createMessage('\n'.join(messageList))
         result = dkim.checkDKIM(message,
@@ -78,7 +109,7 @@ get bridges
         """A good DKIM verification header, *plus* an
         ``X-DKIM-Authentication-Results: dunno`` header should return False.
         """
-        messageList = self.badMessage.split('\n')
+        messageList = self.badMessage[0].split('\n')
         messageList.insert(2, "X-DKIM-Authentication-Results: dunno")
         message = self._createMessage('\n'.join(messageList))
         result = dkim.checkDKIM(message,



More information about the tor-commits mailing list