# -*- coding: utf-8 -*-
"""
Title: Dumpscript management command
Project: Hardytools (queryset-refactor version)
Author: Will Hardy (http://willhardy.com.au)
Date: June 2008
Usage: python manage.py dumpscript appname > scripts/scriptname.py
$Revision: 217 $
Description:
Generates a Python script that will repopulate the database using objects.
The advantage of this approach is that it is easy to understand, and more
flexible than directly populating the database, or using XML.
* It also allows for new defaults to take effect and only transfers what is
needed.
* If a new database schema has a NEW ATTRIBUTE, it is simply not
populated (using a default value will make the transition smooth :)
* If a new database schema REMOVES AN ATTRIBUTE, it is simply ignored
and the data moves across safely (I'm assuming we don't want this
attribute anymore.
* Problems may only occur if there is a new model and is now a required
ForeignKey for an existing model. But this is easy to fix by editing the
populate script. Half of the job is already done as all ForeingKey
lookups occur though the locate_object() function in the generated script.
Improvements:
See TODOs and FIXMEs scattered throughout :-)
"""
import datetime
import sys
import django
import six
# conditional import, force_unicode was renamed in Django 1.5
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import (
AutoField, BooleanField, DateField, DateTimeField, FileField, ForeignKey,
)
from django_extensions.management.utils import signalcommand
from django_extensions.compat import (
list_app_labels, get_model_compat, get_models_for_app
)
from django_extensions.compat import CompatibilityBaseCommand as BaseCommand
try:
from django.utils.encoding import smart_unicode, force_unicode # NOQA
except ImportError:
from django.utils.encoding import smart_text as smart_unicode, force_text as force_unicode # NOQA
def orm_item_locator(orm_obj):
"""
This function is called every time an object that will not be exported is required.
Where orm_obj is the referred object.
We postpone the lookup to locate_object() which will be run on the generated script
"""
the_class = orm_obj._meta.object_name
original_class = the_class
pk_name = orm_obj._meta.pk.name
original_pk_name = pk_name
pk_value = getattr(orm_obj, pk_name)
while hasattr(pk_value, "_meta") and hasattr(pk_value._meta, "pk") and hasattr(pk_value._meta.pk, "name"):
the_class = pk_value._meta.object_name
pk_name = pk_value._meta.pk.name
pk_value = getattr(pk_value, pk_name)
clean_dict = make_clean_dict(orm_obj.__dict__)
for key in clean_dict:
v = clean_dict[key]
if v is not None and not isinstance(v, (six.string_types, six.integer_types, float, datetime.datetime)):
clean_dict[key] = six.u("%s" % v)
output = """ importer.locate_object(%s, "%s", %s, "%s", %s, %s ) """ % (
original_class, original_pk_name,
the_class, pk_name, pk_value, clean_dict
)
return output
class Command(BaseCommand):
help = 'Dumps the data as a customised python script.'
args = '[appname ...]'
def add_arguments(self, parser):
parser.add_argument(
'--autofield', action='store_false', dest='skip_autofield',
default=True, help='Include Autofields (like pk fields)')
@signalcommand
def handle(self, *app_labels, **options):
# Get the models we want to export
models = get_models(app_labels)
# A dictionary is created to keep track of all the processed objects,
# so that foreign key references can be made using python variable names.
# This variable "context" will be passed around like the town bicycle.
context = {}
# Create a dumpscript object and let it format itself as a string
script = Script(
models=models,
context=context,
stdout=self.stdout,
stderr=self.stderr,
options=options,
)
self.stdout.write(str(script))
self.stdout.write("\n")
def get_models(app_labels):
""" Gets a list of models for the given app labels, with some exceptions.
TODO: If a required model is referenced, it should also be included.
Or at least discovered with a get_or_create() call.
"""
# These models are not to be output, e.g. because they can be generated automatically
# TODO: This should be "appname.modelname" string
EXCLUDED_MODELS = (ContentType, )
models = []
# If no app labels are given, return all
if not app_labels:
for app_label in list_app_labels():
models += [m for m in get_models_for_app(app_label)
if m not in EXCLUDED_MODELS]
return models
# Get all relevant apps
for app_label in app_labels:
# If a specific model is mentioned, get only that model
if "." in app_label:
app_label, model_name = app_label.split(".", 1)
models.append(get_model_compat(app_label, model_name))
# Get all models for a given app
else:
models += [m for m in get_models_for_app(app_label)
if m not in EXCLUDED_MODELS]
return models
class Code(object):
""" A snippet of python script.
This keeps track of import statements and can be output to a string.
In the future, other features such as custom indentation might be included
in this class.
"""
def __init__(self, indent=-1, stdout=None, stderr=None):
if not stdout:
stdout = sys.stdout
if not stderr:
stderr = sys.stderr
self.indent = indent
self.stdout = stdout
self.stderr = stderr
def __str__(self):
""" Returns a string representation of this script.
"""
if self.imports:
self.stderr.write(repr(self.import_lines))
return flatten_blocks([""] + self.import_lines + [""] + self.lines, num_indents=self.indent)
else:
return flatten_blocks(self.lines, num_indents=self.indent)
def get_import_lines(self):
""" Takes the stored imports and converts them to lines
"""
if self.imports:
return ["from %s import %s" % (value, key) for key, value in self.imports.items()]
else:
return []
import_lines = property(get_import_lines)
class ModelCode(Code):
""" Produces a python script that can recreate data for a given model class. """
def __init__(self, model, context=None, stdout=None, stderr=None, options=None):
super(ModelCode, self).__init__(indent=0, stdout=stdout, stderr=stderr)
self.model = model
if context is None:
context = {}
self.context = context
self.options = options
self.instances = []
def get_imports(self):
""" Returns a dictionary of import statements, with the variable being
defined as the key.
"""
return {self.model.__name__: smart_unicode(self.model.__module__)}
imports = property(get_imports)
def get_lines(self):
""" Returns a list of lists or strings, representing the code body.
Each list is a block, each string is a statement.
"""
code = []
for counter, item in enumerate(self.model._default_manager.all()):
instance = InstanceCode(instance=item, id=counter + 1, context=self.context, stdout=self.stdout, stderr=self.stderr, options=self.options)
self.instances.append(instance)
if instance.waiting_list:
code += instance.lines
# After each instance has been processed, try again.
# This allows self referencing fields to work.
for instance in self.instances:
if instance.waiting_list:
code += instance.lines
return code
lines = property(get_lines)
class InstanceCode(Code):
""" Produces a python script that can recreate data for a given model instance. """
def __init__(self, instance, id, context=None, stdout=None, stderr=None, options=None):
""" We need the instance in question and an id """
super(InstanceCode, self).__init__(indent=0, stdout=stdout, stderr=stderr)
self.imports = {}
self.options = options
self.instance = instance
self.model = self.instance.__class__
if context is None:
context = {}
self.context = context
self.variable_name = "%s_%s" % (self.instance._meta.db_table, id)
self.skip_me = None
self.instantiated = False
self.waiting_list = list(self.model._meta.fields)
self.many_to_many_waiting_list = {}
for field in self.model._meta.many_to_many:
self.many_to_many_waiting_list[field] = list(getattr(self.instance, field.name).all())
def get_lines(self, force=False):
""" Returns a list of lists or strings, representing the code body.
Each list is a block, each string is a statement.
force (True or False): if an attribute object cannot be included,
it is usually skipped to be processed later. With 'force' set, there
will be no waiting: a get_or_create() call is written instead.
"""
code_lines = []
# Don't return anything if this is an instance that should be skipped
if self.skip():
return []
# Initialise our new object
# e.g. model_name_35 = Model()
code_lines += self.instantiate()
# Add each field
# e.g. model_name_35.field_one = 1034.91
# model_name_35.field_two = "text"
code_lines += self.get_waiting_list()
if force:
# TODO: Check that M2M are not affected
code_lines += self.get_waiting_list(force=force)
# Print the save command for our new object
# e.g. model_name_35.save()
if code_lines:
code_lines.append("%s = importer.save_or_locate(%s)\n" % (self.variable_name, self.variable_name))
code_lines += self.get_many_to_many_lines(force=force)
return code_lines
lines = property(get_lines)
def skip(self):
""" Determine whether or not this object should be skipped.
If this model instance is a parent of a single subclassed
instance, skip it. The subclassed instance will create this
parent instance for us.
TODO: Allow the user to force its creation?
"""
if self.skip_me is not None:
return self.skip_me
def get_skip_version():
""" Return which version of the skip code should be run
Django's deletion code was refactored in r14507 which
was just two days before 1.3 alpha 1 (r14519)
"""
if not hasattr(self, '_SKIP_VERSION'):
version = django.VERSION
# no, it isn't lisp. I swear.
self._SKIP_VERSION = (
version[0] > 1 or ( # django 2k... someday :)
version[0] == 1 and ( # 1.x
version[1] >= 4 or # 1.4+
version[1] == 3 and not ( # 1.3.x
(version[3] == 'alpha' and version[1] == 0)
)
)
)
) and 2 or 1 # NOQA
return self._SKIP_VERSION
if get_skip_version() == 1:
try:
# Django trunk since r7722 uses CollectedObjects instead of dict
from django.db.models.query import CollectedObjects
sub_objects = CollectedObjects()
except ImportError:
# previous versions don't have CollectedObjects
sub_objects = {}
self.instance._collect_sub_objects(sub_objects)
sub_objects = sub_objects.keys()
elif get_skip_version() == 2:
from django.db.models.deletion import Collector
from django.db import router
cls = self.instance.__class__
using = router.db_for_write(cls, instance=self.instance)
collector = Collector(using=using)
collector.collect([self.instance], collect_related=False)
Loading ...