Мне нужно смущенно распараллеливать работу по извлечению тысяч sql запросов из базы данных. Вот упрощенный пример.
##Env info: python=3.7 postgresql=10 dask=latest
##generate the example db table.
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
data = pd.DataFrame(np.random.randint(0,100 , size=(30000,5)),columns=['a','b','c','d','e'])
data.to_sql('tablename',engine,index=True,if_exists='append')
Во-первых, это базовый пример c без параллели между точками.
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
def job(indexstr):
'send the query, fetch the data, do some calculate and return'
sql='select * from public.tablename where index='+indexstr
df=pd.read_sql_query(sql, engine, index_col='index',)
##get the data and do some analysis.
return np.sum(df.values)
for v in range(1000):
lists.append(job(str(v)))
### wall time:17s
Это не так быстро, как мы представляем, так как запрос к базе данных и анализ данных могут стоить времени и там больше простаивающих процессоров.
Затем я пытаюсь использовать dask для параллельного параллелизма.
def jobWithEngine(indexstr):
`engine cannot be serialized between processes thus create each own.`
engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
sql='select * from public.tablename where index='+indexstr
df=pd.read_sql_query(sql, engine, index_col='index',)
return np.sum(df.values)
import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:jobWithEngine(x))
results_bag = dbdata.compute()
###Wall time:1min8s
Проблема в том, что я считаю, что создание движка занимает больше времени, а есть тысячи об этом.
Он будет воссоздан в каждом sql запросе, который действительно дорог и может стоить sh службы базы данных!
Так что я думаю, это должно быть более элегантно, как это:
import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:job(x,init=create_engine))
results_bag = dbdata.compute()
1. Основной процесс создает 8 подпроцессов.
2. Каждый процесс создает свой собственный механизм для инициализации подготовки задания.
3. Затем Основной процесс отправляет им 1000 заданий и возвращает 1000.
4.После того, как все сделано, механизм подпроцесса останавливается и уничтожает подпроцесс.
Или dask уже сделал это и дополнительное время приходит от связи между процессами ESS