Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import csv
import datetime

from django.core.exceptions import PermissionDenied
from django.core.paginator import InvalidPage
from django.http import HttpResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils.encoding import smart_str
from django.utils.translation import ungettext
from django.views.generic import ListView, TemplateView

from wagtail.admin import messages
from wagtail.contrib.forms.forms import SelectDateForm
from wagtail.contrib.forms.utils import get_forms_for_user
from wagtail.core.models import Page
from wagtail.utils.pagination import DEFAULT_PAGE_KEY


def get_submissions_list_view(request, *args, **kwargs):
    """ Call the form page's list submissions view class """
    page_id = kwargs.get('page_id')
    form_page = get_object_or_404(Page, id=page_id).specific
    return form_page.serve_submissions_list_view(request, *args, **kwargs)


class SafePaginateListView(ListView):
    """ Listing view with safe pagination, allowing incorrect or out of range values """

    paginate_by = 20
    page_kwarg = DEFAULT_PAGE_KEY

    def paginate_queryset(self, queryset, page_size):
        """Paginate the queryset if needed with nice defaults on invalid param."""
        paginator = self.get_paginator(
            queryset,
            page_size,
            orphans=self.get_paginate_orphans(),
            allow_empty_first_page=self.get_allow_empty()
        )
        page_kwarg = self.page_kwarg
        page_request = self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 0
        try:
            page_number = int(page_request)
        except ValueError:
            if page_request == 'last':
                page_number = paginator.num_pages
            else:
                page_number = 0
        try:
            if page_number > paginator.num_pages:
                page_number = paginator.num_pages  # page out of range, show last page
            page = paginator.page(page_number)
            return (paginator, page, page.object_list, page.has_other_pages())
        except InvalidPage:
            page = paginator.page(1)
            return (paginator, page, page.object_list, page.has_other_pages())
        return super().paginage_queryset(queryset, page_size)


class FormPagesListView(SafePaginateListView):
    """ Lists the available form pages for the current user """
    template_name = 'wagtailforms/index.html'
    context_object_name = 'form_pages'

    def get_queryset(self):
        """ Return the queryset of form pages for this view """
        queryset = get_forms_for_user(self.request.user)
        ordering = self.get_ordering()
        if ordering:
            if isinstance(ordering, str):
                ordering = (ordering,)
            queryset = queryset.order_by(*ordering)
        return queryset


class DeleteSubmissionsView(TemplateView):
    """ Delete the selected submissions """
    template_name = 'wagtailforms/confirm_delete.html'
    page = None
    submissions = None
    success_url = 'wagtailforms:list_submissions'

    def get_queryset(self):
        """ Returns a queryset for the selected submissions """
        submission_ids = self.request.GET.getlist('selected-submissions')
        submission_class = self.page.get_submission_class()
        return submission_class._default_manager.filter(id__in=submission_ids)

    def handle_delete(self, submissions):
        """ Deletes the given queryset """
        count = submissions.count()
        submissions.delete()
        messages.success(
            self.request,
            ungettext(
                'One submission has been deleted.',
                '%(count)d submissions have been deleted.',
                count
            ) % {'count': count}
        )

    def get_success_url(self):
        """ Returns the success URL to redirect to after a successful deletion """
        return self.success_url

    def dispatch(self, request, *args, **kwargs):
        """ Check permissions, set the page and submissions, handle delete """
        page_id = kwargs.get('page_id')

        if not get_forms_for_user(self.request.user).filter(id=page_id).exists():
            raise PermissionDenied

        self.page = get_object_or_404(Page, id=page_id).specific

        self.submissions = self.get_queryset()

        if self.request.method == 'POST':
            self.handle_delete(self.submissions)
            return redirect(self.get_success_url(), page_id)

        return super().dispatch(request, *args, **kwargs)

    def get_context_data(self, **kwargs):
        """ Get the context for this view """
        context = super().get_context_data(**kwargs)

        context.update({
            'page': self.page,
            'submissions': self.submissions,
        })

        return context


