Django Postgres миграция: самый быстрый способ засыпки столбца в таблице с 100 миллионами строк - PullRequest
0 голосов
/ 03 апреля 2020

У меня есть таблица в Postgres Thing, которая содержит 100 миллионов строк.

У меня есть столбец, который был заполнен с течением времени и в котором хранятся некоторые ключи. Ключи были префикс перед хранением. Давайте назовем это prefixed_keys.

Моя задача - использовать значения этого столбца, чтобы заполнить другой столбец теми же значениями, но с обрезанными префиксами. Давайте назовем это simple_keys.

Я попробовал следующую миграцию:

from django.db import migrations
import time


def backfill_simple_keys(apps, schema_editor):
    Thing = apps.get_model('thing', 'Thing')

    batch_size = 100000
    number_of_batches_completed = 0
    while Thing.objects.filter(simple_key__isnull=True).exists():
        things = Thing.objects.filter(simple_key__isnull=True)[:batch_size]
        for tng in things:
            prefixed_key = tng.prefixed_key
            if prefixed_key.startswith("prefix_A"):
                simple_key = prefixed_key[len("prefix_A"):]
            elif prefixed_key.startswith("prefix_BBB"):
                simple_key = prefixed_key[len("prefix_BBB"):]
            tng.simple_key = simple_key
        Thing.objects.bulk_update(
            things,
            ['simple_key'],
            batch_size=batch_size
        )
        number_of_batches_completed += 1
        print("Number of batches updated: ", number_of_batches_completed)
        sleep_seconds = 3
        time.sleep(sleep_seconds)

class Migration(migrations.Migration):

    dependencies = [
        ('thing', '0030_add_index_to_simple_key'),
    ]

    operations = [
         migrations.RunPython(
            backfill_simple_keys,
        ),
    ]

Каждая партия заняла ~ 7 минут. Что означает, что на это уйдут дни! Это также увеличило время ожидания базы данных, используемой в производстве.

1 Ответ

0 голосов
/ 03 апреля 2020

Поскольку вы все равно собираетесь go проходить через каждую запись в этой таблице, имеет смысл проследить ее за одну go с помощью курсора на стороне сервера.
Вызов
Thing.objects.filter(simple_key__isnull=True)[:batch_size]
Это будет дорого, особенно, когда индекс начнет расти.
Также вызов выше извлекает ВСЕ поля из этой таблицы, даже если вы собираетесь использовать только 2-3 поля.

update_query = """UPDATE table SET simple_key = data.key 
    FROM (VALUES %s) AS data (id, key) WHERE table.id = data.id"""
conn = psycopg2.connect(DSN, cursor_factory=RealDictCursor)
cursor = conn.cursor(name="key_server_side_crs")  # having a name makes it a SSC
update_cursor = conn.cursor()  # regular cursor
cursor.itersize = 5000  # how many records to retrieve at a time
cursor.execute("SELECT id, prefixed_key, simple_key FROM table")
count = 0
batch = []
for row in cursor:
    if not row["simple_key"]:
        simple_key = calculate_simple_key(row["prefixed_key"])
        batch.append[(row["id"], simple_key)]
    if len(batch) >= 1000  # how many records to update at once
        execute_values(update_cursor, update_query, batch, page_size=1000)
        batch = []
        time.sleep(0.1)  # allow the DB to "breathe"
    count += 1
    if count % 100000 == 0:  # print progress every 100K rows
        print("processed %d rows", count)

выше проверено NOT, поэтому рекомендуется создать копию нескольких миллионов строк таблицы и сначала проверить ее на соответствие ей.
Вы также можете проверить различные параметры размера пакета (как для получения, так и для обновления).

...