Массовое копирование с SQL Server на PostgreSQL - PullRequest
0 голосов
/ 01 декабря 2019

Мне нужно копировать большую таблицу SAP (28 столбцов, в основном nvarchar, 100 миллионов записей) из SQL Server в базу данных PostgreSQL как можно быстрее каждый день (дельта-обновление невозможно) с использованием Python.

Перед копированием я создал пустую таблицу с такой же структурой (эквивалентными типами данных), что и исходная таблица в базе данных PostgreSQL.

Сейчас я использую следующую реализацию, основанную на методе read_sql_query из pandasдля чтения и метод copy_from для записи; для копирования 1 млн записей требуется ~ 1 мин.

Кто-нибудь знает более быструю реализацию или есть идеи по оптимизации ?. Я уже переключил драйвер на ODBC и удалил все ограничения в целевой таблице, такие как первичные ключи.

PS: инструмент ETL Pentaho Dataintegration в два раза быстрее и является эталоном для меня.

Оператор SQL create для создания пустой целевой таблицы в базе данных PostgreSQL:

CREATE TABLE IF NOT EXISTS q_nlei (
  mandt varchar(3),
  einri varchar(4),
  falnr int,
  leist varchar(10),
  lnrls varchar(10),
  lfdbew varchar(5),
  anfoe varchar(8),
  anman varchar(1),
  erboe varchar(8),
  erman varchar(1),
  haust varchar(2),
  preis decimal(13,2),
  abrkz varchar(1),
  imeng varchar(4),
  ibgdt date,
  iendt date,
  ibzt time,
  iezt time,
  tarif varchar(2),
  tarls varchar(10),
  tarsp varchar(2),
  erdat date,
  updat date,
  storn varchar(1),
  remrk varchar(40),
  anpoe varchar(8),
  refky varchar(20),
  thergroup smallint);

Python-скрипт для копирования данных:

import pandas as pd
import io
from sqlalchemy import create_engine

src_engine = create_engine(f"mssql+pyodbc://{usr}:{pw}@{host}:1433/{db}?driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)
snk_engine = create_engine(f"postgresql+psycopg2://{usr}:{pw}@{host}/{db}")

sql_query = "SELECT * FROM Q_NLEI"

for chunk in pd.read_sql_query(sql_query, src_engine, chunksize = 500000):
  chunk.columns = map(str.lower, chunk.columns)

  # Prepare data
  output = io.StringIO()
  chunk.to_csv(output, sep='\t', header=False, encoding="utf8", index=False)
  output.seek(0)
  # Insert data
  connection = snk_engine.raw_connection()
  cursor = connection.cursor()
  cursor.copy_from(output, snk_tbl, sep='\t', null='')
  connection.commit()
  cursor.close()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...