У меня есть ежедневные конвейеры данных, которые должны прочитать файл и записать данные в базу данных postgres. Некоторые из этих файлов могут представлять собой смесь новых и старых данных.
Мой текущий процесс состоит в том, чтобы взять файл и записать его в Dataframe, выгрузить Dataframe в промежуточную таблицу, затем выполнить и сохранить. Это работает, но я потенциально обновляю значения, которые точно такие же. Я бы сравнил все столбцы в строке, чтобы увидеть, есть ли новое значение, которое кажется более сложным, чем его стоимость (в некоторых таблицах может быть более 30 столбцов).
Есть ли лучший или более оптимальный способсделать это? Я чувствую, что чем больше моя таблица или исходные файлы, тем больше времени займет этот процесс. Вот пример кода моего процесса:
def example_import(file_path, engine):
'''
ETL example
'''
df_demo = download_from_ftp(file_path)
if df_demo.empty:
return
# some cleaning might happen here
df_demo.to_sql(con = engine, name = 'tbl', schema = 'staging', if_exists = 'replace', index = False, method = 'multi')
with open('/somefolder/upsert.sql') as fp:
upsert = fp.read()
result = engine.execute(upsert)
print('Inserted/Updated {} Records From {}'.format(result.rowcount, file_path))
Пример данных :
Текущая таблица (в действительности есть гораздо больше столбцов)
![Current Table](https://i.stack.imgur.com/dz8re.png)
Входящие данные (сочетание новых и старых. Некоторые старые PK имеют новые значения)
![enter image description here](https://i.stack.imgur.com/476DE.png)
Данные после Upsert
![enter image description here](https://i.stack.imgur.com/Ika9N.png)
Пример Upsert
INSERT INTO public."tbl" ("colA",
"colB",
"colC",
"colD",
"colE",
"colF",
"colG",
"colH",
"colI",
"colJ",
"colK",
"colL",
"colM",
"colN",
"colO",
"colP",
"colQ",
"colR",
"colS",
"colT",
"colU",
"colV"
SELECT "colA",
"colB",
"colC",
"colD",
"colE"::DATE,
"colF",
"colG"::DATE,
"colH",
"colI"::DATE,
"colJ"::DATE,
"colK",
"colL",
"colM",
"colN",
"colO",
"colP",
"colQ",
"colR",
"colS",
"colT",
"colU",
"colV"
FROM staging."tbl"
ON CONFLICT ("colA") DO UPDATE
SET "colB" = excluded."colB",
"colC" = excluded."colC",
"colD" = excluded."colD",
"colE" = excluded."colE",
"colF" = excluded."colF",
"colG" = excluded."colG",
"colH" = excluded."colH",
"colI" = excluded."colI",
"colJ" = excluded."colJ",
"colK" = excluded."colK",
"colL" = excluded."colL",
"colM" = excluded."colM",
"colN" = excluded."colN",
"colO" = excluded."colO",
"colP" = excluded."colP",
"colQ" = excluded."colQ",
"colR" = excluded."colR",
"colS" = excluded."colS",
"colT" = excluded."colT",
"colU" = excluded."colU",
"colV" = excluded."colV"