распараллелить разбиение и обработку данных - PullRequest
0 голосов
/ 05 ноября 2018

Постановка задачи : Как распараллелить цикл for, который разбивает фрейм данных pandas на две части, также применяет функцию к каждой части параллельно и сохраняет объединенные результаты из функции в список для использовать после окончания цикла?

Для контекста я пытаюсь распараллелить реализацию дерева решений. Многие ответы, которые я видел ранее, связанные с этим вопросом, нуждаются в результате применения функции, чтобы быть фреймом данных, а результат просто объединяется в большой фрейм данных. Я считаю, что этот вопрос немного более общий.

Например, это код, который я хотел бы распараллелить:

# suppose we have some dataframe given to us
df = pd.DataFrame(....)
computation_results = []
# I would like to parallelize this whole loop and store the results of the
# computations in computation_results. min_rows and total_rows are known
# integers.
for i in range(min_rows, total_rows - min_rows + 1):
    df_left = df.loc[range(0, i), :].copy()
    df_right = df.loc[range(i, total_rows), :].copy()
    # foo is a function that takes in a dataframe and returns some
    # result that has no pointers to the passed dataframe. The following
    # two function calls should also be parallelized.
    left_results = foo(df_left)
    right_results = foo(df_right)
    # combine the results with some function and append that combination
    # to a list. The order of the results in the list does not matter.
    computation_results.append(combine_results(left_results, right_results))
# parallelization is not needed for the following function and the loop is over
use_computation_results(computation_results)

1 Ответ

0 голосов
/ 06 ноября 2018

Проверьте пример в https://docs.python.org/3.3/library/multiprocessing.html#using-a-pool-of-workers.

Так в вашем случае:

with Pool(processes=2) as pool:                  # start 2 worker processes
  for i in range(min_rows, total_rows - min_rows + 1):
     df_left = df.loc[range(0, i), :].copy()
     call_left = pool.apply_async(foo, df_left)  # evaluate "foo(df_left)" asynchronously
     df_right = df.loc[range(i, total_rows), :].copy() 
     call_right = pool.apply_async(foo, df_left) # evaluate "foo(df_right)" asynchronously
     left_results = call_left.get(timeout=1)     # wait and get left result
     right_results = call_right.get(timeout=1)   # wait and get right result
     computation_results.append(combine_results(left_results, right_results))
...