отправка каждого зацикленного вычисления панд в отдельный поток (python3.6.5) с помощью pool.map - PullRequest
0 голосов
/ 07 сентября 2018

с базовой панда df данных OHLCV финансового рынка, я пытаюсь добавить многочисленные вычисляемые столбцы в df. Большое количество столбцов и вычислений делает это медленным медленным медленным! Попытка многопроцессорности с помощью pool.map, но безрезультатно. В идеале каждая итерация цикла должна отправляться в отдельный поток. Упрощенные скользящие средние в коде ниже. Показан простой словарь и скользящее среднее работает МЕДЛЕННО TypeError: map () отсутствует 1 обязательный позиционный аргумент: 'iterable' Всю помощь оценил-тхх

import pandas as pd
from multiprocessing.dummy import Pool as ThreadPool

#####################################################
# DJIA_OHLCV_test.csv has format:
# Date,Open,High,Low,Close,Adj Close,Volume
# 
1/2/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
# 
1/3/2015,17823.07031,17951.7793,17731.30078,17832.99023,17832.99023,76270000
DJIA = pd.read_csv('DJIA_OHLCV_test.csv')
"""
#####################################################
# # This works! please comment out to switch 
# MAdict = {'MA50':50, 'MA100':100, 'MA200':200} # Define Moving Average 
Windows

# for MAkey in MAdict:
#     DJIA[('ma' + MAkey)] = pd.Series.rolling(DJIA['Adj Close'], 
              window=MAdict[MAkey]).mean()

#####################################################
"""
# This doesn't work! please comment out to switch 
MAdict = {'MA50':50, 'MA100':100, 'MA200':200}
pool = ThreadPool(3)

def moving_average(MAkey):
    return pd.Series.rolling(DJIA['Adj Close'], window=MAdict[MAkey]).mean()

for MAkey in MAdict:
    DJIA[('ma' + MAkey)] = pool.map(moving_average(MAkey))

#####################################################
print(DJIA.tail())

1 Ответ

0 голосов
/ 09 сентября 2018

pool.map является блокирующим вызовом, поэтому вместо итерации по MAdict и вызова pool.map вам нужно передать итерируемое непосредственно в качестве аргумента pool.map:

import pandas as pd
from multiprocessing.dummy import Pool


def moving_average(ma):
    return pd.Series.rolling(djia['Adj Close'], window=ma).mean()


if __name__ == '__main__':

    N_WORKERS = 3
    MA_DICT = {'MA50':50, 'MA100':100, 'MA200':200}

    djia = pd.read_csv('DJIA_OHLCV_test.csv')

    with Pool(N_WORKERS) as pool:
        results = pool.map(moving_average, iterable=MA_DICT.values())

    # concatenate results and rename columns
    results = pd.concat(results, axis=1)
    results.columns = ['ma' + key for key in MA_DICT]

    djia = pd.concat([djia, results], axis=1)

    print(djia.tail())
...