Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
django-mailer / engine.py
Size: Mime:
from __future__ import unicode_literals

import contextlib
import logging
import smtplib
import sys
import time
from socket import error as socket_error

import lockfile
import six
from django import VERSION as DJANGO_VERSION
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.core.mail import get_connection
from django.core.mail.message import make_msgid
from django.core.mail.utils import DNS_NAME
from django.db import DatabaseError, NotSupportedError, OperationalError, transaction
from django.utils.module_loading import import_string

from mailer.models import (RESULT_FAILURE, RESULT_SUCCESS, Message, MessageLog, get_message_id)

# Exception class that sender_context() catches around
# ``select_for_update(nowait=True)``: NotSupportedError on Django 2+,
# the broader DatabaseError on older versions.
NotSupportedFeatureException = (
    NotSupportedError if DJANGO_VERSION[0] >= 2 else DatabaseError
)


# When the queue is empty, how long to wait (in seconds) before checking it
# again. Overridable through the MAILER_EMPTY_QUEUE_SLEEP setting.
EMPTY_QUEUE_SLEEP = getattr(settings, "MAILER_EMPTY_QUEUE_SLEEP", 30)

# Lock timeout value: how long to wait for the lock to become available.
# The value is handed to ``lock.acquire()`` in acquire_lock(). The default
# behavior is to never wait for the lock to be available.
LOCK_WAIT_TIMEOUT = getattr(settings, "MAILER_LOCK_WAIT_TIMEOUT", -1)

# Allows for a different lockfile path (MAILER_LOCK_PATH setting). When it is
# left unset, acquire_lock() uses "send_mail", i.e. a file in the current
# working directory.
LOCK_PATH = getattr(settings, "MAILER_LOCK_PATH", None)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def prioritize():
    """
    Build the queryset of queued messages in the order they should be sent.

    Only non-deferred messages are included; they are ordered by
    ``priority`` first and ``when_added`` second.
    """
    pending = Message.objects.non_deferred()
    return pending.order_by('priority', 'when_added')


@contextlib.contextmanager
def sender_context(message):
    """
    Makes a context manager appropriate for sending a message.

    The row for ``message`` is re-fetched with ``select_for_update`` inside a
    transaction that stays open for the whole ``with`` block. Entering the
    context using `with` gives the freshly fetched ``Message``, or ``None``
    if the message has been sent/deleted already or is locked by someone
    else.

    Exceptions raised by the body of the ``with`` block are not intercepted
    here; they pass through the surrounding transaction and reach the caller.
    """
    # We wrap each message sending inside a transaction (otherwise
    # select_for_update doesn't work).

    # We also do `nowait` for databases that support it. The result of this is
    # that if two processes (which might be on different machines) both attempt
    # to send the same queue, the loser for the first message will immediately
    # get an error, and will be able to try the second message. This means the
    # work for sending the messages will be distributed between the two
    # processes. Otherwise, the losing process has to wait for the winning
    # process to finish and release the lock, and the winning process will
    # almost always win the next message etc.
    with transaction.atomic():
        try:
            try:
                locked_message = (
                    Message.objects.filter(id=message.id)
                    .select_for_update(nowait=True)
                    .get()
                )
            except NotSupportedFeatureException:
                # MySQL
                locked_message = (
                    Message.objects.filter(id=message.id)
                    .select_for_update()
                    .get()
                )
        except Message.DoesNotExist:
            # Deleted by someone else
            locked_message = None
        except OperationalError:
            # Locked by someone else
            locked_message = None

        # Yield exactly once, and outside the try/except above. If the yield
        # were inside it, a DoesNotExist/OperationalError raised by the body
        # of the ``with`` block would be caught here and trigger a second
        # yield, which contextlib reports as
        # "generator didn't stop after throw()" instead of the real error.
        yield locked_message


def get_messages_for_sending():
    """
    Lazily yields one context manager per queued message, following the
    order given by ``prioritize()``.
    Entering a yielded context manager returns the actual message.
    """
    queue = prioritize()
    for queued_message in queue:
        yield sender_context(queued_message)


def ensure_message_id(msg):
    """
    Give ``msg`` a ``Message-ID`` header unless it already has one.
    """
    if get_message_id(msg) is not None:
        return

    # Use cached DNS_NAME for performance
    msg.extra_headers['Message-ID'] = make_msgid(domain=DNS_NAME)


