Как создать механизм подключения к базе данных в каждом подпроцессе Dask для параллельных тысяч запросов sql, без повторного создания механизма в каждом запросе - PullRequest
0 голосов
/ 17 марта 2020

Мне нужно смущенно распараллеливать работу по извлечению тысяч sql запросов из базы данных. Вот упрощенный пример.

##Env info: python=3.7 postgresql=10 dask=latest
##generate the example db table.
from sqlalchemy import create_engine
import pandas as pd
import numpy as np

engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
data = pd.DataFrame(np.random.randint(0,100 , size=(30000,5)),columns=['a','b','c','d','e'])
data.to_sql('tablename',engine,index=True,if_exists='append')

Во-первых, это базовый пример c без параллели между точками.

from sqlalchemy import create_engine
import pandas as pd
import numpy as np

engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
def job(indexstr):
    'send the query, fetch the data, do some calculate and return'
    sql='select * from public.tablename where index='+indexstr
    df=pd.read_sql_query(sql, engine, index_col='index',)
    ##get the data and do some analysis.
    return np.sum(df.values)
for v in range(1000):
    lists.append(job(str(v)))
### wall time:17s

Это не так быстро, как мы представляем, так как запрос к базе данных и анализ данных могут стоить времени и там больше простаивающих процессоров.

Затем я пытаюсь использовать dask для параллельного параллелизма.

def jobWithEngine(indexstr):
    `engine cannot be serialized between processes thus create each own.`
    engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
    sql='select * from public.tablename where index='+indexstr
    df=pd.read_sql_query(sql, engine, index_col='index',)
    return np.sum(df.values)
import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:jobWithEngine(x))
results_bag = dbdata.compute()
###Wall time:1min8s

Проблема в том, что я считаю, что создание движка занимает больше времени, а есть тысячи об этом.

Он будет воссоздан в каждом sql запросе, который действительно дорог и может стоить sh службы базы данных!

Так что я думаю, это должно быть более элегантно, как это:

import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:job(x,init=create_engine))
results_bag = dbdata.compute()

1. Основной процесс создает 8 подпроцессов.

2. Каждый процесс создает свой собственный механизм для инициализации подготовки задания.

3. Затем Основной процесс отправляет им 1000 заданий и возвращает 1000.

4.После того, как все сделано, механизм подпроцесса останавливается и уничтожает подпроцесс.

Или dask уже сделал это и дополнительное время приходит от связи между процессами ESS

...