Python Pandas Upsert - PullRequest
       3

Python Pandas Upsert

0 голосов
/ 16 октября 2019

У меня есть ежедневные конвейеры данных, которые должны прочитать файл и записать данные в базу данных postgres. Некоторые из этих файлов могут представлять собой смесь новых и старых данных.

Мой текущий процесс состоит в том, чтобы взять файл и записать его в Dataframe, выгрузить Dataframe в промежуточную таблицу, затем выполнить и сохранить. Это работает, но я потенциально обновляю значения, которые точно такие же. Я бы сравнил все столбцы в строке, чтобы увидеть, есть ли новое значение, которое кажется более сложным, чем его стоимость (в некоторых таблицах может быть более 30 столбцов).

Есть ли лучший или более оптимальный способсделать это? Я чувствую, что чем больше моя таблица или исходные файлы, тем больше времени займет этот процесс. Вот пример кода моего процесса:

def example_import(file_path, engine):
    '''
    ETL example
    '''

    df_demo = download_from_ftp(file_path)

    if df_demo.empty:
        return

    # some cleaning might happen here

    df_demo.to_sql(con = engine, name = 'tbl', schema = 'staging', if_exists = 'replace', index = False, method = 'multi')

    with open('/somefolder/upsert.sql') as fp:
        upsert = fp.read()

    result = engine.execute(upsert)
    print('Inserted/Updated {} Records From {}'.format(result.rowcount, file_path))

Пример данных :

Текущая таблица (в действительности есть гораздо больше столбцов)

Current Table

Входящие данные (сочетание новых и старых. Некоторые старые PK имеют новые значения)

enter image description here

Данные после Upsert

enter image description here

Пример Upsert

INSERT INTO public."tbl" ("colA",
                                 "colB",
                                 "colC",
                                 "colD",
                                 "colE",
                                 "colF",
                                 "colG",
                                 "colH",
                                 "colI",
                                 "colJ",
                                 "colK",
                                 "colL",
                                 "colM",
                                 "colN",
                                 "colO",
                                 "colP",
                                 "colQ",
                                 "colR",
                                 "colS",
                                 "colT",
                                 "colU",
                                 "colV"
SELECT "colA",
         "colB",
         "colC",
         "colD",
         "colE"::DATE,
         "colF",
         "colG"::DATE,
         "colH",
         "colI"::DATE,
         "colJ"::DATE,
         "colK",
         "colL",
         "colM",
         "colN",
         "colO",
         "colP",
         "colQ",
         "colR",
         "colS",
         "colT",
         "colU",
         "colV"
FROM staging."tbl"
ON CONFLICT ("colA") DO UPDATE 
    SET "colB" = excluded."colB",
         "colC" = excluded."colC",
         "colD" = excluded."colD",
         "colE" = excluded."colE",
         "colF" = excluded."colF",
         "colG" = excluded."colG",
         "colH" = excluded."colH",
         "colI" = excluded."colI",
         "colJ" = excluded."colJ",
         "colK" = excluded."colK",
         "colL" = excluded."colL",
         "colM" = excluded."colM",
         "colN" = excluded."colN",
         "colO" = excluded."colO",
         "colP" = excluded."colP",
         "colQ" = excluded."colQ",
         "colR" = excluded."colR",
         "colS" = excluded."colS",
         "colT" = excluded."colT",
         "colU" = excluded."colU",
         "colV" = excluded."colV"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...