Применение функции Python к Pandas сгруппированному DataFrame - каков наиболее эффективный подход для ускорения вычислений? - PullRequest
9 голосов
/ 24 февраля 2020

Я имею дело с довольно большим Pandas DataFrame - мой набор данных напоминает следующую df настройку:

import pandas as pd
import numpy  as np

#--------------------------------------------- SIZING PARAMETERS :
R1 =                    20        # .repeat( repeats = R1 )
R2 =                    10        # .repeat( repeats = R2 )
R3 =                541680        # .repeat( repeats = [ R3, R4 ] )
R4 =                576720        # .repeat( repeats = [ R3, R4 ] )
T  =                 55920        # .tile( , T)
A1 = np.arange( 0, 2708400, 100 ) # ~ 20x re-used
A2 = np.arange( 0, 2883600, 100 ) # ~ 20x re-used

#--------------------------------------------- DataFrame GENERATION :
df = pd.DataFrame.from_dict(
         { 'measurement_id':        np.repeat( [0, 1], repeats = [ R3, R4 ] ), 
           'time':np.concatenate( [ np.repeat( A1,     repeats = R1 ),
                                    np.repeat( A2,     repeats = R1 ) ] ), 
           'group':        np.tile( np.repeat( [0, 1], repeats = R2 ), T ),
           'object':       np.tile( np.arange( 0, R1 ),                T )
           }
        )

#--------------------------------------------- DataFrame RE-PROCESSING :
df = pd.concat( [ df,
                  df                                                  \
                    .groupby( ['measurement_id', 'time', 'group'] )    \
                    .apply( lambda x: np.random.uniform( 0, 100, 10 ) ) \
                    .explode()                                           \
                    .astype( 'float' )                                    \
                    .to_frame( 'var' )                                     \
                    .reset_index( drop = True )
                  ], axis = 1
                )

Примечание: В целях получения минимального примера , это может быть легко подмножество (например, с df.loc[df['time'] <= 400, :]), но так как я все равно имитирую данные, я думал, что оригинальный размер даст лучший обзор.

Для каждой группы, определенной ['measurement_id', 'time', 'group'], мне нужно вызвать следующую функцию:

from sklearn.cluster import SpectralClustering
from pandarallel     import pandarallel

def cluster( x, index ):
    if len( x ) >= 2:
        data = np.asarray( x )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.Series( clustering.labels_ + 1, index = index )
    else:
        return pd.Series( np.nan, index = index )

Для повышения производительности я попробовал два подхода:

Пакет Pandarallel

Первый подход состоял в том, чтобы распараллелить вычисления с использованием пакета pandarallel:

pandarallel.initialize( progress_bar = True )
df \
  .groupby( ['measurement_id', 'time', 'group'] ) \
  .parallel_apply( lambda x: cluster( x['var'], x['object'] ) )

Однако, это кажется неоптимальным, поскольку он потребляет много оперативной памяти, и не все ядра используются в вычислениях (даже если явно указать количество ядер в методе pandarallel.initialize()). Кроме того, иногда вычисления завершаются с различными ошибками, хотя у меня не было возможности найти причину этого (возможно, нехватка ОЗУ?).

PySpark Pandas UDF

I также дал Spark Pandas UDF go, хотя я совершенно новичок в Spark. Вот моя попытка:

import findspark;  findspark.init()

from pyspark.sql           import SparkSession
from pyspark.conf          import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types     import *

spark = SparkSession.builder.master( "local" ).appName( "test" ).config( conf = SparkConf() ).getOrCreate()
df = spark.createDataFrame( df )

@pandas_udf( StructType( [StructField( 'id', IntegerType(), True )] ), functionType = PandasUDFType.GROUPED_MAP )
def cluster( df ):
    if len( df['var'] ) >= 2:
        data = np.asarray( df['var'] )[:, np.newaxis]
        clustering = SpectralClustering( n_clusters   =  5,
                                         random_state = 42
                                         ).fit( data )
        return pd.DataFrame( clustering.labels_ + 1,
                             index = df['object']
                             )
    else:
        return pd.DataFrame( np.nan,
                             index = df['object']
                             )

res = df                                           \
        .groupBy( ['id_half', 'frame', 'team_id'] ) \
        .apply( cluster )                            \
        .toPandas()

К сожалению, производительность также была неудовлетворительной, и из того, что я прочитал в topi c, это может быть просто бременем использования функции UDF, написанной в Python и связанная с этим необходимость преобразования всех Python объектов в объекты Spark и обратно.

Итак, вот мои вопросы:

  1. Может ли любой из моих подходов быть отрегулированы, чтобы устранить возможные узкие места и улучшить производительность? (например, настройка PySpark, настройка неоптимальных операций и т. д. c.)
  2. Являются ли они лучшими альтернативами? Как они сравниваются с предоставляемыми решениями по производительности?

Ответы [ 3 ]

1 голос
/ 02 марта 2020

Q : " Может ли любой из моих подходов быть настроен для устранения возможных узких мест и улучшения производительности? ( например, настройка PySpark, настройка неоптимальных операций и т. д. c.)"

