Pandas применяется параллельно, когда ось = 0 - PullRequest
7 голосов
/ 19 марта 2020

Я хочу применить некоторую функцию ко всем pandas столбцам параллельно. Например, я хочу сделать это параллельно:

def my_sum(x, a):
    return x + a


df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})
df.apply(lambda x: my_sum(x, 2), axis=0)

Я знаю, что есть пакет swifter, но он не поддерживает axis=0 в применении:

NotImplementedError: Swifter не может выполнить ось = 0 применяется к большим наборам данных. В настоящее время Dask не имеет примененной оси = 0. Более подробная информация на https://github.com/jmcarpenter2/swifter/issues/10

Dask также не поддерживает это для axis=0 (в соответствии с документацией в swifter).

Я гуглил несколько источники, но не смогли найти простое решение.

Не могу поверить, что это так сложно в pandas.

Ответы [ 3 ]

3 голосов
/ 22 марта 2020

Koalas предоставляет способ выполнять вычисления на кадре данных параллельно. Он принимает те же команды, что и pandas, но выполняет их на Apache Spark engine в фоновом режиме.

Обратите внимание, что для правильного использования необходима параллельная инфраструктура.

В своем блоге они сравнивают следующие куски кода:

pandas:

import pandas as pd
df = pd.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
# Rename columns
df.columns = [‘x’, ‘y’, ‘z1’]
# Do some operations in place
df[‘x2’] = df.x * df.x

Коалы:

import databricks.koalas as ks
df = ks.DataFrame({'x': [1, 2], 'y': [3, 4], 'z': [5, 6]})
# Rename columns
df.columns = [‘x’, ‘y’, ‘z1’]
# Do some operations in place
df[‘x2’] = df.x * df.x
0 голосов
/ 28 марта 2020

По моему мнению, этот случай следует рассмотреть, сосредоточившись на том, как данные распределяются по доступным ресурсам. Dask предлагает map_partitions, который применяет функцию Python к каждому разделу DataFrame. Конечно, количество строк на раздел, с которыми может работать ваша рабочая станция, зависит от доступных аппаратных ресурсов. Вот пример, основанный на информации, которую вы предоставили в своем вопросе:

# imports
import dask
from dask import dataframe as dd
import multiprocessing as mp
import numpy as np
import pandas as pd

# range for values to be randomly generated
range_ = {
    "min": 0,
    "max": 100
}

# rows and columns for the fake dataframe
df_shape = (
                int(1e8), # rows
                2 # columns
            )

# some fake data
data_in = pd.DataFrame(np.random.randint(range_["min"], range_["max"], size = df_shape), columns = ["legs", "wings"])

# function to apply adding some value a to the partition
def my_sum(x, a):
    return x + a
"""
applies my_sum on the partitions rowwise (axis = 0)

number of partitions = cpu_count

the scheduler can be:
"threads": Uses a ThreadPool in the local process
"processes": Uses a ProcessPool to spread work between processes
"single-threaded": Uses a for-loop in the current thread
"""
data_out = dd.from_pandas(data_in, npartitions = mp.cpu_count()).map_partitions(
        lambda df: df.apply(
            my_sum, axis = 0, a = 2
        )
).compute(scheduler = "threads")

# inspection
print(data_in.head(5))
print(data_out.head(5))

Эта реализация была протестирована на случайно сгенерированном кадре данных с 100 000 000 строк и 2 столбцами.

Характеристики рабочей станции
Процессор: Процессор Intel (R) Core (TM) i7-8750H @ 2,20 ГГц
Общий объем памяти: 16698340 кБ
ОС: Ubuntu 18.04.4 LTS

0 голосов
/ 22 марта 2020

Вы можете использовать интерфейс с задержкой dask для настройки пользовательского рабочего процесса:

import pandas as pd
import dask
import distributed

# start local cluster, by default one worker per core
client = distributed.Client() 

@dask.delayed
def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})    

# Here, we mimic the apply command. However, we do not
# actually run any computation. Instead, that line of code 
# results in a list of delayed objects, which contain the 
# information what computation should be performed eventually
delayeds = [my_sum(df[column], 2) for column in df.columns]

# send the list of delayed objects to the cluster, which will 
# start computing the result in parallel. 
# It returns future objects, pointing to the computation while
# it is still running
futures = client.compute(delayeds)

# get all the results, as soon as they are ready. This returns 
# a list of pandas Series objects, each is one column of the 
# output dataframe
computed_columns = client.gather(futures)

# create dataframe out of individual columns
computed_df = pd.concat(computed_columns, axis = 1)

В качестве альтернативы, вы также можете использовать многопроцессорный бэкэнд dask:

import pandas as pd
import dask

@dask.delayed
def my_sum(x, a):
    return x + a

df = pd.DataFrame({'num_legs': [2, 4, 8, 0],
                   'num_wings': [2, 0, 0, 0]})    

# same as above
delayeds = [my_sum(df[column], 2) for column in df.columns]

# run the computation using the dask's multiprocessing backend
computed_columns = dask.compute(delayeds, scheduler = 'processes')

# create dataframe out of individual columns
computed_df = pd.concat(computed_columns, axis = 1)
...