Django: объединить базы данных для одного приложения - PullRequest
3 голосов
/ 28 ноября 2011

У меня одно и то же приложение django, работающее на двух серверах, у каждого своя собственная локальная база данных.Я хочу перейти на один сервер, опять же с локальной базой данных.

Какой самый простой способ заполнить новую базу данных данными моделей старых баз данных, не теряя ссылок между моделями?(проблемы с первичными ключами и т.д. ...)

Ответы [ 2 ]

2 голосов
/ 28 ноября 2011

всегда есть данные дампа из django, которые довольно просты в использовании.

или вы можете сделать это вручную:

  • если2 базы данных используют одни и те же данные (они являются зеркальными по отношению друг к другу) и одну и ту же структуру таблицы, вы можете просто запустить syncdb из django, чтобы создать новую структуру таблицы, а затем выполнить дамп и импорт (я предполагаю, что вы используете mysql, но общая идея та же самая) старая база данных в новую

  • , если две базы данных совместно используют разные данные (все еще с одинаковой структурой), вы должны импортировать каждую отдельную строкудве базы данных: таким образом, вы будете поддерживать отношения и т. д., но ваш уникальный идентификатор будет обновлен до новой единственной базы данных.

  • , если две базы данных различаются как в данных, так иструктура, вам придется запустить два импорта sincdb и два, но это не похоже на ваш случай

Это ссылка на команду дампа MySQL

0 голосов
/ 18 мая 2019

Если ваши базы данных используют одну и ту же модель данных с разными объектами, вы можете использовать эту пользовательскую команду, созданную мной для аналогичной проблемы.

Вместо того, чтобы объединять данные приборов с существующими моделями (как это делает loaddata), он добавляет объект всех приборов, сбрасывая все pk. Отношения M2M управляются в конце процесса, сопоставляя старые первичные ключи с новыми первичными ключами:

import os
import warnings
from collections import defaultdict

from django.core.management import CommandError
from django.core.management.utils import parse_apps_and_model_labels
from django.core.management.commands.loaddata import Command as LoadDataCommand, humanize
from django.core.management.color import no_style
from django.db import (
    DEFAULT_DB_ALIAS, DatabaseError, IntegrityError, connections, router
)

from django.core import serializers
from django.db import transaction
from django.db.models.fields.related import RelatedField, ManyToManyField


