Python ETL - Пакетная или итеративная загрузка больших наборов данных в базу данных Oracle с помощью cx_Oracle - PullRequest
2 голосов
/ 09 октября 2019

Использование Python для загрузки набора данных из 10-мм записей в таблицу базы данных Oracle. Датафрейм создан без проблем. При загрузке записи в фрейм данных слишком большая ошибка из cx_Oralce.

Стремясь циклически перебирать кадры данных и пакетно загружать записи 10 ММ, вставляя по 100 000 записей за раз.

Код, показанный ниже, работает, но только для небольших наборов данных, которые помещаются в выделенную память. Мне нужен тот, который работает для пакетов и больших наборов данных

Пробовал перебирать строки, но это занимает очень много времени. Также попытался загрузить гораздо меньший фрейм данных - это работает, но не достигает цели.

Также пытался использовать Bindarray и размер массива для размещения данных, но ничего не получалось.

import pandas as pd
import datetime
import sys
import re
from itertools import groupby, islice, takewhile
import cx_Oracle

format = '%y_%m_%d'

TODAY = datetime.date.today()
add = datetime.timedelta(days=1)
yesterday = datetime.date.today() - add
dd = datetime.date.strftime(TODAY,format)

# connection variables
connection = cx_Oracle.connect("user/Oracle_database_connect_info")
cur = connection.cursor()

# dataframe headers
columns = ['C1','C2','C3','C4']

# -- >> test and sample the file
csv_df = pd.read_csv(r'csvfile_location')

# add record_id for values
csv_df_idx = csv_df.index.values +1
csv_df.insert(0,column = 'RECORD_ID' , value=csv_df_idx)


### TABLE ALREADY CREATED IN DATABASE ###


for index, row in csv_df.iterrows():
    ### Insert and Iterate to inset records
    ### Convert to list for easy load into DB
    csv_df_dataset_lst = csv_df.values.tolist()
    insert_statement = """
    INSERT INTO TEST_LOAD
        ( RECORD_ID ,C1 ,C2 ,C3 ,C4)values (:1,:2,:3,:4,:5)    """

    # control number of records to bind for insert
    # cur.bindarraysize = 100000 # --->>> did not work
    # cur.arraysize = 100000 # --->>> did not work
    cur.executemany(insert_statement,csv_df_dataset_lst)
    connection.commit()
connection.close()

1 Ответ

0 голосов
/ 24 октября 2019

Разобрался. Хитрость заключалась в том, чтобы написать функцию, которая разбивала фрейм данных на сегменты в зависимости от размера пакетов, которые нужно загрузить.

Ниже приведен окончательный код.

import pandas as pd
import numpy as np
import datetime
import sys
import re
from itertools import groupby, islice, takewhile
import cx_Oracle

format = '%y_%m_%d'

TODAY = datetime.date.today()
add = datetime.timedelta(days=1)
yesterday = datetime.date.today() - add
dd = datetime.date.strftime(TODAY,format)

# connection variables
connection = cx_Oracle.connect("user/Oracle_database_connect_info")
cur = connection.cursor()

# dataframe headers
columns = ['C1','C2','C3','C4']

# -- >> test and sample the file
csv_df = pd.read_csv(r'csvfile_location')

# add record_id for values
csv_df_idx = csv_df.index.values +1
csv_df.insert(0,column = 'RECORD_ID' , value=csv_df_idx)


### TABLE ALREADY CREATED IN DATABASE ###


# set batch size ie record count
batch_size = 100000

# create chunker function to separate the dataframe into batches
# Note: last batch will contain smallest amout of records.
def chunker(seq,size):
    return(seq[pos:pos+size] for pos in range(0,len(seq),size))


insert_statement = """
    INSERT INTO TEST_LOAD
        ( RECORD_ID ,C1 ,C2 ,C3 ,C4)values (:1,:2,:3,:4,:5)    """

# Optional use cursor.prepare so Oracle DB avoids compiling the insert statement over and over
try:
    cur.prepare(insert_statement)
except cx_Oracle.DatabaseError as Exception:
    printf('Failed to prepare insert cursor')
    printException(Exception)
    exit(1)

for i in chunker(csv_df,batch_size):
    ### Insert and Iterate to inset records
    ### Convert to list for easy load into DB
    csv_df_dataset_lst = csv_df.values.tolist()

    cur.executemany(insert_statement,csv_df_dataset_lst)
    connection.commit()
    # record counter to monitor the loading.
    number_of_records_loaded = cur.execute("""SELECT COUNT(*), SYSDATE FROM TEST_LOAD GROUP BY SYSDATE""")
    record_out = cur.fetchall()
    for row in record_out:
        print(row)
connection.close()
...