+1 для упоминания настройки накладные расходы для любой стратегии вычислений. Это всегда создает точку безубыточности, только после этого не-1025 * стратегия может принести какую-то полезную радость некоторого желаемого ускорения [TIME] -Домена (хотя, если иное, как правило, [SPACE] -Домена затрат на домен разрешить или остаться возможным - да, ОЗУ ... наличие и доступ к устройству такого размера, бюджет и другие подобные ограничения реального мира)

Во-первых,
предварительно проверка полета перед взлетом

новая, строгая формулировка закона Амдаля в настоящее время может включать оба эти дополнения pSO + pTO накладные расходы и отражают их при прогнозировании достижимых уровней ускорения, включая точку безубыточности, поскольку они могут стать значимыми (с точки зрения затрат / эффекта, эффективности) до go параллели.

enter image description here

Тем не менее,
, то есть , а не наша основная проблема здесь .
Это происходит следующим образом:

Далее
с учетом вычислительных затрат SpectralClustering(), который собирается использовать ядро ​​радиальной функции Больцмана ~ exp( -gamma * distance( data, data )**2 ), кажется, что нет никакого продвижения от разбиения data -объекта на любое количество разрозненных рабочих единиц, так как distance( data, data ) -компонент, по определению, имеет посетить все элементы data (ref. затраты на коммуникацию для топологий, переданных по принципу «любой к любому» с передачей значений { process | node }, по очевидным причинам ужасно плохи, если не наихудшие сценарии использования для обработки с распределением { process | node }, если не прямые анти-паттерны (за исключением некоторых действительно непонятных, без памяти / без состояний, но все же вычислительных структур).

Для аналитиков pedanti c, да - добавьте к этому (и мы уже можем сказать плохо состояние) затраты на - снова - от любого к любому k-означает -обработка, здесь о O( N^( 1 + 5 * 5 ) ), что идет за N ~ len( data ) ~ 1.12E6+, ужасно против нашего wi sh, чтобы иметь некоторую умную и быструю обработку.

Ну и что?

Хотя затраты на настройку не игнорируются, увеличение затрат на связь почти наверняка отключит любое улучшение от использования Вышеприведенные наброски попыток перейти из чисто [SERIAL] технологического процесса в некую форму просто - [CONCURRENT] или True- [PARALLEL] оркестровка некоторых рабочих подразделений из-за увеличения накладных расходов, связанных с необходимостью реализации (тандемная пара) любое-к-любому значение- прохождение топологии.

Если бы не они?

Ну, это звучит как оксюморон вычислительной науки - даже если бы это было возможно, расходы на предварительно вычисленные расстояния от любого к любому (что потребовало бы этих огромных [TIME] -Стоимость сложности домена "заранее" (Где? Как? Существуют ли какие-либо другие задержки, которых нельзя избежать, позволяющие скрыть возможную задержку с помощью некоторого (пока неизвестного) инкрементного наращивания полной в будущем матрицы расстояния на любую дистанцию? )) только перенесет эти принципиально существующие затраты в другое место в [TIME] - и [SPACE] - доменах, а не уменьшит их.

Q : «Являются ли они лучшими альтернативами? »

Единственный, как мне пока известно, - попытаться, если проблема может быть решена повторно. сформулировано в другой, сформулированный QUBO, проблемный способ хорошая новость заключается в том, что инструменты для этого существуют, база знаний из первых рук и практический опыт решения проблем существуют и расширяются)

Q : Как они сравниваются с предоставленными решениями по производительности?

Производительность захватывает дух - сформулированная в QUBO задача имеет многообещающий O(1) (!) решатель в постоянном времени (в [TIME] -Домене) и несколько ограничена в [SPACE] -Домене (где недавно анонсированные трюки LLNL могут помогите избежать этого физического мира, текущей реализации QPU, ограничения размеров задач).

0 голосов
/ 04 марта 2020

Я не эксперт по Dask, но в качестве основы я предоставляю следующий код:

import dask.dataframe as ddf

df = ddf.from_pandas(df, npartitions=4) # My PC has 4 cores

task = df.groupby(["measurement_id", "time", "group"]).apply(
    lambda x: cluster(x["var"], x["object"]),
    meta=pd.Series(np.nan, index=pd.Series([0, 1, 1, 1])),
)

res = task.compute()
0 голосов
/ 27 февраля 2020

Это не ответ, но ...

Если вы запустите

df.groupby(['measurement_id', 'time', 'group']).apply(
    lambda x: cluster(x['var'], x['object']))

(т.е. только с Pandas), вы заметите, что вы уже используете несколько сердечники. Это потому, что sklearn использует joblib по умолчанию для распараллеливания работы. Вы можете поменять планировщик в пользу Dask и, возможно, получить большую эффективность за счет совместного использования данных между потоками, но до тех пор, пока работа, которую вы выполняете, связана с процессором, как это, вы ничего не сможете сделать, чтобы ускорить его.

Короче говоря, это проблема алгоритма: выясните, что вам действительно нужно вычислить, прежде чем пытаться рассмотреть различные рамки для его вычисления.

...