from __future__ import unicode_literals
import datetime
import functools
import re
from itertools import chain
from django.conf import settings
from django.db import models
from django.db.migrations import operations
from django.db.migrations.migration import Migration
from django.db.migrations.operations.models import AlterModelOptions
from django.db.migrations.optimizer import MigrationOptimizer
from django.db.migrations.questioner import MigrationQuestioner
from django.db.migrations.utils import COMPILED_REGEX_TYPE, RegexObject
from django.utils import six
from .topological_sort import stable_topological_sort
class MigrationAutodetector(object):
"""
Takes a pair of ProjectStates, and compares them to see what the
first would need doing to make it match the second (the second
usually being the project's current state).
Note that this naturally operates on entire projects at a time,
as it's likely that changes interact (for example, you can't
add a ForeignKey without having a migration to add the table it
depends on first). A user interface may offer single-app usage
if it wishes, with the caveat that it may not always be possible.
"""
def __init__(self, from_state, to_state, questioner=None):
self.from_state = from_state
self.to_state = to_state
self.questioner = questioner or MigrationQuestioner()
self.existing_apps = {app for app, model in from_state.models}
def changes(self, graph, trim_to_apps=None, convert_apps=None, migration_name=None):
"""
Main entry point to produce a list of applicable changes.
Takes a graph to base names on and an optional set of apps
to try and restrict to (restriction is not guaranteed)
"""
changes = self._detect_changes(convert_apps, graph)
changes = self.arrange_for_graph(changes, graph, migration_name)
if trim_to_apps:
changes = self._trim_to_apps(changes, trim_to_apps)
return changes
def deep_deconstruct(self, obj):
"""
Recursive deconstruction for a field and its arguments.
Used for full comparison for rename/alter; sometimes a single-level
deconstruction will not compare correctly.
"""
if isinstance(obj, list):
return [self.deep_deconstruct(value) for value in obj]
elif isinstance(obj, tuple):
return tuple(self.deep_deconstruct(value) for value in obj)
elif isinstance(obj, dict):
return {
key: self.deep_deconstruct(value)
for key, value in obj.items()
}
elif isinstance(obj, functools.partial):
return (obj.func, self.deep_deconstruct(obj.args), self.deep_deconstruct(obj.keywords))
elif isinstance(obj, COMPILED_REGEX_TYPE):
return RegexObject(obj)
elif isinstance(obj, type):
# If this is a type that implements 'deconstruct' as an instance method,
# avoid treating this as being deconstructible itself - see #22951
return obj
elif hasattr(obj, 'deconstruct'):
deconstructed = obj.deconstruct()
if isinstance(obj, models.Field):
# we have a field which also returns a name
deconstructed = deconstructed[1:]
path, args, kwargs = deconstructed
return (
path,
[self.deep_deconstruct(value) for value in args],
{
key: self.deep_deconstruct(value)
for key, value in kwargs.items()
},
)
else:
return obj
def only_relation_agnostic_fields(self, fields):
"""
Return a definition of the fields that ignores field names and
what related fields actually relate to.
Used for detecting renames (as, of course, the related fields
change during renames)
"""
fields_def = []
for name, field in sorted(fields):
deconstruction = self.deep_deconstruct(field)
if field.remote_field and field.remote_field.model:
del deconstruction[2]['to']
fields_def.append(deconstruction)
return fields_def
def _detect_changes(self, convert_apps=None, graph=None):
"""
Returns a dict of migration plans which will achieve the
change from from_state to to_state. The dict has app labels
as keys and a list of migrations as values.
The resulting migrations aren't specially named, but the names
do matter for dependencies inside the set.
convert_apps is the list of apps to convert to use migrations
(i.e. to make initial migrations for, in the usual case)
graph is an optional argument that, if provided, can help improve
dependency generation and avoid potential circular dependencies.
"""
# The first phase is generating all the operations for each app
# and gathering them into a big per-app list.
# We'll then go through that list later and order it and split
# into migrations to resolve dependencies caused by M2Ms and FKs.
self.generated_operations = {}
# Prepare some old/new state and model lists, separating
# proxy models and ignoring unmigrated apps.
self.old_apps = self.from_state.concrete_apps
self.new_apps = self.to_state.apps
self.old_model_keys = []
self.old_proxy_keys = []
self.old_unmanaged_keys = []
self.new_model_keys = []
self.new_proxy_keys = []
self.new_unmanaged_keys = []
for al, mn in sorted(self.from_state.models.keys()):
model = self.old_apps.get_model(al, mn)
if not model._meta.managed:
self.old_unmanaged_keys.append((al, mn))
elif al not in self.from_state.real_apps:
if model._meta.proxy:
self.old_proxy_keys.append((al, mn))
else:
self.old_model_keys.append((al, mn))
for al, mn in sorted(self.to_state.models.keys()):
model = self.new_apps.get_model(al, mn)
if not model._meta.managed:
self.new_unmanaged_keys.append((al, mn))
elif (
al not in self.from_state.real_apps or
(convert_apps and al in convert_apps)
):
if model._meta.proxy:
self.new_proxy_keys.append((al, mn))
else:
self.new_model_keys.append((al, mn))
# Renames have to come first
self.generate_renamed_models()
# Prepare lists of fields and generate through model map
self._prepare_field_lists()
self._generate_through_model_map()
# Generate non-rename model operations
self.generate_deleted_models()
self.generate_created_models()
self.generate_deleted_proxies()
self.generate_created_proxies()
self.generate_altered_options()
self.generate_altered_managers()
# Generate field operations
self.generate_renamed_fields()
self.generate_removed_fields()
self.generate_added_fields()
self.generate_altered_fields()
self.generate_altered_unique_together()
self.generate_altered_index_together()
self.generate_altered_db_table()
self.generate_altered_order_with_respect_to()
self._sort_migrations()
self._build_migration_list(graph)
self._optimize_migrations()
return self.migrations
def _prepare_field_lists(self):
"""
Prepare field lists, and prepare a list of the fields that used
through models in the old state so we can make dependencies
from the through model deletion to the field that uses it.
"""
self.kept_model_keys = set(self.old_model_keys).intersection(self.new_model_keys)
self.kept_proxy_keys = set(self.old_proxy_keys).intersection(self.new_proxy_keys)
self.kept_unmanaged_keys = set(self.old_unmanaged_keys).intersection(self.new_unmanaged_keys)
self.through_users = {}
self.old_field_keys = set()
self.new_field_keys = set()
for app_label, model_name in sorted(self.kept_model_keys):
old_model_name = self.renamed_models.get((app_label, model_name), model_name)
old_model_state = self.from_state.models[app_label, old_model_name]
new_model_state = self.to_state.models[app_label, model_name]
self.old_field_keys.update((app_label, model_name, x) for x, y in old_model_state.fields)
self.new_field_keys.update((app_label, model_name, x) for x, y in new_model_state.fields)
def _generate_through_model_map(self):
"""
Through model map generation
"""
for app_label, model_name in sorted(self.old_model_keys):
old_model_name = self.renamed_models.get((app_label, model_name), model_name)
old_model_state = self.from_state.models[app_label, old_model_name]
for field_name, field in old_model_state.fields:
old_field = self.old_apps.get_model(app_label, old_model_name)._meta.get_field(field_name)
if (hasattr(old_field, "remote_field") and getattr(old_field.remote_field, "through", None)
and not old_field.remote_field.through._meta.auto_created):
through_key = (
old_field.remote_field.through._meta.app_label,
old_field.remote_field.through._meta.model_name,
)
self.through_users[through_key] = (app_label, old_model_name, field_name)
def _build_migration_list(self, graph=None):
    """
    Chop the per-app lists of operations up into migrations with
    dependencies on each other.

    We do this by stepping up an app's list of operations until we find
    one that has an outgoing dependency that isn't in another app's
    migration yet (hasn't been chopped off its list). We then chop off the
    operations before it into a migration and move onto the next app. If
    we loop back around without doing anything, there's a circular
    dependency (which _should_ be impossible as the operations are all
    split at this point so they can't depend and be depended on).

    Reads and consumes ``self.generated_operations`` (app label -> list of
    pending operations) and stores the result in ``self.migrations`` (app
    label -> list of Migration instances named "auto_1", "auto_2", ...).
    Nothing is returned.

    ``graph`` is optional; in chop mode its ``leaf_nodes(app_label)`` is
    used to depend on an app that has no migration generated here.

    Raises ValueError when a pass made in chop mode leaves the number of
    pending operations unchanged.
    """
    self.migrations = {}
    # Total number of operations still waiting to be put into a migration.
    num_ops = sum(len(x) for x in self.generated_operations.values())
    # Turned on after a pass that made no progress; allows partial
    # migrations and fallback dependencies on apps not handled here.
    chop_mode = False
    while num_ops:
        # On every iteration, we step through all the apps and see if there
        # is a completed set of operations.
        # If we find that a subset of the operations are complete we can
        # try to chop it off from the rest and continue, but we only
        # do this if we've already been through the list once before
        # without any chopping and nothing has changed.
        for app_label in sorted(self.generated_operations.keys()):
            # Operations taken off the front of this app's list in this
            # pass, and the migration dependencies they bring along.
            chopped = []
            dependencies = set()
            # Iterate over a copy: the app's list is reassigned below.
            for operation in list(self.generated_operations[app_label]):
                deps_satisfied = True
                operation_dependencies = set()
                for dep in operation._auto_deps:
                    is_swappable_dep = False
                    if dep[0] == "__setting__":
                        # We need to temporarily resolve the swappable dependency to prevent
                        # circular references. While keeping the dependency checks on the
                        # resolved model we still add the swappable dependencies.
                        # See #23322
                        # dep[1] names a setting whose value is split on '.'
                        # into an app label and an object name.
                        resolved_app_label, resolved_object_name = getattr(settings, dep[1]).split('.')
                        original_dep = dep
                        dep = (resolved_app_label, resolved_object_name.lower(), dep[2], dep[3])
                        is_swappable_dep = True
                    if dep[0] != app_label and dep[0] != "__setting__":
                        # External app dependency. See if it's not yet
                        # satisfied.
                        # A pending operation in the other app for which
                        # check_dependency() is truthy means the dependency
                        # has not been put into a migration yet.
                        for other_operation in self.generated_operations.get(dep[0], []):
                            if self.check_dependency(other_operation, dep):
                                deps_satisfied = False
                                break
                        if not deps_satisfied:
                            # Stop looking at this operation's other deps.
                            break
                        else:
                            if is_swappable_dep:
                                # Depend on the setting itself, not on the
                                # model it currently resolves to.
                                operation_dependencies.add((original_dep[0], original_dep[1]))
                            elif dep[0] in self.migrations:
                                # Depend on the newest migration generated
                                # so far for the other app.
                                operation_dependencies.add((dep[0], self.migrations[dep[0]][-1].name))
                            else:
                                # If we can't find the other app, we add a first/last dependency,
                                # but only if we've already been through once and checked everything
                                if chop_mode:
                                    # If the app already exists, we add a dependency on the last migration,
                                    # as we don't know which migration contains the target field.
                                    # If it's not yet migrated or has no migrations, we use __first__
                                    if graph and graph.leaf_nodes(dep[0]):
                                        operation_dependencies.add(graph.leaf_nodes(dep[0])[0])
                                    else:
                                        operation_dependencies.add((dep[0], "__first__"))
                                else:
                                    # Not in chop mode yet: leave this
                                    # operation pending for now.
                                    deps_satisfied = False
                if deps_satisfied:
                    chopped.append(operation)
                    dependencies.update(operation_dependencies)
                    # The loop stops at the first unsatisfied operation, so
                    # the one just chopped is always the head of the list.
                    self.generated_operations[app_label] = self.generated_operations[app_label][1:]
                else:
                    break
            # Make a migration! Well, only if there's stuff to put in it
            if dependencies or chopped:
                # A migration is only emitted when the app's list has been
                # emptied or when chop mode allows a partial one.
                if not self.generated_operations[app_label] or chop_mode:
                    # NOTE(review): str() presumably keeps the class name a
                    # native string under unicode_literals - confirm.
                    subclass = type(str("Migration"), (Migration,), {"operations": [], "dependencies": []})
                    instance = subclass("auto_%i" % (len(self.migrations.get(app_label, [])) + 1), app_label)
                    instance.dependencies = list(dependencies)
                    instance.operations = chopped
                    # Initial when the app had no models in from_state.
                    instance.initial = app_label not in self.existing_apps
                    self.migrations.setdefault(app_label, []).append(instance)
                    chop_mode = False
                else:
                    # Put the chopped operations back at the front and try
                    # again on a later pass.
                    self.generated_operations[app_label] = chopped + self.generated_operations[app_label]
        new_num_ops = sum(len(x) for x in self.generated_operations.values())
        if new_num_ops == num_ops:
            # No progress in this pass: switch to chop mode first, and give
            # up if chop mode was already on.
            if not chop_mode:
                chop_mode = True
            else:
                raise ValueError("Cannot resolve operation dependencies: %r" % self.generated_operations)
        num_ops = new_num_ops
def _sort_migrations(self):
"""
Reorder to make things possible. The order we have already isn't bad,
but we need to pull a few things around so FKs work nicely inside the
same app
"""
for app_label, ops in sorted(self.generated_operations.items()):
# construct a dependency graph for intra-app dependencies
dependency_graph = {op: set() for op in ops}
for op in ops:
for dep in op._auto_deps:
if dep[0] == app_label:
for op2 in ops:
if self.check_dependency(op2, dep):
dependency_graph[op].add(op2)
# we use a stable sort for deterministic tests & general behavior
self.generated_operations[app_label] = stable_topological_sort(ops, dependency_graph)
def _optimize_migrations(self):
# Add in internal dependencies among the migrations
for app_label, migrations in self.migrations.items():
for m1, m2 in zip(migrations, migrations[1:]):
m2.dependencies.append((app_label, m1.name))
# De-dupe dependencies
Loading ...