class Command(LoadDataCommand):
    help = 'Installs the named fixture(s) in the database.'
    missing_args_message = (
        "No database fixture specified. Please provide the path of at least "
        "one fixture in the command line."
    )

    def add_arguments(self, parser):
        parser.add_argument('args', metavar='fixture', nargs='+', help='Fixture labels.')
        parser.add_argument(
            '--database', default=DEFAULT_DB_ALIAS,
            help='Nominates a specific database to load fixtures into. Defaults to the "default" database.',
        )
        parser.add_argument(
            '--app', dest='app_label',
            help='Only look for fixtures in the specified app.',
        )
        parser.add_argument(
            '-e', '--exclude', action='append', default=[],
            help='An app_label or app_label.ModelName to exclude. Can be used multiple times.',
        )
        parser.add_argument(
            '--format',
            help='Format of serialized data when reading from stdin.',
        )

    # TODO delete equals to overridden
    def handle(self, *fixture_labels, **options):
        self.using = options['database']
        self.app_label = options['app_label']
        self.verbosity = options['verbosity']
        self.excluded_models, self.excluded_apps = parse_apps_and_model_labels(options['exclude'])
        self.format = options['format']

        with transaction.atomic(using=self.using):
            self.appenddata(fixture_labels)

        # Close the DB connection -- unless we're still in a transaction. This
        # is required as a workaround for an edge case in MySQL: if the same
        # connection is used to create tables, load data, and query, the query
        # can return incorrect results. See Django #7572, MySQL #37735.
        if transaction.get_autocommit(self.using):
            connections[self.using].close()

    def appenddata(self, fixture_labels):
        # Most of the code is used only to manage transaction and fixture file format reuser it and override load_label instead
        self.loaddata(fixture_labels)

    def load_label(self, fixture_label):
        """Load fixtures files for a given label."""
        self.objs_idx = ObjectDict()
        self.objects = []
        self.show_progress = self.verbosity >= 3
        self.deferred_m2m = []

        for fixture_file, fixture_dir, fixture_name in self.find_fixtures(fixture_label):
            _, ser_fmt, cmp_fmt = self.parse_name(os.path.basename(fixture_file))
            open_method, mode = self.compression_formats[cmp_fmt]
            fixture = open_method(fixture_file, mode)
            try:
                self.fixture_count += 1
                objects_in_fixture = 0
                loaded_objects_in_fixture = 0
                if self.verbosity >= 2:
                    self.stdout.write(
                        "Installing %s fixture '%s' from %s."
                        % (ser_fmt, fixture_name, humanize(fixture_dir))
                    )

                objects = serializers.deserialize(
                    ser_fmt, fixture, using=self.using, ignorenonexistent=True,
                    handle_forward_references=False,
                )

                for obj in objects:
                    objects_in_fixture += 1
                    if (obj.object._meta.app_config in self.excluded_apps or
                            type(obj.object) in self.excluded_models):
                        continue
                    if router.allow_migrate_model(self.using, obj.object.__class__):
                        loaded_objects_in_fixture += 1
                        self.models.add(obj.object.__class__)
                        # Load all fixture in memory
                        self.objs_idx.append_deserialized_object(obj)
                        self.objects.append(obj)
                    if obj.deferred_fields:
                        self.objs_with_deferred_fields.append(obj)
                if objects and self.show_progress:
                    self.stdout.write('')  # add a newline after progress indicator
                self.loaded_object_count += loaded_objects_in_fixture
                self.fixture_object_count += objects_in_fixture
            except Exception as e:
                if not isinstance(e, CommandError):
                    e.args = ("Problem installing fixture '%s': %s" % (fixture_file, e),)
                raise e
            finally:
                fixture.close()

            # Warn if the fixture we loaded contains 0 objects.
            if objects_in_fixture == 0:
                warnings.warn(
                    "No fixture data found for '%s'. (File format may be "
                    "invalid.)" % fixture_name,
                    RuntimeWarning
                )
        # Once you have all object in memory you can load them
        for obj in self.objects:
            self.process_object(obj.object)
        if self.verbosity >= 1:
            self.stdout.write('... All objects saved ...')
        # Once all objects have been save (append mode) and new pks have been assigned add m2m relations
        for obj, field_attname, related_pk in self.deferred_m2m:
            attr = getattr(obj, field_attname)
            attr.add(related_pk)
            if self.verbosity >= 3:
                self.stdout.write('Adding relation for field {0}: {1} -> {2}'.format(field_attname, obj.pk, related_pk))
        # Disabled for security reason
        # raise ValueError('Disabled')

    def process_object(self, obj):
        if obj is None:
            raise ValueError('None object in process object')

        old_pk = obj.pk
        new_pk = self.objs_idx[obj]['new_pk']
        # Object has been save yet no work
        if new_pk:
            return new_pk

        self.manage_related_field(obj)

        if self.verbosity >= 2:
            self.stdout.write('Saving object: (%s, %s)' % (obj.__class__, obj))
        obj.pk = None
        try:
            obj.save(using=self.using)
            if self.show_progress:
                self.stdout.write(
                    '\rSaving object: (%s, %s)' % (obj.__class__, obj),
                    ending=''
                )
        # psycopg2 raises ValueError if data contains NUL chars.
        except (DatabaseError, IntegrityError, ValueError) as e:
            e.args = ("Could not load %(app_label)s.%(object_name)s(pk=%(pk)s): %(error_msg)s" % {
                'app_label': obj.object._meta.app_label,
                'object_name': obj.object._meta.object_name,
                'pk': obj.object.pk,
                'error_msg': e,
            },)
            raise
        self.objs_idx.data[obj._meta.model][old_pk]['new_pk'] = obj.pk
        return obj.pk

    def manage_related_field(self, obj):
        related_fields = [field for field in obj._meta.get_fields() if isinstance(field, RelatedField)]
        if len(related_fields) > 0:  # has not related field
            for field in related_fields:
                if field.related_model in self.excluded_models:
                    continue

                if type(field) is ManyToManyField:
                    attr = getattr(obj, field.attname)
                    attr.clear()
                    m2m_pks = self.objs_idx[obj]['deserialized_object'].m2m_data[field.name]
                    for m2m_pk in m2m_pks:
                        related_obj = self.objs_idx.data[field.related_model][m2m_pk]['object']
                        new_related_pk = self.process_object(related_obj)
                        self.deferred_m2m.append((obj, field.attname, new_related_pk))
                        # attr.add(new_related_pk)
                else:
                    related_obj = self.objs_idx.data[field.related_model][getattr(obj, field.attname)]['object']
                    if related_obj is not None:
                        new_related_pk = self.process_object(related_obj)
                        setattr(obj, field.attname, new_related_pk)


