Настройка
- База данных PostgreSQL 10.6, настроенная с использованием Django Rest Framework (DRF), которая в конечном итоге заменит прежнюю систему.Содержит сотни миллионов строк.
- Один удаленный экземпляр PostgreSQL 10.6 с устаревшей схемой.
Задача
Импорт данных из устаревшей базы данных в Djangoбазы данных за разумное время (в минутах), чтобы эта задача могла быть выполнена во время запусков CI.
Пока что работаем
Сначала мы создали отдельный скрипт Python для использования API Django для импорта данных.Команда знала, что это будет медленно, но мы хотели протестировать API и получили полезную обратную связь.В наших средах разработки (в ВМ) это приводило к созданию, возможно, пары десятков строк в секунду.
Теперь мы хотим импортировать гораздо больше данных.Сначала мы переместили скрипт в команду администратора Django, чтобы мы могли создавать экземпляры сериализаторов DRF напрямую, а не вызывать API.Это было немного быстрее.
Затем мы попытались создать экземпляр сериализатора DRF с помощью many=True
, но это было не заметно быстрее.
Текущая самая быстрая реализация импортирует, возможно, 80-100 строк вво-вторых, поэтому мы решили сменить тактику.Импортер тщательно протестирован, поэтому мы решили попробовать использовать упаковщик сторонних данных для импорта данных с использованием необработанного SQL, начиная с одной из небольших таблиц.Полученный код Python выглядит следующим образом (для удобства чтения):
connection = psycopg2.connect(dsn='[Django DB connection parameters]')
…
with connection.cursor() as cursor:
cursor.execute('CREATE EXTENSION postgres_fdw')
cursor.execute(
"CREATE SERVER legacy FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '{}', dbname '{}', updatable 'false')".format(…)
)
cursor.execute(
"CREATE USER MAPPING FOR postgres SERVER legacy OPTIONS (user '{}', password '{}')".format(…)
)
cursor.execute('CREATE SCHEMA IF NOT EXISTS legacy_schema')
cursor.execute(
"IMPORT FOREIGN SCHEMA public LIMIT TO (small_table) FROM SERVER legacy INTO legacy_schema"
)
…
with connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO django_table (…)
SELECT …
FROM legacy_schema.small_table
RETURNING id, …
"""
)
Запуск этого SQL вручную в оболочке PostgreSQL работает нормально :
# INSERT INTO django_table …
id | …
----+-…
3 | …
(1 row)
INSERT 0 1
Проблема
Код Python висит на последнем execute
.На данный момент состояние в устаревшей базе данных выглядит следующим образом:
# SELECT query FROM pg_stat_activity WHERE state = 'active';
query
-----------------------------------------------------------------------------
DECLARE c1 CURSOR FOR SELECT … FROM public.small_table …
На данный момент SELECT pg_cancel_backend([pid])
ничего не делает.После SELECT pg_terminate_backend([pid])
код Python продолжает работать как обычно.Кроме того, запрос SELECT
в приведенном выше курсоре мгновенно возвращается, так что это не простая проблема с размером.
Я также обнаружил, что после простого START TRANSACTION
приведенный выше оператор DECLARE
прекрасно работает вустаревшая база данных, поэтому я подозреваю, что проблема в Python.strace -f
команды показывает бесконечный цикл (повторяющийся каждые 20/30 секунд), например:
<... futex resumed> ) = -1 ETIMEDOUT (Connection timed out)
futex(0x5627e2e8e430, FUTEX_WAKE, 1) = 1
<... futex resumed> ) = 0
futex(0xc420063d48, FUTEX_WAKE, 1 <unfinished ...>
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=20000}, NULL <unfinished ...>
<... futex resumed> ) = 1
<... futex resumed> ) = 0
pselect6(0, NULL, NULL, NULL, {tv_sec=0, tv_nsec=3000}, NULL <unfinished ...>
futex(0xc420062948, FUTEX_WAKE, 1) = 1
<... futex resumed> ) = 0
futex(0x5627e2e92d00, FUTEX_WAIT, 0, {tv_sec=29, tv_nsec=997730142} <unfinished ...>
futex(0xc420062948, FUTEX_WAIT, 0, NULL <unfinished ...>
<... pselect6 resumed> ) = 0 (Timeout)
futex(0xc420063d48, FUTEX_WAIT, 0, NULL <unfinished ...>
<... pselect6 resumed> ) = 0 (Timeout)
futex(0x5627e2e8e430, FUTEX_WAIT, 0, {tv_sec=60, tv_nsec=0}
Интересно, что я получаю совсем другую ошибку, если создаю курсоры с именами.Например, если я создаю первый курсор как cursor(name='cd1')
, я получаю
cursor.execute('CREATE EXTENSION postgres_fdw')
psycopg2.ProgrammingError: syntax error at or near "CREATE"
LINE 1: DECLARE "c1" CURSOR WITHOUT HOLD FOR CREATE EXTENSION postgr...
Почему Python зависает при создании / использовании курсора для этого запроса?
Использование psycopg2-binary 2.7.6.1 из PyPI.
Похоже, проблема в том, что при запуске этого кода уже установлено другое соединение непосредственно с устаревшей базой данных, потому что, когда я избегаю, он работает правильно,Я попробую обойти это и посмотреть, что произойдет.