Параллельная обработка для цикла в датафрейме - PullRequest
0 голосов
/ 27 сентября 2019

Хотите параллельно обрабатывать приведенный ниже код.По какой-то причине я должен подмножество, а затем применить функцию.Обратите внимание, что размер подмножества не будет согласованным.

for i in range(0, df['col1'].max()+1):
    subset = df[ df['col1'] == i ]
    subset_result = func(subset)
    result = result.append(subset_result)

1 Ответ

1 голос
/ 27 сентября 2019

Попробуйте этот код, используя многопроцессорную обработку :

import multiprocessing

def f(x):
    return x*x

def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

if __name__ == '__main__':
    n_core = multiprocessing.cpu_count()
    p = multiprocessing.Pool(processes= n_core)

    data = range(0, 8)
    subsets = chunks(data, n_core)
    subset_results = []
    for subset in subsets:
        subset_results.append(p.map(f, subset))

    print(subset_results)

В вашем случае функция чанков, которая может вам помочь, выглядит следующим образом:

def chunks_series(s):
    subsets = []
    for i in range(s.max() + 1):
        subset = s[s == i]
        subsets.append(subset.values)
    return subsets

subsets = chunks_series(df['col1'])

Или вы можете делать все в одном цикле:

n_core = multiprocessing.cpu_count()
p = multiprocessing.Pool(processes=n_core)   
s = df['col1']
subset_results = []

for i in range(s.max() + 1):
    subset = s[s == i]
    subset_results.append(p.map(f, subset))

Я предпочел ввести функцию чанка, даже если для вашего случая она не дает преимуществ, чтобы сделать код более понятным и обобщаемым.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...