def _limits_reached(sent, deferred):
    """
    Tell whether the current delivery run should stop.

    ``sent`` and ``deferred`` are the numbers of messages sent and deferred
    so far in this run. Returns ``True`` as soon as one of the configured
    limits is reached, ``False`` otherwise.
    """
    # Allow sending a fixed/limited amount of emails in each delivery run
    # defaults to None which means send everything in the queue
    EMAIL_MAX_BATCH = getattr(settings, "MAILER_EMAIL_MAX_BATCH", None)

    if EMAIL_MAX_BATCH is not None and sent >= EMAIL_MAX_BATCH:
        logger.info("EMAIL_MAX_BATCH (%s) reached, "
                    "stopping for this round", EMAIL_MAX_BATCH)
        return True

    # Stop sending emails in the current round if more than X emails get
    # deferred - defaults to None which means keep going regardless
    EMAIL_MAX_DEFERRED = getattr(settings, "MAILER_EMAIL_MAX_DEFERRED", None)

    if EMAIL_MAX_DEFERRED is not None and deferred >= EMAIL_MAX_DEFERRED:
        logger.warning("EMAIL_MAX_DEFERRED (%s) reached, "
                       "stopping for this round", EMAIL_MAX_DEFERRED)
        return True

    # Explicit boolean rather than an implicit None for the "keep going" case
    return False


def _throttle_emails():
    """
    Sleep between two deliveries when MAILER_EMAIL_THROTTLE is set.

    The setting is the number of seconds to wait between emails to avoid
    server overload; it defaults to 0, which means no waiting.
    """
    delay = getattr(settings, "MAILER_EMAIL_THROTTLE", 0)
    if not delay:
        return

    logger.debug("Throttling email delivery. "
                 "Sleeping %s seconds", delay)
    time.sleep(delay)


def handle_delivery_exception(connection, message, exc):
    """
    Default handler for an exception raised while delivering ``message``.

    For the SMTP authentication/data/recipient/sender errors and socket
    errors listed below, the message is deferred, the failure is recorded in
    the MessageLog, and ``(None, 'deferred')`` is returned. The ``None``
    connection tells the caller to create a new connection for the next
    message.

    Any other exception is re-raised. If the handler is called from inside an
    ``except`` block, the exception currently being handled is re-raised with
    its original traceback; otherwise ``exc`` itself is raised.
    """
    if isinstance(exc, (smtplib.SMTPAuthenticationError,
                        smtplib.SMTPDataError,
                        smtplib.SMTPRecipientsRefused,
                        smtplib.SMTPSenderRefused,
                        socket_error)):
        message.defer()
        logger.info("message deferred due to failure: %s", exc)
        MessageLog.objects.log(message, RESULT_FAILURE, log_message=str(exc))

        connection = None  # i.e. enforce creation of a new connection
        status = 'deferred'

        return connection, status

    # The idea is (1) to be backwards compatible with existing behavior
    # and (2) not have delivery errors go unnoticed
    if sys.exc_info()[0] is None:
        # No exception is being handled right now (the handler was called
        # outside an ``except`` block), so there is nothing for
        # six.reraise() to re-raise; raise the exception we were given.
        raise exc
    six.reraise(*sys.exc_info())


def acquire_lock():
    """
    Try to take the file lock used by ``send_all()``.

    The lock file is ``LOCK_PATH`` when that is set, otherwise "send_mail".
    Returns a ``(acquired, lock)`` tuple: ``acquired`` is ``False`` when the
    lock is already held or waiting for it timed out, ``True`` otherwise;
    ``lock`` is the ``FileLock`` object in both cases.
    """
    logger.debug("acquiring lock...")
    lock_file_path = "send_mail" if LOCK_PATH is None else LOCK_PATH
    lock = lockfile.FileLock(lock_file_path)

    try:
        lock.acquire(LOCK_WAIT_TIMEOUT)
    except lockfile.AlreadyLocked:
        logger.error("lock already in place. quitting.")
        acquired = False
    except lockfile.LockTimeout:
        logger.error("waiting for the lock timed out. quitting.")
        acquired = False
    else:
        logger.debug("acquired.")
        acquired = True

    return acquired, lock


def release_lock(lock):
    """
    Release ``lock`` (as returned by ``acquire_lock()``), with debug
    log messages before and after the release.
    """
    logger.debug("releasing lock...")
    lock.release()
    logger.debug("released.")


