# orm/strategies.py
# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""sqlalchemy.orm.interfaces.LoaderStrategy
implementations, and related MapperOptions."""
from .. import exc as sa_exc, inspect
from .. import util, log, event
from ..sql import util as sql_util, visitors
from .. import sql
from . import (
attributes, interfaces, exc as orm_exc, loading,
unitofwork, util as orm_util, query
)
from .state import InstanceState
from .util import _none_set, aliased
from . import properties
from .interfaces import (
LoaderStrategy, StrategizedProperty
)
from .base import _SET_DEFERRED_EXPIRED, _DEFER_FOR_STATE
from .session import _state_session
import itertools
def _register_attribute(
    prop, mapper, useobject,
    compare_function=None,
    typecallable=None,
    callable_=None,
    proxy_property=None,
    active_history=False,
    impl_class=None,
    **kw
):
    """Register attribute instrumentation for ``prop`` down a hierarchy.

    For every mapper in ``mapper.self_and_descendants`` that maps this
    exact property object under ``prop.key`` and whose class manager
    does not yet have an impl for that key, an attribute impl is
    registered via ``attributes.register_attribute_impl()`` and each
    collected listener hook is then invoked as ``hook(desc, prop)``.

    :param prop: the MapperProperty being instrumented.
    :param mapper: the mapper whose hierarchy is traversed.
    :param useobject: whether the attribute refers to objects; gates the
     single-parent validator, cascade tracking and backref hooks.
    :param compare_function: passed through to the attribute impl.
    :param typecallable: passed through to the attribute impl.
    :param callable_: passed through to the attribute impl.
    :param proxy_property: accepted but not used within this function.
    :param active_history: passed through to the attribute impl.
    :param impl_class: passed through to the attribute impl.
    :param \**kw: additional keyword arguments passed through to
     ``attributes.register_attribute_impl()``.

    """
    extensions = list(util.to_list(prop.extension, default=[]))
    uselist = useobject and prop.uselist

    # hooks run in list order against every attribute registered below
    hooks = []

    if useobject and prop.single_parent:
        hooks.append(single_parent_validator)

    validators = prop.parent.validators
    if prop.key in validators:
        fn, opts = validators[prop.key]

        def _validator_hook(desc, prop):
            return orm_util._validator_events(
                desc, prop.key, fn, **opts)

        hooks.append(_validator_hook)

    if useobject:
        hooks.append(unitofwork.track_cascade_events)

        # backref listeners need to be assembled after the
        # single-parent validator and the mapper validator
        backref = prop.back_populates
        if backref:
            def _backref_hook(desc, prop):
                return attributes.backref_listeners(
                    desc, backref, uselist)

            hooks.append(_backref_hook)

    # A single MapperProperty is shared down a class inheritance
    # hierarchy, so attribute instrumentation and backref events are set
    # up for each mapper down the hierarchy.
    #
    # Typically "mapper" is the same as prop.parent, due to the way the
    # configure_mappers() process runs; however this is not strongly
    # enforced, and in the case of a second configure_mappers() run the
    # mapper here might not be prop.parent.  A subclass mapper may also
    # be called here before a superclass mapper.  We therefore can't
    # depend on mappers not already being set up and must check each one.
    for m in mapper.self_and_descendants:
        if prop is not m._props.get(prop.key):
            continue
        if m.class_manager._attr_has_impl(prop.key):
            continue

        desc = attributes.register_attribute_impl(
            m.class_,
            prop.key,
            parent_token=prop,
            uselist=uselist,
            compare_function=compare_function,
            useobject=useobject,
            extension=extensions,
            trackparent=useobject and (
                prop.single_parent or
                prop.direction is interfaces.ONETOMANY),
            typecallable=typecallable,
            callable_=callable_,
            active_history=active_history,
            impl_class=impl_class,
            send_modified_events=not useobject or not prop.viewonly,
            doc=prop.doc,
            **kw
        )

        for hook in hooks:
            hook(desc, prop)
@properties.ColumnProperty.strategy_for(instrument=False, deferred=False)
class UninstrumentedColumnLoader(LoaderStrategy):
    """Represent a non-instrumented MapperProperty.

    The polymorphic_on argument of mapper() often results in this,
    if the argument is against the with_polymorphic selectable.

    """

    __slots__ = ('columns',)

    def __init__(self, parent, strategy_key):
        super(UninstrumentedColumnLoader, self).__init__(parent, strategy_key)
        self.columns = self.parent_property.columns

    def setup_query(
            self, context, entity, path, loadopt, adapter,
            column_collection=None, **kwargs):
        """Append this property's columns to ``column_collection``.

        When ``adapter`` is truthy, each column is first translated
        through ``adapter.columns``.

        """
        for column in self.columns:
            column_collection.append(
                adapter.columns[column] if adapter else column)

    def create_row_processor(
            self, context, path, loadopt,
            mapper, result, adapter, populators):
        """Do nothing; this strategy registers no populators."""
        pass
@log.class_logger
@properties.ColumnProperty.strategy_for(instrument=True, deferred=False)
class ColumnLoader(LoaderStrategy):
    """Provide loading behavior for a :class:`.ColumnProperty`."""

    __slots__ = ('columns', 'is_composite')

    def __init__(self, parent, strategy_key):
        super(ColumnLoader, self).__init__(parent, strategy_key)
        prop = self.parent_property
        self.columns = prop.columns
        self.is_composite = hasattr(prop, 'composite_class')

    def setup_query(
            self, context, entity, path, loadopt,
            adapter, column_collection, memoized_populators, **kwargs):
        """Add this property's columns to the query being built.

        Every column (translated through ``adapter.columns`` when an
        adapter is given) is appended to ``column_collection``; the first
        column, likewise adapted, is stored in ``memoized_populators``
        keyed by the parent property.

        """
        def _adapted(column):
            return adapter.columns[column] if adapter else column

        for column in self.columns:
            column_collection.append(_adapted(column))

        memoized_populators[self.parent_property] = \
            _adapted(self.columns[0])

    def init_class_attribute(self, mapper):
        """Register the class-level attribute for this column property.

        History tracking is active when the property requests it, when
        the first column is a primary key, or when the mapper's version
        id column is among this property's columns.

        """
        self.is_class_level = True
        column_type = self.columns[0].type

        # TODO: check all columns ? check for foreign key as well?
        track_history = (
            self.parent_property.active_history or
            self.columns[0].primary_key or
            mapper.version_id_col in set(self.columns)
        )

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=column_type.compare_values,
            active_history=track_history
        )

    def create_row_processor(
            self, context, path,
            loadopt, mapper, result, adapter, populators):
        """Register a populator for this attribute.

        The columns are examined in order; the first one for which
        ``result._getter()`` returns a truthy getter yields a ``"quick"``
        populator.  If no column produces a getter, the key is added
        to the ``"expire"`` populators instead.

        """
        for column in self.columns:
            target = adapter.columns[column] if adapter else column
            getter = result._getter(target, False)
            if getter:
                populators["quick"].append((self.key, getter))
                return

        # none of the columns is present in the row
        populators["expire"].append((self.key, True))
@log.class_logger
@properties.ColumnProperty.strategy_for(query_expression=True)
class ExpressionColumnLoader(ColumnLoader):
    """Column loader driven by an ``"expression"`` loader option.

    Both query setup and row processing act only when ``loadopt`` is
    truthy and carries an ``"expression"`` entry in its ``local_opts``;
    otherwise they do nothing.

    """

    def __init__(self, parent, strategy_key):
        super(ExpressionColumnLoader, self).__init__(parent, strategy_key)

    def setup_query(
            self, context, entity, path, loadopt,
            adapter, column_collection, memoized_populators, **kwargs):
        """Add the option-supplied expression to the query, if present.

        The expression (translated through ``adapter.columns`` when an
        adapter is given) is appended to ``column_collection`` and stored
        in ``memoized_populators`` keyed by the parent property.

        """
        if not loadopt or "expression" not in loadopt.local_opts:
            return

        expression = loadopt.local_opts["expression"]

        def _adapted(column):
            return adapter.columns[column] if adapter else column

        column_collection.append(_adapted(expression))
        memoized_populators[self.parent_property] = _adapted(expression)

    def create_row_processor(
            self, context, path,
            loadopt, mapper, result, adapter, populators):
        """Register a populator for the option-supplied expression.

        A ``"quick"`` populator is added when ``result._getter()`` returns
        a truthy getter for the expression; otherwise the key is added to
        the ``"expire"`` populators.

        """
        if not loadopt or "expression" not in loadopt.local_opts:
            return

        column = loadopt.local_opts["expression"]
        if adapter:
            column = adapter.columns[column]

        getter = result._getter(column, False)
        if getter:
            populators["quick"].append((self.key, getter))
        else:
            populators["expire"].append((self.key, True))

    def init_class_attribute(self, mapper):
        """Register the class-level attribute for this property.

        The attribute is registered with ``accepts_scalar_loader=False``.

        """
        self.is_class_level = True
        compare_values = self.columns[0].type.compare_values

        _register_attribute(
            self.parent_property,
            mapper,
            useobject=False,
            compare_function=compare_values,
            accepts_scalar_loader=False
        )
@log.class_logger
@properties.ColumnProperty.strategy_for(deferred=True, instrument=True)
@properties.ColumnProperty.strategy_for(do_nothing=True)
class DeferredColumnLoader(LoaderStrategy):
"""Provide loading behavior for a deferred :class:`.ColumnProperty`."""
__slots__ = 'columns', 'group'
def __init__(self, parent, strategy_key):
    """Set up the strategy, copying columns and group from the property.

    :raises NotImplementedError: if the parent property has a
     ``composite_class`` attribute.

    """
    super(DeferredColumnLoader, self).__init__(parent, strategy_key)
    prop = self.parent_property
    if hasattr(prop, 'composite_class'):
        raise NotImplementedError("Deferred loading for composite "
                                  "types not implemented yet")
    self.columns = prop.columns
    self.group = prop.group
def create_row_processor(
        self, context, path, loadopt,
        mapper, result, adapter, populators):
    """Register the deferred-load populator for this attribute.

    At class level the key is added to the ``"expire"`` populators with
    a ``False`` flag; otherwise the property's
    ``_deferred_column_loader`` is added to the ``"new"`` populators.

    """
    # NOTE: the result is intentionally not checked for the column on
    # this path; in most cases we are working just with the
    # setup_query() directive, which does not support that, and the
    # behavior here should stay consistent with it.
    if self.is_class_level:
        populators["expire"].append((self.key, False))
        return

    deferred_loader = self.parent_property._deferred_column_loader
    populators["new"].append((self.key, deferred_loader))
def init_class_attribute(self, mapper):
    """Register the class-level attribute for this deferred column.

    The attribute is registered with ``self._load_for_state`` as its
    loader callable and with ``expire_missing=False``.

    """
    self.is_class_level = True
    compare_values = self.columns[0].type.compare_values

    _register_attribute(
        self.parent_property,
        mapper,
        useobject=False,
        compare_function=compare_values,
        callable_=self._load_for_state,
        expire_missing=False
    )
def setup_query(
        self, context, entity, path, loadopt,
        adapter, column_collection, memoized_populators,
        only_load_props=None, **kw):
    """Set up the query for this column, undeferring it when requested.

    The column is undeferred, by delegating to the property's
    ``deferred=False, instrument=True`` strategy, when any of these
    holds:

    * ``loadopt`` carries ``'undefer_pks'`` and this property's columns
      intersect ``self.parent._should_undefer_in_wildcard``;
    * this property has a group and ``loadopt`` carries a truthy
      ``'undefer_group_<group>'`` entry;
    * ``only_load_props`` is given and contains this property's key.

    Otherwise a marker is stored in ``memoized_populators`` for the
    parent property: ``_SET_DEFERRED_EXPIRED`` at class level,
    ``_DEFER_FOR_STATE`` if not.

    ``only_load_props`` is consumed here; it is not part of the
    ``**kw`` forwarded to the delegated strategy.

    """
    undefer = False

    if loadopt:
        local_opts = loadopt.local_opts
        if (
            'undefer_pks' in local_opts and
            set(self.columns).intersection(
                self.parent._should_undefer_in_wildcard)
        ):
            undefer = True
        elif (
            self.group and
            local_opts.get('undefer_group_%s' % self.group, False)
        ):
            undefer = True

    if not undefer and only_load_props and self.key in only_load_props:
        undefer = True

    if undefer:
        column_strategy = self.parent_property._get_strategy(
            (("deferred", False), ("instrument", True))
        )
        column_strategy.setup_query(
            context, entity,
            path, loadopt, adapter,
            column_collection, memoized_populators, **kw)
    else:
        memoized_populators[self.parent_property] = (
            _SET_DEFERRED_EXPIRED if self.is_class_level
            else _DEFER_FOR_STATE
        )
def _load_for_state(self, state, passive):
if not state.key:
return attributes.ATTR_EMPTY
if not passive & attributes.SQL_OK:
return attributes.PASSIVE_NO_RESULT
localparent = state.manager.mapper
if self.group:
toload = [
p.key for p in
localparent.iterate_properties
if isinstance(p, StrategizedProperty) and
isinstance(p.strategy, DeferredColumnLoader) and
p.group == self.group
]
else:
toload = [self.key]
# narrow the keys down to just those which have no history
group = [k for k in toload if k in state.unmodified]
Loading ...