class SubmissionsListView(SafePaginateListView):
    """ Lists submissions for the provided form page """
    template_name = 'wagtailforms/index_submissions.html'
    context_object_name = 'submissions'
    form_page = None
    ordering = ('-submit_time',)
    ordering_csv = ('submit_time',)  # keep legacy CSV ordering
    orderable_fields = ('id', 'submit_time',)  # used to validate ordering in URL
    select_date_form = None

    def dispatch(self, request, *args, **kwargs):
        """ Check permissions and set the form page """

        self.form_page = kwargs.get('form_page')

        if not get_forms_for_user(request.user).filter(pk=self.form_page.id).exists():
            raise PermissionDenied

        self.is_csv_export = (self.request.GET.get('action') == 'CSV')
        if self.is_csv_export:
            self.paginate_by = None

        return super().dispatch(request, *args, **kwargs)

    def get_queryset(self):
        """ Return queryset of form submissions with filter and order_by applied """
        submission_class = self.form_page.get_submission_class()
        queryset = submission_class._default_manager.filter(page=self.form_page)

        filtering = self.get_filtering()
        if filtering and isinstance(filtering, dict):
            queryset = queryset.filter(**filtering)

        ordering = self.get_ordering()
        if ordering:
            if isinstance(ordering, str):
                ordering = (ordering,)
            queryset = queryset.order_by(*ordering)

        return queryset

    def get_paginate_by(self, queryset):
        """ Get the number of items to paginate by, or ``None`` for no pagination """
        if self.is_csv_export:
            return None
        return self.paginate_by

    def get_validated_ordering(self):
        """ Return a dict of field names with ordering labels if ordering is valid """
        orderable_fields = self.orderable_fields or ()
        ordering = dict()
        if self.is_csv_export:
            #  Revert to CSV order_by submit_time ascending for backwards compatibility
            default_ordering = self.ordering_csv or ()
        else:
            default_ordering = self.ordering or ()
        if isinstance(default_ordering, str):
            default_ordering = (default_ordering,)
        ordering_strs = self.request.GET.getlist('order_by') or list(default_ordering)
        for order in ordering_strs:
            try:
                _, prefix, field_name = order.rpartition('-')
                if field_name in orderable_fields:
                    ordering[field_name] = (
                        prefix, 'descending' if prefix == '-' else 'ascending'
                    )
            except (IndexError, ValueError):
                continue  # invalid ordering specified, skip it
        return ordering

    def get_ordering(self):
        """ Return the field or fields to use for ordering the queryset """
        ordering = self.get_validated_ordering()
        return [values[0] + name for name, values in ordering.items()]

    def get_filtering(self):
        """ Return filering as a dict for submissions queryset """
        self.select_date_form = SelectDateForm(self.request.GET)
        result = dict()
        if self.select_date_form.is_valid():
            date_from = self.select_date_form.cleaned_data.get('date_from')
            date_to = self.select_date_form.cleaned_data.get('date_to')
            if date_to:
                # careful: date_to must be increased by 1 day
                # as submit_time is a time so will always be greater
                date_to += datetime.timedelta(days=1)
                if date_from:
                    result['submit_time__range'] = [date_from, date_to]
                else:
                    result['submit_time__lte'] = date_to
            elif date_from:
                result['submit_time__gte'] = date_from
        return result

    def get_csv_filename(self):
        """ Returns the filename for the generated CSV file """
        return 'export-{}.csv'.format(
            datetime.datetime.today().strftime('%Y-%m-%d')
        )

    def get_csv_response(self, context):
        """ Returns a CSV response """
        filename = self.get_csv_filename()
        response = HttpResponse(content_type='text/csv; charset=utf-8')
        response['Content-Disposition'] = 'attachment;filename={}'.format(filename)

        writer = csv.writer(response)
        writer.writerow(context['data_headings'])
        for data_row in context['data_rows']:
            writer.writerow(data_row)
        return response

    def render_to_response(self, context, **response_kwargs):
        if self.is_csv_export:
            return self.get_csv_response(context)
        return super().render_to_response(context, **response_kwargs)

    def get_context_data(self, **kwargs):
        """ Return context for view, handle CSV or normal output """
        context = super().get_context_data(**kwargs)
        submissions = context[self.context_object_name]
        data_fields = self.form_page.get_data_fields()
        data_rows = []

        if self.is_csv_export:
            # Build data_rows as list of lists containing formatted data values
            # Using smart_str prevents UnicodeEncodeError for values with non-ansi symbols
            for submission in submissions:
                form_data = submission.get_data()
                data_row = []
                for name, label in data_fields:
                    val = form_data.get(name)
                    if isinstance(val, list):
                        val = ', '.join(val)
                    data_row.append(smart_str(val))
                data_rows.append(data_row)
            data_headings = [smart_str(label) for name, label in data_fields]
        else:
            # Build data_rows as list of dicts containing model_id and fields
            for submission in submissions:
                form_data = submission.get_data()
                data_row = []
                for name, label in data_fields:
                    val = form_data.get(name)
                    if isinstance(val, list):
                        val = ', '.join(val)
                    data_row.append(val)
                data_rows.append({
                    'model_id': submission.id,
                    'fields': data_row
                })
            # Build data_headings as list of dicts containing model_id and fields
            ordering_by_field = self.get_validated_ordering()
            orderable_fields = self.orderable_fields
            data_headings = []
            for name, label in data_fields:
                order_label = None
                if name in orderable_fields:
                    order = ordering_by_field.get(name)
                    if order:
                        order_label = order[1]  # 'ascending' or 'descending'
                    else:
                        order_label = 'orderable'  # not ordered yet but can be
                data_headings.append({
                    'name': name,
                    'label': label,
                    'order': order_label,
                })

        context.update({
            'form_page': self.form_page,
            'select_date_form': self.select_date_form,
            'data_headings': data_headings,
            'data_rows': data_rows,
            'submissions': submissions,
        })

        return context