commit 10306676c94a9abd4307870074407cb2fe94d63e
Author: meskio <meskio(a)torproject.org>
Date: Mon May 10 11:50:06 2021 +0200
Attach QR codes of bridges on email distributor
Refactor the code to use email.message.EmailMessage instead of the custom
EmailResponse implementation.
Fixes #40008
---
bridgedb/distributors/email/autoresponder.py | 260 +++++----------------------
bridgedb/test/test_email_autoresponder.py | 171 +++---------------
2 files changed, 78 insertions(+), 353 deletions(-)
diff --git a/bridgedb/distributors/email/autoresponder.py b/bridgedb/distributors/email/autoresponder.py
index 94a8f75..1632258 100644
--- a/bridgedb/distributors/email/autoresponder.py
+++ b/bridgedb/distributors/email/autoresponder.py
@@ -21,7 +21,7 @@ bridgedb.distributors.email.autoresponder
Functionality for autoresponding to incoming emails.
-.. inheritance-diagram:: EmailResponse SMTPAutoresponder
+.. inheritance-diagram:: SMTPAutoresponder
:parts: 1
::
@@ -31,7 +31,6 @@ Functionality for autoresponding to incoming emails.
| | how to respond.
| |_ generateResponse - Create an email response.
|
- |_ EmailResponse - Holds information for generating a response to a request.
|_ SMTPAutoresponder - An SMTP autoresponder for incoming mail.
..
"""
@@ -39,11 +38,12 @@ Functionality for autoresponding to incoming emails.
from __future__ import unicode_literals
from __future__ import print_function
-import email
import io
import logging
import time
+from email.utils import parseaddr
+from email.message import EmailMessage
from twisted.internet import defer
from twisted.internet import reactor
from twisted.mail import smtp
@@ -59,6 +59,7 @@ from bridgedb.distributors.email.distributor import TooSoonEmail
from bridgedb.distributors.email.distributor import IgnoreEmail
from bridgedb.parse import addr
from bridgedb.parse.addr import canonicalizeEmailDomain
+from bridgedb.qrcodes import generateQR
from bridgedb.util import levenshteinDistance
from bridgedb import translations
@@ -85,11 +86,12 @@ def createResponseBody(lines, context, client, lang='en'):
email to `bridges+fa(a)torproject.org
<mailto:bridges+fa@torproject.org>`__, the client should receive a
response in Farsi.
- :rtype: str
+ :rtype: (str, bytes)
:returns: ``None`` if we shouldn't respond to the client (i.e., if they
have already received a rate-limiting warning email). Otherwise,
returns a string containing the (optionally translated) body for the
- email response which we should send out.
+ email response which we should send out and the qrcode image of the
+ bridges if we provide bridges.
"""
translator = translations.installTranslations(lang)
bridges = None
@@ -102,28 +104,28 @@ def createResponseBody(lines, context, client, lang='en'):
bridges = context.distributor.getBridges(bridgeRequest, interval)
except TooSoonEmail as error:
logging.info("Got a mail too frequently: %s." % error)
- return templates.buildSpamWarning(translator, client)
+ return templates.buildSpamWarning(translator, client), None
except (IgnoreEmail, addr.BadEmail) as error:
logging.info(error)
# Don't generate a response if their email address is unparsable or
# invalid, or if we've already warned them about rate-limiting:
- return None
+ return None, None
else:
answer = "(no bridges currently available)\r\n"
+ qrcode = None
if bridges:
transport = bridgeRequest.justOnePTType()
- answer = "".join(" %s\r\n" % b.getBridgeLine(
- bridgeRequest, context.includeFingerprints) for b in bridges)
+ bridgeLines = [b.getBridgeLine(bridgeRequest, context.includeFingerprints) for b in bridges]
+ answer = "".join(" %s\r\n" % line for line in bridgeLines)
+ qrcode = generateQR(bridgeLines)
internalMetrix.recordHandoutsPerBridge(bridgeRequest, bridges)
else:
internalMetrix.recordEmptyEmailResponse()
- return templates.buildAnswerMessage(translator, client, answer)
+ return templates.buildAnswerMessage(translator, client, answer), qrcode
def generateResponse(fromAddress, client, body, subject=None,
- messageID=None):
- """Create an :class:`EmailResponse`, which acts like an
- :class:`io.StringIO` instance, by creating and writing all headers and the
- email body into the file-like :attr:`EmailResponse.mailfile`.
+ messageID=None, qrcode=None):
+ """Create an :class:`email.message.EmailMessage`
:param str fromAddress: The :rfc:`2821` email address which should be in
the ``'From:'`` header.
@@ -135,206 +137,43 @@ def generateResponse(fromAddress, client, body, subject=None,
:type messageID: ``None`` or :any:`str`
:param messageID: The :rfc:`2822` specifier for the ``'Message-ID:'``
header, if including one is desirable.
- :returns: An :class:`EmailResponse` which contains the entire email. To
- obtain the contents of the email, including all headers, simply use
- :meth:`EmailResponse.readContents`.
+ :returns: An :class:`email.message.EmailMessage` which contains the entire
+ email. To obtain the contents of the email, including all headers,
+ simply use :meth:`EmailMessage.as_string`.
"""
- response = EmailResponse()
- response.to = client
- response.writeHeaders(fromAddress.encode('utf-8'), str(client), subject,
- inReplyTo=messageID)
- response.writeBody(body.encode('utf-8'))
+ response = EmailMessage()
+ response["From"] = fromAddress
+ response["To"] = str(client)
+
+ if not subject:
+ response["Subject"] = '[no subject]'
+ else:
+ response["Subject"] = subject
+ if messageID:
+ response.add_header("In-Reply-To", messageID)
+
+ response.add_header("Date", smtp.rfc822date().decode("utf-8"))
+ response.set_content(body)
+
+ if qrcode:
+ response.add_attachment(qrcode, maintype="image", subtype="jpeg", filename="qrcode.jpg")
# Only log the email text (including all headers) if SAFE_LOGGING is
# disabled:
if not safelog.safe_logging:
- contents = response.readContents()
- logging.debug("Email contents:\n%s" % str(contents))
+ logging.debug("Email contents:\n%s" % response.as_string())
else:
logging.debug("Email text for %r created." % str(client))
- response.rewind()
return response
-class EmailResponse(object):
- """Holds information for generating a response email for a request.
-
- .. todo:: At some point, we may want to change this class to optionally
- handle creating Multipart MIME encoding messages, so that we can
- include attachments.
-
- :var str delimiter: Delimiter between lines written to the
- :data:`mailfile`.
- :var bool closed: ``True`` if :meth:`close` has been called.
- :vartype to: :api:`twisted.mail.smtp.Address`
- :var to: The client's email address, to which this response should be sent.
- """
-
- def __init__(self):
- """Create a response to an email we have recieved.
-
- This class deals with correctly formatting text for the response email
- headers and the response body into an instance of :data:`mailfile`.
- """
- self.mailfile = io.StringIO()
- self.delimiter = '\n'
- self.closed = False
- self.to = None
-
- def close(self):
- """Close our :data:`mailfile` and set :data:`closed` to ``True``."""
- logging.debug("Closing %s.mailfile..." % (self.__class__.__name__))
- self.mailfile.close()
- self.closed = True
-
- def read(self, size=None):
- """Read, at most, **size** bytes from our :data:`mailfile`.
-
- .. note:: This method is required by Twisted's SMTP system.
-
- :param int size: The number of bytes to read. Defaults to ``None``,
- which reads until EOF.
- :rtype: str
- :returns: The bytes read from the :data:`mailfile`.
- """
- contents = ''
- logging.debug("Reading%s from %s.mailfile..."
- % ((' {0} bytes'.format(size) if size else ''),
- self.__class__.__name__))
- try:
- if size is not None:
- contents = self.mailfile.read(int(size)).encode('utf-8')
- else:
- contents = self.mailfile.read().encode('utf-8')
- except Exception as error: # pragma: no cover
- logging.exception(error)
-
- return contents
-
- def readContents(self):
- """Read the all the contents written thus far to the :data:`mailfile`,
- and then :meth:`seek` to return to the original pointer position we
- were at before this method was called.
-
- :rtype: str
- :returns: The entire contents of the :data:`mailfile`.
- """
- pointer = self.mailfile.tell()
- self.mailfile.seek(0)
- contents = self.mailfile.read()
- self.mailfile.seek(pointer)
- return contents
-
- def rewind(self):
- """Rewind to the very beginning of the :data:`mailfile`."""
- logging.debug("Rewinding %s.mailfile..." % self.__class__.__name__)
- self.mailfile.seek(0)
-
- def write(self, line):
- """Write the **line** to the :data:`mailfile`.
-
- Any **line** written to me will have :data:`delimiter` appended to it
- beforehand.
-
- :param str line: Something to append into the :data:`mailfile`.
- """
-
- line = line.decode('utf-8') if isinstance(line, bytes) else line
-
- if line.find('\r\n') != -1:
- # If **line** contains newlines, send it to :meth:`writelines` to
- # break it up so that we can replace them:
- logging.debug("Found newlines in %r. Calling writelines()." % line)
- self.writelines(line)
- else:
- line += self.delimiter
- self.mailfile.write(line)
- self.mailfile.flush()
-
- def writelines(self, lines):
- """Calls :meth:`write` for each line in **lines**.
-
- Line endings of ``'\\r\\n'`` will be replaced with :data:`delimiter`
- (i.e. ``'\\n'``). See :api:`twisted.mail.smtp.SMTPClient.getMailData`
- for the reason.
-
- :type lines: :any:`str` or :any:`list`
- :param lines: The lines to write to the :attr:`mailfile`.
- """
- if isinstance(lines, (str, bytes)):
- lines = lines.decode('utf-8') if isinstance(lines, bytes) else lines
- lines = lines.replace('\r\n', '\n')
- for ln in lines.split('\n'):
- self.write(ln)
- elif isinstance(lines, (list, tuple,)):
- for ln in lines:
- self.write(ln)
-
- def writeHeaders(self, fromAddress, toAddress, subject=None,
- inReplyTo=None, includeMessageID=True,
- contentType='text/plain; charset="utf-8"', **kwargs):
- """Write all headers into the response email.
-
- :param str fromAddress: The email address for the ``'From:'`` header.
- :param str toAddress: The email address for the ``'To:'`` header.
- :type subject: ``None`` or :any:`str`
- :param subject: The ``'Subject:'`` header.
- :type inReplyTo: ``None`` or :any:`str`
- :param inReplyTo: If set, an ``'In-Reply-To:'`` header will be
- generated. This should be set to the ``'Message-ID:'`` header from
- the client's original request email.
- :param bool includeMessageID: If ``True``, generate and include a
- ``'Message-ID:'`` header for the response.
- :param str contentType: The ``'Content-Type:'`` header.
- :kwargs: If given, the key will become the name of the header, and the
- value will become the contents of that header.
- """
-
- fromAddress = fromAddress.decode('utf-8') if isinstance(fromAddress, bytes) else fromAddress
- toAddress = toAddress.decode('utf-8') if isinstance(toAddress, bytes) else toAddress
-
- self.write("From: %s" % fromAddress)
- self.write("To: %s" % toAddress)
- if includeMessageID:
- self.write("Message-ID: %s" % smtp.messageid())
- if inReplyTo:
- self.write("In-Reply-To: %s" % inReplyTo)
- self.write("Content-Type: %s" % contentType)
- self.write("Date: %s" % smtp.rfc822date().decode('utf-8'))
-
- if not subject:
- subject = '[no subject]'
- if not subject.lower().startswith('re'):
- subject = "Re: " + subject
- self.write("Subject: %s" % subject)
-
- if kwargs:
- for headerName, headerValue in kwargs.items():
- headerName = headerName.capitalize()
- headerName = headerName.replace(' ', '-')
- headerName = headerName.replace('_', '-')
- header = "%s: %s" % (headerName, headerValue)
- self.write(header)
-
- # The first blank line designates that the headers have ended:
- self.write(self.delimiter)
-
- def writeBody(self, body):
- """Write the response body into the :attr:`mailfile`.
-
- :param str body: The body of the response email.
- """
- logging.info("Writing email body...")
- self.writelines(body)
-
-
class SMTPAutoresponder(smtp.SMTPClient):
"""An :api:`twisted.mail.smtp.SMTPClient` for responding to incoming mail.
The main worker in this class is the :meth:`reply` method, which functions
to dissect an incoming email from an incoming :class:`SMTPMessage` and
- create a :class:`EmailResponse` email message in reply to it, and then,
+ create a :class:`EmailMessage` email message in reply to it, and then,
finally, send it out.
:vartype log: :api:`twisted.python.util.LineLog`
@@ -370,8 +209,8 @@ class SMTPAutoresponder(smtp.SMTPClient):
This method must return a file-like object containing the data of the
message to be sent. Lines in the file should be delimited by ``\\n``.
- :rtype: ``None`` or :class:`EmailResponse`
- :returns: An ``EmailResponse``, if we have a response to send in reply
+ :rtype: ``None`` or :class:`EmailMessage`
+ :returns: An ``EmailMessage``, if we have a response to send in reply
to the incoming email, otherwise, returns ``None``.
"""
clients = self.getMailTo()
@@ -391,9 +230,9 @@ class SMTPAutoresponder(smtp.SMTPClient):
lang = translations.getLocaleFromPlusAddr(recipient)
logging.info("Client requested email translation: %s" % lang)
- body = createResponseBody(self.incoming.lines,
- self.incoming.context,
- client, lang)
+ body, qrcode = createResponseBody(self.incoming.lines,
+ self.incoming.context,
+ client, lang)
# The string EMAIL_MISC_TEXT[1] shows up in an email if BridgeDB
# responds with bridges. Everything else we count as an invalid
@@ -408,8 +247,7 @@ class SMTPAutoresponder(smtp.SMTPClient):
messageID = self.incoming.message.get("Message-ID", None)
subject = self.incoming.message.get("Subject", None)
- response = generateResponse(recipient, client,
- body, subject, messageID)
+ response = generateResponse(recipient, client, body, subject, messageID, qrcode)
return response
def getMailTo(self):
@@ -427,13 +265,13 @@ class SMTPAutoresponder(smtp.SMTPClient):
"""
clients = []
addrHeader = None
- try: fromAddr = email.utils.parseaddr(self.incoming.message.get("From"))[1]
+ try: fromAddr = parseaddr(self.incoming.message.get("From"))[1]
except (IndexError, TypeError, AttributeError): pass
else: addrHeader = fromAddr
if not addrHeader:
logging.warn("No From header on incoming mail.")
- try: senderHeader = email.utils.parseaddr(self.incoming.message.get("Sender"))[1]
+ try: senderHeader = parseaddr(self.incoming.message.get("Sender"))[1]
except (IndexError, TypeError, AttributeError): pass
else: addrHeader = senderHeader
if not addrHeader:
@@ -675,17 +513,17 @@ class SMTPAutoresponder(smtp.SMTPClient):
:type client: :api:`twisted.mail.smtp.Address`
:param client: The email address of the client.
- :param response: A :class:`EmailResponse`.
+ :param response: A :class:`EmailMessage`.
:param int retries: Try resending this many times. (default: ``0``)
:param int timeout: Timeout after this many seconds. (default: ``30``)
:rtype: :api:`Deferred <twisted.internet.defer.Deferred>`
:returns: Our :data:`deferred`.
"""
- logging.info("Sending reply to %s ..." % str(response.to))
+ logging.info("Sending reply to %s ..." % str(response["To"]))
factory = smtp.SMTPSenderFactory(self.incoming.context.smtpFromAddr,
- str(response.to),
- response,
+ response["To"],
+ io.BytesIO(response.as_bytes()),
self.deferred,
retries=retries,
timeout=timeout)
diff --git a/bridgedb/test/test_email_autoresponder.py b/bridgedb/test/test_email_autoresponder.py
index 9b49c67..566d837 100644
--- a/bridgedb/test/test_email_autoresponder.py
+++ b/bridgedb/test/test_email_autoresponder.py
@@ -59,16 +59,18 @@ class CreateResponseBodyTests(unittest.TestCase):
bridges."""
lines = self._getIncomingLines("testing@localhost")
lines[4] = "transport obfs3"
- ret = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
+ ret, qrcode = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
self.assertSubstring("Here are your bridges:", ret)
+ self.assertIsNotNone(qrcode)
def test_createResponseBody_bridges_obfs3(self):
"""A request for 'get transport obfs3' should receive a response."""
lines = self._getIncomingLines("testing@localhost")
lines[4] = "get transport obfs3"
- ret = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
+ ret, qrcode = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
self.assertSubstring("Here are your bridges", ret)
self.assertSubstring("obfs3", ret)
+ self.assertIsInstance(qrcode, bytes)
def test_createResponseBody_bridges_obfsobfswebz(self):
"""We should only pay attention to the *last* in a crazy request."""
@@ -76,9 +78,10 @@ class CreateResponseBodyTests(unittest.TestCase):
lines[4] = "get unblocked webz"
lines.append("get transport obfs2")
lines.append("get transport obfs3")
- ret = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
+ ret, qrcode = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
self.assertSubstring("Here are your bridges", ret)
self.assertSubstring("obfs3", ret)
+ self.assertIsInstance(qrcode, bytes)
def test_createResponseBody_bridges_obfsobfswebzipv6(self):
"""We should *still* only pay attention to the *last* request."""
@@ -87,9 +90,10 @@ class CreateResponseBodyTests(unittest.TestCase):
lines.append("get unblocked webz")
lines.append("get ipv6")
lines.append("get transport obfs2")
- ret = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
+ ret, qrcode = autoresponder.createResponseBody(lines, self.ctx, self.toAddress)
self.assertSubstring("Here are your bridges", ret)
self.assertSubstring("obfs2", ret)
+ self.assertIsInstance(qrcode, bytes)
def test_createResponseBody_two_requests_TooSoonEmail(self):
"""The same client making two requests in a row should receive a
@@ -100,10 +104,12 @@ class CreateResponseBodyTests(unittest.TestCase):
ctx = _createMailServerContext(self.config, dist)
lines = self._getIncomingLines("testing@localhost")
- first = autoresponder.createResponseBody(lines, ctx, self.toAddress)
+ first, qrcode = autoresponder.createResponseBody(lines, ctx, self.toAddress)
self.assertSubstring("Here are your bridges", first)
- second = autoresponder.createResponseBody(lines, ctx, self.toAddress)
+ self.assertIsInstance(qrcode, bytes)
+ second, qrcode = autoresponder.createResponseBody(lines, ctx, self.toAddress)
self.assertSubstring("Please slow down", second)
+ self.assertIsNone(qrcode)
def test_createResponseBody_three_requests_TooSoonEmail(self):
"""Alice making a request, next Bob making a request, and then Alice again,
@@ -115,18 +121,21 @@ class CreateResponseBodyTests(unittest.TestCase):
ctx = _createMailServerContext(self.config, dist)
aliceLines = self._getIncomingLines("alice@localhost")
- aliceFirst = autoresponder.createResponseBody(aliceLines, ctx,
- self.toAddress)
+ aliceFirst, qrcode = autoresponder.createResponseBody(aliceLines, ctx,
+ self.toAddress)
self.assertSubstring("Here are your bridges", aliceFirst)
+ self.assertIsInstance(qrcode, bytes)
bobLines = self._getIncomingLines("bob@localhost")
- bobFirst = autoresponder.createResponseBody(bobLines, ctx,
- self.toAddress)
+ bobFirst, qrcode = autoresponder.createResponseBody(bobLines, ctx,
+ self.toAddress)
self.assertSubstring("Here are your bridges", bobFirst)
+ self.assertIsInstance(qrcode, bytes)
- aliceSecond = autoresponder.createResponseBody(aliceLines, ctx,
- self.toAddress)
+ aliceSecond, qrcode = autoresponder.createResponseBody(aliceLines, ctx,
+ self.toAddress)
self.assertSubstring("Please slow down", aliceSecond)
+ self.assertIsNone(qrcode)
def test_createResponseBody_three_requests_IgnoreEmail(self):
"""The same client making three requests in a row should receive a
@@ -138,140 +147,18 @@ class CreateResponseBodyTests(unittest.TestCase):
ctx = _createMailServerContext(self.config, dist)
lines = self._getIncomingLines("testing@localhost")
- first = autoresponder.createResponseBody(lines, ctx, self.toAddress)
+ first, qrcode = autoresponder.createResponseBody(lines, ctx, self.toAddress)
self.assertSubstring("Here are your bridges", first)
- second = autoresponder.createResponseBody(lines, ctx, self.toAddress)
+ self.assertIsInstance(qrcode, bytes)
+ second, qrcode = autoresponder.createResponseBody(lines, ctx, self.toAddress)
self.assertSubstring("Please slow down", second)
- third = autoresponder.createResponseBody(lines, ctx, self.toAddress)
+ self.assertIsNone(qrcode)
+ third, qrcode = autoresponder.createResponseBody(lines, ctx, self.toAddress)
self.assertIsNone(third)
- fourth = autoresponder.createResponseBody(lines, ctx, self.toAddress)
+ self.assertIsNone(qrcode)
+ fourth, qrcode = autoresponder.createResponseBody(lines, ctx, self.toAddress)
self.assertIsNone(fourth)
-
-
-class EmailResponseTests(unittest.TestCase):
- """Tests for ``generateResponse()`` and ``EmailResponse``."""
-
- def setUp(self):
- self.fromAddr = "bridges(a)torproject.org"
- self.clientAddr = "user(a)example.com"
- self.body = """\
-People think that time is strictly linear, but, in reality, it's actually just
-a ball of timey-wimey, wibbly-warbly... stuff."""
-
- def tearDown(self):
- autoresponder.safelog.safe_logging = True
-
- def test_EmailResponse_generateResponse(self):
- response = autoresponder.generateResponse(self.fromAddr,
- self.clientAddr,
- self.body)
- self.assertIsInstance(response, autoresponder.EmailResponse)
-
- def test_EmailResponse_generateResponse_noSafelog(self):
- autoresponder.safelog.safe_logging = False
- response = autoresponder.generateResponse(self.fromAddr,
- self.clientAddr,
- self.body)
- self.assertIsInstance(response, autoresponder.EmailResponse)
-
- def test_EmailResponse_generateResponse_mailfile(self):
- response = autoresponder.generateResponse(self.fromAddr,
- self.clientAddr,
- self.body)
- self.assertIsInstance(response.mailfile, (io.BytesIO, io.StringIO))
-
- def test_EmailResponse_generateResponse_withInReplyTo(self):
- response = autoresponder.generateResponse(self.fromAddr,
- self.clientAddr,
- self.body,
- messageID="NSA")
- contents = str(response.readContents()).replace('\x00', '')
- self.assertIsInstance(response.mailfile, (io.BytesIO, io.StringIO))
- self.assertSubstring("In-Reply-To: NSA", contents)
-
- def test_EmailResponse_generateResponse_readContents(self):
- response = autoresponder.generateResponse(self.fromAddr,
- self.clientAddr,
- self.body)
- contents = str(response.readContents()).replace('\x00', '')
- self.assertSubstring('timey-wimey, wibbly-warbly... stuff.', contents)
-
- def test_EmailResponse_additionalHeaders(self):
- response = autoresponder.EmailResponse()
- response.writeHeaders(self.fromAddr, self.clientAddr,
- subject="Re: echelon", inReplyTo="NSA",
- X_been_there="They were so 2004")
- contents = str(response.readContents()).replace('\x00', '')
- self.assertIsInstance(response.mailfile, (io.BytesIO, io.StringIO))
- self.assertSubstring("In-Reply-To: NSA", contents)
- self.assertSubstring("X-been-there: They were so 2004", contents)
-
- def test_EmailResponse_close(self):
- """Calling EmailResponse.close() should close the ``mailfile`` and set
- ``closed=True``.
- """
- response = autoresponder.EmailResponse()
- self.assertEqual(response.closed, False)
- response.close()
- self.assertEqual(response.closed, True)
- self.assertRaises(ValueError, response.write, self.body)
-
- def test_EmailResponse_read(self):
- """Calling EmailResponse.read() should read bytes from the file."""
- response = autoresponder.EmailResponse()
- response.write(self.body)
- response.rewind()
- contents = response.read().replace(b'\x00', b'').decode('utf-8')
- # The newlines in the email body should have been replaced with
- # ``EmailResponse.delimiter``.
- delimited = self.body.replace('\n', response.delimiter) \
- + response.delimiter
- self.assertEqual(delimited, contents)
-
- def test_EmailResponse_read_three_bytes(self):
- """EmailResponse.read(3) should read three bytes from the file."""
- response = autoresponder.EmailResponse()
- response.write(self.body)
- response.rewind()
- contents = response.read(3).replace(b'\x00', b'').decode('utf-8')
- self.assertEqual(contents, self.body[:3])
-
- def test_EmailResponse_write(self):
- """Calling EmailResponse.write() should write to the mailfile."""
- response = autoresponder.EmailResponse()
- response.write(self.body)
- contents = str(response.readContents()).replace('\x00', '')
- # The newlines in the email body should have been replaced with
- # ``EmailResponse.delimiter``.
- delimited = self.body.replace('\n', response.delimiter) \
- + response.delimiter
- self.assertEqual(delimited, contents)
-
- def test_EmailResponse_write_withRetNewlines(self):
- """Calling EmailResponse.write() with '\r\n' in the lines should call
- writelines(), which splits up the lines and then calls write() again.
- """
- response = autoresponder.EmailResponse()
- response.write(self.body.replace('\n', '\r\n'))
- contents = str(response.readContents()).replace('\x00', '')
- # The newlines in the email body should have been replaced with
- # ``EmailResponse.delimiter``.
- delimited = self.body.replace('\n', response.delimiter) \
- + response.delimiter
- self.assertEqual(delimited, contents)
-
- def test_EmailResponse_writelines_list(self):
- """Calling EmailResponse.writelines() with a list should write the
- concatenated contents of the list into the mailfile.
- """
- response = autoresponder.EmailResponse()
- response.writelines(self.body.split('\n'))
- contents = str(response.readContents()).replace('\x00', '')
- # The newlines in the email body should have been replaced with
- # ``EmailResponse.delimiter``.
- delimited = self.body.replace('\n', response.delimiter) \
- + response.delimiter
- self.assertEqual(delimited, contents)
+ self.assertIsNone(qrcode)
class SMTPAutoresponderTests(unittest.TestCase):