commit 5bdf8a0ea8671d9b5e29f7daa8e74157c72681a8 Author: meskio meskio@torproject.org Date: Tue Feb 22 11:49:29 2022 +0100
Validate opendkim headers
OpenDKIM produces an Authentication-Results header with a different format. Let's validate it as well.
Related: tpa/team#40622 --- bridgedb/distributors/email/dkim.py | 30 ++++++++++++------- bridgedb/test/test_email_dkim.py | 59 ++++++++++++++++++++++++++++--------- 2 files changed, 64 insertions(+), 25 deletions(-)
diff --git a/bridgedb/distributors/email/dkim.py b/bridgedb/distributors/email/dkim.py index 5cee31d..c8b3378 100644 --- a/bridgedb/distributors/email/dkim.py +++ b/bridgedb/distributors/email/dkim.py @@ -34,6 +34,11 @@ from __future__ import unicode_literals
import logging
+headers = { + "X-DKIM-Authentication-Results": lambda s: s.startswith('pass'), + "Authentication-Results": lambda s: 'dkim=pass' in s +} +
def checkDKIM(message, rules): """Check the DKIM verification results header. @@ -50,7 +55,7 @@ def checkDKIM(message, rules):
Otherwise, returns ``True``.
- :type message: :api:`twisted.mail.smtp.rfc822.Message` + :type message: :api:`email.message.Message` :param message: The incoming client request email, including headers. :param dict rules: The list of configured ``EMAIL_DOMAIN_RULES`` for the canonical domain which the client's email request originated from. @@ -60,13 +65,16 @@ def checkDKIM(message, rules): logging.info("Checking DKIM verification results...") logging.debug("Domain has rules: %s" % ', '.join(rules))
- if 'dkim' in rules: - # getheader() returns the last of a given kind of header; we want - # to get the first, so we use getheaders() instead. - dkimHeaders = message.get("X-DKIM-Authentication-Results") - dkimHeader = dkimHeaders if dkimHeaders else "<no header>" - if not dkimHeader.startswith("pass"): - logging.info("Rejecting bad DKIM header on incoming email: %r " - % dkimHeader) - return False - return True + if 'dkim' not in rules: + return True + + foundHeader = False + for headerName, validHeader in headers.items(): + for dkimHeader in message.get_all(headerName, []): + foundHeader = True + if not validHeader(dkimHeader): + logging.info("Rejecting bad DKIM header on incoming email: %r " + % dkimHeader) + return False + + return foundHeader diff --git a/bridgedb/test/test_email_dkim.py b/bridgedb/test/test_email_dkim.py index 679575f..e36657f 100644 --- a/bridgedb/test/test_email_dkim.py +++ b/bridgedb/test/test_email_dkim.py @@ -25,21 +25,50 @@ class CheckDKIMTests(unittest.TestCase):
def setUp(self): """Create fake email, distributor, and associated context data.""" - self.goodMessage = """\ + self.goodMessage = ["""\ From: user@gmail.com To: bridges@localhost X-DKIM-Authentication-Results: pass Subject: testing
get bridges -""" - self.badMessage = """\ +""", +"""\ From: user@gmail.com To: bridges@localhost +Authentication-Results: gmail.com; + dkim=pass (1024-bit key; secure) header.d=gmail.com header.i=@gmail.com header.a=rsa-sha256 header.s=squak header.b=ZFZSqaMU; + dkim-atps=neutral Subject: testing
get bridges -""" +"""] + self.badMessage = ["""\ +From: user@gmail.com +To: bridges@localhost +Subject: testing + +get bridges +""", +"""\ +From: user@gmail.com +To: bridges@localhost +Subject: testing +Authentication-Results: gmail.com; dkim=none; dkim-atps=neutral + +get bridges +""", +"""\ +From: user@gmail.com +To: bridges@localhost +Subject: testing +Authentication-Results: gmail.com; + dkim=pass (1024-bit key; secure) header.d=gmail.com header.i=@gmail.com header.a=rsa-sha256 header.s=squak header.b=ZFZSqaMU; + dkim-atps=neutral +Authentication-Results: gmail.com; dkim=none; dkim-atps=neutral + +get bridges +"""] self.domainRules = { 'gmail.com': ["ignore_dots", "dkim"], 'example.com': [], @@ -51,23 +80,25 @@ get bridges return email.message_from_string(messageString)
def test_checkDKIM_good(self): - message = self._createMessage(self.goodMessage) - result = dkim.checkDKIM(message, - self.domainRules.get("gmail.com")) - self.assertTrue(result) + for msg in self.goodMessage: + message = self._createMessage(msg) + result = dkim.checkDKIM(message, + self.domainRules.get("gmail.com")) + self.assertTrue(result, msg="not good dkim for: %s" % msg)
def test_checkDKIM_bad(self): - message = self._createMessage(self.badMessage) - result = dkim.checkDKIM(message, - self.domainRules.get("gmail.com")) - self.assertIs(result, False) + for msg in self.badMessage: + message = self._createMessage(msg) + result = dkim.checkDKIM(message, + self.domainRules.get("gmail.com")) + self.assertIs(result, False, msg="not expected good dkim for: %s" % msg)
def test_checkDKIM_dunno(self): """A ``X-DKIM-Authentication-Results: dunno`` header should return False. """ - messageList = self.badMessage.split('\n') + messageList = self.badMessage[0].split('\n') messageList[2] = "X-DKIM-Authentication-Results: dunno" message = self._createMessage('\n'.join(messageList)) result = dkim.checkDKIM(message, @@ -78,7 +109,7 @@ get bridges """A good DKIM verification header, *plus* an ``X-DKIM-Authentication-Results: dunno`` header should return False. """ - messageList = self.badMessage.split('\n') + messageList = self.badMessage[0].split('\n') messageList.insert(2, "X-DKIM-Authentication-Results: dunno") message = self._createMessage('\n'.join(messageList)) result = dkim.checkDKIM(message,
tor-commits@lists.torproject.org