Я получаю данные в список pandas.Dataframe
.Затем мне нужно отправить эти данные в БД.Это инструмент ~ 2 минуты на итерацию.
Итак, я хотел бы знать, что параллельная обработка - это хорошая идея?(нет проблем с блокировкой или что-то в этом роде?)
Итак, я хотел бы реализовать это из:
for df in df_list:
# Send a DF's batch to the DB
print('Sending DF\'s data to DB')
df.to_sql('ga_canaux', engine, if_exists='append', index=False)
db.check_data()
К чему-то, что я нашел о многопроцессорности:
with multiprocessing.Pool(processes=4) as pool:
results = pool.map_async(df.to_sql(???), df_list)
results.wait()
Как я могупередать необходимые мне параметры в df.to_sql
с помощью map_async
?
РЕДАКТИРОВАТЬ:
Я пытаюсь что-то передать N аргументов, таких как:
pool = multiprocessing.Pool()
args = ((df, engine, db) for df in df_list)
results = pool.map(multiproc, args)
results.wait()
, но я получаюошибка TypeError: can't pickle _thread._local objects
EDIT2:
Я немного изменил способ, которым я делаю mp
, и это своего рода работа (179 с 732 с тем же набором данных).Но я сталкиваюсь с ошибкой при попытке чтения из БД внутри пула.
# Connect to the remote DB
global DB
DB = Database()
global ENGINE
ENGINE = DB.connect()
pool = mp.Pool(mp.cpu_count() - 1)
pool.map(multiproc, df_list)
pool.close()
pool.join()
def multiproc(df):
print('Sending DF\'s data to DB')
df.to_sql('ga_canaux', ENGINE, if_exists='append', index=False)
DB.check_data() // HERE
Ошибка:
(psycopg2.OperationalError) SSL SYSCALL error: EOF detected
[SQL: 'SELECT COUNT(*) FROM ga_canaux'] (Background on this error at: http://sqlalche.me/e/e3q8)
РЕДАКТИРОВАТЬ 3получил тайм-аут БД: psycopg2.DatabaseError: SSL SYSCALL error: Operation timed out