Параллельное отображение xf () вместо f (x) для многих функций - PullRequest
0 голосов
/ 25 октября 2018

У меня есть очень большой фрейм данных панд, над которым я хочу отобразить много функций.Поскольку кадр большой, я написал некоторый код для распараллеливания этого:

import pandas as pd
import numpy as np
from multiprocessing import cpu_count(), Pool

my_frame = pd.DataFrame(...) # A large data frame with the column "data"

def parallel_map(series: pd.Series, func):
    cores = cpu_count()
    partitions = cores
    data_split = np.array_split(series, partitions)
    print(f"Parallelizing with {cores} cores...")
    with Pool(cores) as pool:
        data = pd.concat(pool.map(func, data_split))
    pool.join()
    return data

То, с чем я хочу вызвать это: pd.Series.map, т.е. я хочу вычислять вещи для каждой строки;что-то вроде этого:

def transform_data(entry):
    # Do expensive stuff
    return entry

Непараллельно, я мог бы теперь сделать

my_frame["data"].map(transform_data)

Однако для параллельной версии мне нужно определить дополнительную функцию в глобальном пространстве имен для инвертированиявызывающий, потому что Pool.map применяется f(x), но я хочу позвонить x.f().Функция должна быть способна работать с пулом, чтобы ее можно было запускать из пула:

def inverted_transform_data(column: pd.Series):
    return column.map(transform_data)

Теперь я могу назвать параллельную версию следующим образом:

parallel_map(data=my_frame["data"], func=inverted_transform_data)

Проблема в том, чтоЯ хочу сделать это для многих функций, которые должны обрабатываться последовательно, например transform_data1, transform_data2, ....Это требует от меня создания этой глобальной функции-оболочки для каждого из них.

Есть ли лучшая альтернатива, которая все еще способна к засолению?

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Я выбрал «бюджетное» решение, потому что не хотел вводить dask как зависимость.Он просто создает класс вызываемой оболочки:

class InvertedCallerMap(object):

    def __init__(self, func):
        """
        Required so the parallel map can call x.f() instead of f(x) without running into pickling issues
        :param func: Function to invert from x.f() to f(x)
        """
        self.func = func

    def __call__(self, column: pd.Series):
        return column.map(self.func)


def parallel_map(series, func, invert=True):
    cores = cpu_count()
    partitions = cores
    data_split = np.array_split(series, partitions)
    if invert:
        func = InvertedCallerMap(func=func)
    with Pool(cores) as pool:
        data = pd.concat(pool.map(func, data_split))
    pool.join()
    return data
0 голосов
/ 25 октября 2018

Даск!https://dask.org/

Dask - это проект, специально предназначенный для параллельных панд.Я настоятельно рекомендую вам рассмотреть это для вашего случая использования.Если вы просто хотите повысить производительность, придерживаясь панд, ознакомьтесь с документами здесь:

https://pandas.pydata.org/pandas-docs/stable/enhancingperf.html

И эта статья мне показалась особенно полезной:

https://engineering.upside.com/a-beginners-guide-to-optimizing-pandas-code-for-speed-c09ef2c6a4d6

Редактировать:

С помощью dask вы бы сделали:

import dask.dataframe as dd

df = # import method such as dd.read_csv("df.csv")
df.apply(func, ...) # or dd.data_col.apply(func, ...)
df.compute()
...