Python - pool.map_async передает параметры в функцию - PullRequest
0 голосов
/ 01 марта 2019

Я получаю данные в список pandas.Dataframe.Затем мне нужно отправить эти данные в БД.Это инструмент ~ 2 минуты на итерацию.

Итак, я хотел бы знать, что параллельная обработка - это хорошая идея?(нет проблем с блокировкой или что-то в этом роде?)

Итак, я хотел бы реализовать это из:

for df in df_list:
    # Send a DF's batch to the DB
    print('Sending DF\'s data to DB')
    df.to_sql('ga_canaux', engine, if_exists='append', index=False)
    db.check_data()

К чему-то, что я нашел о многопроцессорности:

with multiprocessing.Pool(processes=4) as pool:
    results = pool.map_async(df.to_sql(???), df_list)
    results.wait()

Как я могупередать необходимые мне параметры в df.to_sql с помощью map_async?

РЕДАКТИРОВАТЬ:

Я пытаюсь что-то передать N аргументов, таких как:

pool = multiprocessing.Pool()
    args = ((df, engine, db) for df in df_list)
    results = pool.map(multiproc, args)
    results.wait()

, но я получаюошибка TypeError: can't pickle _thread._local objects

EDIT2:

Я немного изменил способ, которым я делаю mp, и это своего рода работа (179 с 732 с тем же набором данных).Но я сталкиваюсь с ошибкой при попытке чтения из БД внутри пула.

# Connect to the remote DB
global DB
DB = Database()
global ENGINE
ENGINE = DB.connect()

pool = mp.Pool(mp.cpu_count() - 1)
pool.map(multiproc, df_list)
pool.close()
pool.join()

def multiproc(df):
    print('Sending DF\'s data to DB')
    df.to_sql('ga_canaux', ENGINE, if_exists='append', index=False)
    DB.check_data() // HERE

Ошибка:

(psycopg2.OperationalError) SSL SYSCALL error: EOF detected
 [SQL: 'SELECT COUNT(*) FROM ga_canaux'] (Background on this error at: http://sqlalche.me/e/e3q8)

РЕДАКТИРОВАТЬ 3получил тайм-аут БД: psycopg2.DatabaseError: SSL SYSCALL error: Operation timed out

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...