def _require_no_backend_loop(mailer_email_backend):
    """
    Raise ImproperlyConfigured when both ``settings.EMAIL_BACKEND`` and
    ``mailer_email_backend`` are 'mailer.backend.DbBackend'.
    """
    configured_backend = settings.EMAIL_BACKEND
    if not (mailer_email_backend == configured_backend == 'mailer.backend.DbBackend'):
        return

    error_template = (
        'EMAIL_BACKEND and MAILER_EMAIL_BACKEND should not both be set to "{}" at the same time'
    )
    raise ImproperlyConfigured(error_template.format(settings.EMAIL_BACKEND))


def send_all():
    """
    Send all eligible messages in the queue.

    Messages are taken in the order produced by ``get_messages_for_sending()``
    and each one is handled inside its own context manager; a context that
    yields ``None`` is skipped.

    A message that is sent is recorded in the MessageLog with
    ``RESULT_SUCCESS`` and deleted. A message whose ``email`` is ``None`` is
    deleted with a warning. Any exception raised while handling a message is
    passed to the handler named by the ``MAILER_ERROR_HANDLER`` setting
    (default ``mailer.engine.handle_delivery_exception``), which returns the
    connection to keep using and the name of the counter to increment.

    The run stops early when ``_limits_reached()`` says so, and
    ``_throttle_emails()`` is called after each handled message.

    Unless the ``MAILER_USE_FILE_LOCK`` setting is false, the whole run is
    guarded by the file lock from ``acquire_lock()``; if the lock cannot be
    acquired the function returns without sending anything.
    """
    # The actual backend to use for sending, defaulting to the Django default.
    # To make testing easier this is not stored at module level.
    mailer_email_backend = getattr(
        settings,
        "MAILER_EMAIL_BACKEND",
        "django.core.mail.backends.smtp.EmailBackend"
    )

    # allows disabling file locking. The default is True
    use_file_lock = getattr(settings, "MAILER_USE_FILE_LOCK", True)

    error_handler = import_string(
        getattr(settings, 'MAILER_ERROR_HANDLER',
                'mailer.engine.handle_delivery_exception')
    )

    _require_no_backend_loop(mailer_email_backend)

    if use_file_lock:
        acquired, lock = acquire_lock()
        if not acquired:
            return

    start_time = time.time()

    counts = {'deferred': 0, 'sent': 0}

    try:
        # Created on first use, and again after the error handler returns
        # None in its place.
        connection = None
        for context in get_messages_for_sending():
            with context as message:
                if message is None:
                    # We didn't acquire the lock
                    continue
                try:
                    if connection is None:
                        connection = get_connection(backend=mailer_email_backend)
                    logger.info(
                        "sending message '%s' to %s",
                        message.subject,
                        ", ".join(message.to_addresses),
                    )
                    email = message.email
                    if email is not None:
                        email.connection = connection
                        ensure_message_id(email)
                        email.send()

                        # connection can't be stored in the MessageLog
                        email.connection = None
                        message.email = email  # For the sake of MessageLog
                        MessageLog.objects.log(message, RESULT_SUCCESS)
                        counts['sent'] += 1
                    else:
                        logger.warning(
                            "message discarded due to failure in converting from DB. "
                            "Added on '%s' with priority '%s'",
                            message.when_added,
                            message.priority,
                        )
                    message.delete()

                except Exception as err:
                    connection, action_taken = error_handler(connection, message, err)
                    counts[action_taken] += 1

            # Check if we reached the limits for the current run
            if _limits_reached(counts['sent'], counts['deferred']):
                break

            _throttle_emails()

    finally:
        if use_file_lock:
            release_lock(lock)

    logger.info("")
    logger.info("%s sent; %s deferred;", counts['sent'], counts['deferred'])
    logger.info("done in %.2f seconds", time.time() - start_time)


def send_loop():
    """
    Loop indefinitely, checking queue at intervals of EMPTY_QUEUE_SLEEP and
    sending messages if any are on queue.

    Only non-deferred messages count as "on queue", since those are the ones
    ``send_all()`` picks up through ``prioritize()``. This function never
    returns.
    """

    while True:
        # exists() asks the database whether at least one row matches instead
        # of loading every queued message just to test for emptiness.
        # Deferred messages are left out of the check: send_all() does not
        # process them, so counting them would make this loop call send_all()
        # over and over without ever sleeping.
        while not Message.objects.non_deferred().exists():
            logger.debug("sleeping for %s seconds before checking queue again", EMPTY_QUEUE_SLEEP)
            time.sleep(EMPTY_QUEUE_SLEEP)
        send_all()