class ObjectDict(object):
    """
    Dictionary to easily retrieve fixture object based on class and their original primary key
    """

    def __init__(self):
        self.data = defaultdict(lambda: defaultdict(lambda: {'new_pk': None, 'object': None}))

    @staticmethod
    def from_deserialized_objects(deserialized_objects):
        instance = ObjectDict()
        for deserialized_object in deserialized_objects:
            instance.append_deserialized_object(deserialized_object)
        return instance

    def __getitem__(self, item):
        return self.data[item._meta.model][item.old_pk]

    def append_deserialized_object(self, deserialized_object):
        obj = deserialized_object.object
        setattr(obj, 'old_pk', obj.pk)
        self[obj]['object'] = obj
        self[obj]['deserialized_object'] = deserialized_object

Я предлагаю вам протестировать все в тестовой базе данных django по умолчанию с помощью этого TestCase. Для заполнения тестовой базы данных он использует предварительно сохраненный файл фикстуры из приложения django (используя python manage.py dumpdata), после чего применяет пользовательскую команду для добавления всех объектов из других дампов фикстур базы данных.

from collections import defaultdict

from django.core import serializers
from django.core.management.utils import parse_apps_and_model_labels
from django.test import Client, TestCase
from utils.tests import reverse
from django.core.management import call_command
from django.test import TestCase
from django.apps import apps

from apps.commons.accounts.models import User
from apps.commons.accounts.tests import MultiUserTestCase

class TestCustomCommands(TestCase):
    # Create fixture from the primary database to test everything
    fixtures = ['tmp/dump/test_append_data_fixtures_pre.json']

    def test_appenddata(self):
        fixture_to_import = 'tmp/dump/fixtures_to_import.json'
        excludes = ['sites.Site']
        # Counts objects before appenddata per model
        count_pre = {}
        for model in apps.get_models():
            count_pre[model] = model.objects.count()

        self.excluded_models, self.excluded_apps = parse_apps_and_model_labels(excludes)

        # Counts objects to append per model
        with open(fixture_to_import, 'r') as f:
            objects = serializers.deserialize('json', f, ignorenonexistent=True)
            count_new = defaultdict(lambda: 0)
            for obj in objects:
                if obj.object._meta.model in excludes:
                    continue
                count_new[obj.object._meta.model] += 1

        command = ['appenddata', fixture_to_import]
        for exclude in excludes:
            command += ['-e', exclude]
        command += ['-v', '0']
        call_command(*command)

        # Verify with count that all objects have been imported
        for model in apps.get_models():
            self.assertEqual(count_pre[model] + count_new[model], model.objects.count(), msg='Count mismatch for model %s' % model)

Пример теста (добавление данных с Website2 на Website1):

# Website 1
python manage.py dumpdata app1 app2 ... > test_append_data_fixtures_pre.json

# Website 2
python manage.py dumpdata app1 app2 ... > fixture_to_import.json

# Website 1, run the provided test
python manage.py test TestCustomCommands.test_appenddata

Пример использования (добавление данных с Website2 на Website1):

# Website 1
python manage.py appenddata fixture_to_import.json
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...