Как использовать psycopg2.extras.execute_values ​​для размещения в режиме ON CONFLICT - PullRequest
0 голосов
/ 13 мая 2018

Python 3.6
psycopg2 2.7.3.1
PostgreSQL 9,6

Hello

Я пытаюсь улучшить способ загрузки данных в свою базу данных и пытаюсь объединить пример с ОБНОВЛЕНИЕМ, используя psycopg2.extras.execute_values, найденный здесь с INSERT ... ON CONFLICT .. Пример .UPDATE найден здесь . Я довольно новичок в этом, поэтому не исключаю, что чего-то очевидного мне здесь не хватает.

Код как ниже:

for zip_file in os.listdir():
    if zipfile.is_zipfile(zip_file):
        logger.info("Processing %s", zip_file)
        for member in zipfile.ZipFile(zip_file).namelist():
            now = dt.datetime.now()
            prices_list = []
            zipfile.ZipFile(zip_file).extract(member, path=temp)
            local_xml = os.path.join(temp, member)
            tree = ET.parse(local_xml)
            root = tree.getroot()
            ns = root.tag[:-4]
            for finhist in root.findall("./%sFinancialHistory" % ns):
                asset_id = int(finhist.get("Id"))
                logger.debug("Processing %s", asset_id)
                for prices in finhist.findall("./%sPrices" % ns):
                    price_currency = prices.get("Currency")
                    for priceset in prices.findall("./%sPriceSet" % ns):
                        price_date = priceset.get("Date")
                        for price in priceset.findall("./%sPrice" % ns):
                            price_value = float(price.text)
                            price_type = price.get("Type")
                            prices_list.append((asset_id, price_date, price_type, price_value, price_currency, now, zip_file))
            try:
                os.remove(local_xml)
            except Exception:
                logger.error("File cannot be deleted", exc_info=True)
            cur = conn.cursor()
            try:
                execute_values(cur, 'WITH VALUES %s AS data (asset_id, price_date, price_type, price_value, price_currency, now, zip_file) \
                           INSERT INTO "LTSF"."Prices"("Asset_ID", "Price_Date", "Price_Type", "Price_Value", "Price_Currency", "Mod_Date", "Zip_File") \
                           data.asset_id, data.price_date, data.price_type, data.price_value, data.price_currency, data.now) \
                           ON CONFLICT \
                           UPDATE "LTSF"."Prices" \
                           "Price_Value"=data.price_value, "Price_Currency"=data.price_currency, "Mod_Date"=data.now, "Zip_File"=data.zip_file \
                           WHERE "Asset_ID"=data.asset_id, "Price_Date"=data.price_date, "Price_Type"=data.price_type', prices_list)
            except Exception:
                logger.error("Problem upserting Prices", exc_info=True)
            conn.commit()
            cur.close()

Ошибка, которую я получаю, в частности, выглядит следующим образом, заставляя меня думать, что ей не нравится моя часть WITH ... AS, но она основана на вики-части PostgreSQL:

Traceback (most recent call last):
  File "C:/Users/u0136211/Documents/LTSF/LTSF Model Import.py", line 76, in <module>
    WHERE "Asset_ID"=data.asset_id, "Price_Date"=data.price_date, "Price_Type"=data.price_type', prices_list)
  File "C:\Users\u0136211\AppData\Local\Continuum\Anaconda3\lib\site-packages\psycopg2\extras.py", line 1256, in execute_values
    cur.execute(b''.join(parts))
psycopg2.ProgrammingError: syntax error at or near "68082102"
LINE 1: WITH VALUES (68082102,'2018-04-30','Nav',2.01275,'BRL','2018...

Буду очень признателен за любые указатели здесь, так как мне нужен гораздо более эффективный способ апсайта, чем я делал до настоящего времени, и был очень рад видеть разницу, которую выполняет execute_values ​​при выполнении простых вставок.

Спасибо!

1 Ответ

0 голосов
/ 13 мая 2018

Больше поисков и попыток, и этот ответ о переполнении стека помог мне получить нужное значение, используя 'исключенный'.

Код, переработанный и работающий:

for zip_file in os.listdir():
    if zipfile.is_zipfile(zip_file):
        logger.info("Processing %s", zip_file)
        for member in zipfile.ZipFile(zip_file).namelist():
            now = dt.datetime.now()
            prices_list = []
            zipfile.ZipFile(zip_file).extract(member, path=temp)
            local_xml = os.path.join(temp, member)
            tree = ET.parse(local_xml)
            root = tree.getroot()
            ns = root.tag[:-4]
            for finhist in root.findall("./%sFinancialHistory" % ns):
                asset_id = int(finhist.get("Id"))
                logger.debug("Processing %s", asset_id)
                for prices in finhist.findall("./%sPrices" % ns):
                    price_currency = prices.get("Currency")
                    for priceset in prices.findall("./%sPriceSet" % ns):
                        price_date = priceset.get("Date")
                        for price in priceset.findall("./%sPrice" % ns):
                            price_value = float(price.text)
                            price_type = price.get("Type")
                            prices_list.append((asset_id, price_date, price_type, price_value, price_currency, now, zip_file))
            try:
                os.remove(local_xml)
            except Exception:
                logger.error("File cannot be deleted", exc_info=True)
            cur = conn.cursor()
            try:
                execute_values(cur, 'INSERT INTO "LTSF"."Prices"("Asset_ID", "Price_Date", "Price_Type", "Price_Value", "Price_Currency", "Mod_Date", "Zip_File")   \
                           VALUES %s \
                           ON CONFLICT ("Asset_ID", "Price_Date", "Price_Type") \
                           DO UPDATE SET \
                           "Price_Value"=excluded."Price_Value", "Price_Currency"=excluded."Price_Currency", "Mod_Date"=excluded."Mod_Date", "Zip_File"=excluded."Zip_File"', prices_list)
            except Exception:
                logger.error("Problem upserting Prices", exc_info=True)
            conn.commit()
            cur.close()

Спасибо " a_horse_with_no_name "!

...