Добавление дополнительного случайного параметра в качестве аргумента в функцию pool.map в python 3.4.7 - PullRequest
1 голос
/ 06 ноября 2019

Я хочу использовать многопроцессорность для большого набора данных, чтобы найти произведение двух столбцов и отфильтровать набор данных с заданным параметром в аргументе. Я создал тестовый набор, но мне не удалось заставить многопроцессорную работу работать с этим набором.

Сначала я пытаюсь разделить набор данных в функции parallelize_dataframe, а затем применить функции умножения и фильтрации в функции subset_col. ,Позже я добавляю полный набор данных обратно в parallelize_dataframe.

import numpy as np
import pandas as pd
from multiprocessing import Pool
from multiprocessing import Lock

df = pd.DataFrame({'col1': [1, 0, 1, 1, 1, 0, 0, 1, 0, 1],
                'col2': ['aa', 'aa', 'aa', 'bb', 'bb', 'bb', 'bb', 'cc', 'cc', 'cc'],
                'col3': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                'col4': [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]})



def subset_col(df, p):
    print("Working with number: " + str(p))
    df[col5] = df[col3]*df[col4]
    df= df[df['col1'] == p]


def parallelize_dataframe(df, p, func, n_cores=80):
    df_split = np.array_split(df, n_cores)
    pool = Pool(n_cores)
    df = pd.concat(pool.map(func, df_split, p))
    pool.close()
    pool.join()
    return df


df3 = parallelize_dataframe(df,1,subset_col)


Результатом должен быть продукт col3 и col4 с col1, отфильтрованным со значением. Но я всегда получаю сообщение об ошибке:

File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in parallelize_dataframe
struct.error: 'i' format requires -2147483648 <= number <= 2147483647 

Однако, если я уберу фильтр "p" из всех функций, он будет работать совершенно нормально. Может кто-нибудь помочь мне отладить это?

1 Ответ

1 голос
/ 06 ноября 2019

Из официальных документов multiprocessing.Pool.map он "поддерживает только один повторяемый аргумент". Следовательно, вам нужно изменить интерфейс subset_col, чтобы вместо него принимать один аргумент. Кроме того, вы забыли сделать строки столбцов, что привело к ошибкам имени. Чтобы уменьшить количество вычислений, вы должны фильтровать перед умножением. Затем должно быть возвращено значение, если ваша функция не работает исключительно через побочные эффекты (я полагаю, вы не хотите этого, так как объединяете результаты пула).

def subset_col(pair):
    df, p = pair
    print("Working with number: " + str(p))
    df = df[df['col1'] == p].copy()
    df['col5'] = df['col3']
    return df

Далее нам нужно будет исправить то, как вы вызывали pool.map, поскольку он должен принимать только 2 аргумента в зависимости от того, что вы делаете (3-й, последний аргумент будет размером с кусочек). Поскольку вы хотите, чтобы один и тот же p использовался для каждого процесса, мы скомпилируем dfs с повторяющимся значением p для каждого. Также рассмотрите возможность использования диспетчера контекста для обработки закрывающихся ресурсов.

def parallelize_dataframe(df, p, func, n_cores=None):
    if n_cores is None:
        n_cores = os.cpu_count()

    dfs = np.array_split(df, n_cores)
    pairs = zip(dfs, itertools.repeat(p))
    with Pool(n_cores) as pool:
        result = pool.map(func, pairs)

    df = pd.concat(result)
    return df

Теперь это правильно возвращает новый фрейм данных. Но я определенно сомневаюсь, что у вас есть машина с 80 ядрами. Рассмотрите возможность реализации n_cores=None, чтобы Python динамически вычислял сколько ядер на вашем компьютере, используя os.cpu_count

df3 = parallelize_dataframe(df, 1, subset_col)

Согласно вашему запросу для варианта Pool.starmap:

def subset_col(df, p):
    # remove unpacking line
    ...

def parallelize_dataframe(df, p, func, n_cores=None):
    ...
    # change `pool.map(...)` to `pool.starmap(...)`
    ...

Однако вы должны отметить, что Pool не предлагает imap или imap_unordered альтернатив для starmap, которые обе являются ленивыми оценочными версиями, которые отличаются, сохранять ли заказ или нет.

...