Repository URL to install this package:
Version:
2.4.1b3 ▾
|
import posixpath
from collections import defaultdict
from django.apps import apps
from django.contrib.contenttypes.models import ContentType
from django.db.models import CharField, Q
from django.db.models.functions import Length, Substr
from django.db.models.query import BaseIterable
from treebeard.mp_tree import MP_NodeQuerySet
from wagtail.search.queryset import SearchableQuerySetMixin
class TreeQuerySet(MP_NodeQuerySet):
"""
Extends Treebeard's MP_NodeQuerySet with additional useful tree-related operations.
"""
def descendant_of_q(self, other, inclusive=False):
q = Q(path__startswith=other.path) & Q(depth__gte=other.depth)
if not inclusive:
q &= ~Q(pk=other.pk)
return q
def descendant_of(self, other, inclusive=False):
"""
This filters the QuerySet to only contain pages that descend from the specified page.
If inclusive is set to True, it will also contain the page itself (instead of just its descendants).
"""
return self.filter(self.descendant_of_q(other, inclusive))
def not_descendant_of(self, other, inclusive=False):
"""
This filters the QuerySet to not contain any pages that descend from the specified page.
If inclusive is set to True, it will also exclude the specified page.
"""
return self.exclude(self.descendant_of_q(other, inclusive))
def child_of_q(self, other):
return self.descendant_of_q(other) & Q(depth=other.depth + 1)
def child_of(self, other):
"""
This filters the QuerySet to only contain pages that are direct children of the specified page.
"""
return self.filter(self.child_of_q(other))
def not_child_of(self, other):
"""
This filters the QuerySet to not contain any pages that are direct children of the specified page.
"""
return self.exclude(self.child_of_q(other))
def ancestor_of_q(self, other, inclusive=False):
paths = [
other.path[0:pos]
for pos in range(0, len(other.path) + 1, other.steplen)[1:]
]
q = Q(path__in=paths)
if not inclusive:
q &= ~Q(pk=other.pk)
return q
def ancestor_of(self, other, inclusive=False):
"""
This filters the QuerySet to only contain pages that are ancestors of the specified page.
If inclusive is set to True, it will also include the specified page.
"""
return self.filter(self.ancestor_of_q(other, inclusive))
def not_ancestor_of(self, other, inclusive=False):
"""
This filters the QuerySet to not contain any pages that are ancestors of the specified page.
If inclusive is set to True, it will also exclude the specified page.
"""
return self.exclude(self.ancestor_of_q(other, inclusive))
def parent_of_q(self, other):
return Q(path=self.model._get_parent_path_from_path(other.path))
def parent_of(self, other):
"""
This filters the QuerySet to only contain the parent of the specified page.
"""
return self.filter(self.parent_of_q(other))
def not_parent_of(self, other):
"""
This filters the QuerySet to exclude the parent of the specified page.
"""
return self.exclude(self.parent_of_q(other))
def sibling_of_q(self, other, inclusive=True):
q = Q(path__startswith=self.model._get_parent_path_from_path(other.path)) & Q(depth=other.depth)
if not inclusive:
q &= ~Q(pk=other.pk)
return q
def sibling_of(self, other, inclusive=True):
"""
This filters the QuerySet to only contain pages that are siblings of the specified page.
By default, inclusive is set to True so it will include the specified page in the results.
If inclusive is set to False, the page will be excluded from the results.
"""
return self.filter(self.sibling_of_q(other, inclusive))
def not_sibling_of(self, other, inclusive=True):
"""
This filters the QuerySet to not contain any pages that are siblings of the specified page.
By default, inclusive is set to True so it will exclude the specified page from the results.
If inclusive is set to False, the page will be included in the results.
"""
return self.exclude(self.sibling_of_q(other, inclusive))
class PageQuerySet(SearchableQuerySetMixin, TreeQuerySet):
def live_q(self):
return Q(live=True)
def live(self):
"""
This filters the QuerySet to only contain published pages.
"""
return self.filter(self.live_q())
def not_live(self):
"""
This filters the QuerySet to only contain unpublished pages.
"""
return self.exclude(self.live_q())
def in_menu_q(self):
return Q(show_in_menus=True)
def in_menu(self):
"""
This filters the QuerySet to only contain pages that are in the menus.
"""
return self.filter(self.in_menu_q())
def not_in_menu(self):
"""
This filters the QuerySet to only contain pages that are not in the menus.
"""
return self.exclude(self.in_menu_q())
def page_q(self, other):
return Q(id=other.id)
def page(self, other):
"""
This filters the QuerySet so it only contains the specified page.
"""
return self.filter(self.page_q(other))
def not_page(self, other):
"""
This filters the QuerySet so it doesn't contain the specified page.
"""
return self.exclude(self.page_q(other))
def type_q(self, klass):
content_types = ContentType.objects.get_for_models(*[
model for model in apps.get_models()
if issubclass(model, klass)
]).values()
return Q(content_type__in=list(content_types))
def type(self, model):
"""
This filters the QuerySet to only contain pages that are an instance
of the specified model (including subclasses).
"""
return self.filter(self.type_q(model))
def not_type(self, model):
"""
This filters the QuerySet to not contain any pages which are an instance of the specified model.
"""
return self.exclude(self.type_q(model))
def exact_type_q(self, klass):
return Q(content_type=ContentType.objects.get_for_model(klass))
def exact_type(self, model):
"""
This filters the QuerySet to only contain pages that are an instance of the specified model
(matching the model exactly, not subclasses).
"""
return self.filter(self.exact_type_q(model))
def not_exact_type(self, model):
"""
This filters the QuerySet to not contain any pages which are an instance of the specified model
(matching the model exactly, not subclasses).
"""
return self.exclude(self.exact_type_q(model))
def public_q(self):
from wagtail.core.models import PageViewRestriction
q = Q()
for restriction in PageViewRestriction.objects.all():
q &= ~self.descendant_of_q(restriction.page, inclusive=True)
return q
def public(self):
"""
This filters the QuerySet to only contain pages that are not in a private section
"""
return self.filter(self.public_q())
def not_public(self):
"""
This filters the QuerySet to only contain pages that are in a private section
"""
return self.exclude(self.public_q())
def first_common_ancestor(self, include_self=False, strict=False):
"""
Find the first ancestor that all pages in this queryset have in common.
For example, consider a page hierarchy like::
- Home/
- Foo Event Index/
- Foo Event Page 1/
- Foo Event Page 2/
- Bar Event Index/
- Bar Event Page 1/
- Bar Event Page 2/
The common ancestors for some queries would be:
.. code-block:: python
>>> Page.objects\\
... .type(EventPage)\\
... .first_common_ancestor()
<Page: Home>
>>> Page.objects\\
... .type(EventPage)\\
... .filter(title__contains='Foo')\\
... .first_common_ancestor()
<Page: Foo Event Index>
This method tries to be efficient, but if you have millions of pages
scattered across your page tree, it will be slow.
If `include_self` is True, the ancestor can be one of the pages in the
queryset:
.. code-block:: python
>>> Page.objects\\
... .filter(title__contains='Foo')\\
... .first_common_ancestor()
<Page: Foo Event Index>
>>> Page.objects\\
... .filter(title__exact='Bar Event Index')\\
... .first_common_ancestor()
<Page: Bar Event Index>
A few invalid cases exist: when the queryset is empty, when the root
Page is in the queryset and ``include_self`` is False, and when there
are multiple page trees with no common root (a case Wagtail does not
support). If ``strict`` is False (the default), then the first root
node is returned in these cases. If ``strict`` is True, then a
``ObjectDoesNotExist`` is raised.
"""
# An empty queryset has no ancestors. This is a problem
if not self.exists():
if strict:
raise self.model.DoesNotExist('Can not find ancestor of empty queryset')
return self.model.get_first_root_node()
if include_self:
# Get all the paths of the matched pages.
paths = self.order_by().values_list('path', flat=True)
else:
# Find all the distinct parent paths of all matched pages.
# The empty `.order_by()` ensures that `Page.path` is not also
# selected to order the results, which makes `.distinct()` works.
paths = self.order_by()\
.annotate(parent_path=Substr(
'path', 1, Length('path') - self.model.steplen,
output_field=CharField(max_length=255)))\
.values_list('parent_path', flat=True)\
.distinct()
# This method works on anything, not just file system paths.
common_parent_path = posixpath.commonprefix(paths)
# That may have returned a path like (0001, 0002, 000), which is
# missing some chars off the end. Fix this by trimming the path to a
# multiple of `Page.steplen`
extra_chars = len(common_parent_path) % self.model.steplen
if extra_chars != 0:
common_parent_path = common_parent_path[:-extra_chars]
if common_parent_path is '':
# This should only happen when there are multiple trees,
# a situation that Wagtail does not support;
# or when the root node itself is part of the queryset.
if strict:
raise self.model.DoesNotExist('No common ancestor found!')
# Assuming the situation is the latter, just return the root node.
# The root node is not its own ancestor, so this is technically
# incorrect. If you want very correct operation, use `strict=True`
# and receive an error.
return self.model.get_first_root_node()
# Assuming the database is in a consistent state, this page should
# *always* exist. If your database is not in a consistent state, you've
# got bigger problems.
return self.model.objects.get(path=common_parent_path)
def unpublish(self):
"""
This unpublishes all live pages in the QuerySet.
"""
for page in self.live():
page.unpublish()
def specific(self, defer=False):
"""
This efficiently gets all the specific pages for the queryset, using
the minimum number of queries.
When the "defer" keyword argument is set to True, only the basic page
fields will be loaded and all specific fields will be deferred. It
will still generate a query for each page type though (this may be
improved to generate only a single query in a future release).
"""
clone = self._clone()
if defer:
clone._iterable_class = DeferredSpecificIterable
else:
clone._iterable_class = SpecificIterable
return clone
def in_site(self, site):
"""
This filters the QuerySet to only contain pages within the specified site.
"""
return self.descendant_of(site.root_page, inclusive=True)
def specific_iterator(qs, defer=False):
"""
This efficiently iterates all the specific pages in a queryset, using
the minimum number of queries.
This should be called from ``PageQuerySet.specific``
"""
pks_and_types = qs.values_list('pk', 'content_type')
pks_by_type = defaultdict(list)
for pk, content_type in pks_and_types:
pks_by_type[content_type].append(pk)
# Content types are cached by ID, so this will not run any queries.
content_types = {pk: ContentType.objects.get_for_id(pk)
for _, pk in pks_and_types}
# Get the specific instances of all pages, one model class at a time.
pages_by_type = {}
for content_type, pks in pks_by_type.items():
# look up model class for this content type, falling back on the original
# model (i.e. Page) if the more specific one is missing
model = content_types[content_type].model_class() or qs.model
pages = model.objects.filter(pk__in=pks)
if defer:
# Defer all specific fields
from wagtail.core.models import Page
fields = [field.attname for field in Page._meta.get_fields() if field.concrete]
pages = pages.only(*fields)
pages_by_type[content_type] = {page.pk: page for page in pages}
# Yield all of the pages, in the order they occurred in the original query.
for pk, content_type in pks_and_types:
yield pages_by_type[content_type][pk]
class SpecificIterable(BaseIterable):
def __iter__(self):
return specific_iterator(self.queryset)
class DeferredSpecificIterable(BaseIterable):
def __iter__(self):
return specific_iterator(self.queryset, defer=True)