Как использовать многопроцессорный пул в цикле for при сохранении данных? - PullRequest
0 голосов
/ 04 октября 2018

У меня есть некоторые данные, где я пытаюсь применить multiprocessing.pool к нему, поскольку у меня есть машина с 16 процессорами.Здесь я генерирую некоторые псевдоданные:

y = pd.Series(np.random.randint(400, high=600, size=1250))
date_today = datetime.now()
x = pd.date_range(date_today, date_today + timedelta(1250), freq='D')
data = pd.DataFrame(columns=['Date','Price'])
data['Date'] = x
data['Price'] = y
d={name: group for name, group in data.groupby(np.arange(len(data)) // (len(data)))} 

Что я хочу, так это то, чтобы я применял пул в параметрах цикла for.Таким образом, используя процессор на каждую константу:

parameters = range(300,550,50)
portfolio = pd.DataFrame(columns=['Parameter','Date','Price','Calculation'])
for key, value in sorted(d.items()):
    for constante in parameters:
        print('Constante:',constante)
        # HERE I WANT TO USE MP.POOL()

В коде я использую какое-то окно смещения для выполнения вычислений.Это самая простая версия кода.Поэтому я хочу назначить процесс каждой константе в параметрах при записи в DF.Как этого добиться?

1 Ответ

0 голосов
/ 04 октября 2018

Возможно, вы захотите использовать multiprocessing.pool.map чуть-чуть вот так, хотя вам, вероятно, придется подстраиваться под ваши нужды ...

from functools import partial
from multiprocessing import Pool

def pool_map_fn(value=None, constante=None, i=None):
    s = {'val': value[i:i+constante]}
    window = pd.concat([s['val']['Date'],s['val']['Price']], axis=1)
    window['Price'] = pd.to_numeric(window['Price'], errors='coerce').fillna(0)
    calc = window['Price'].mean()                                        
    date_variable = window['Date'].iloc[-1]
    price_var = window['Price'].iloc[-1]
    if price_var < calc:
        print('Parameter',constante,'Lower than average',date_variable,price_var,calc)  
        portfolio = portfolio.append({'Parameter': constante,
                                      'Date': date_variable, 
                                      'Price': price_var,
                                      'Calculation': calc}, ignore_index=True)
    if price_var > calc:
        print('Parameter',constante,'Higher than average',date_variable,price_var,calc)

parameters = range(300,550,50)
portfolio = pd.DataFrame(columns=['Parameter','Date','Price','Calculation'])
for key, value in sorted(d.items()):
    for constante in parameters:
        with Pool() as pool:
            results = pool.map(partial(pool_map_fn, value=value, constante=constante),
                               range(len(value) - constante + 1))

Примечание: это не проверено, но должно работать, еслиВы получаете ошибки, пытайтесь их устранить, поскольку концепция должна быть обоснованной.

...