Агрегация Spark DataFrame: окна + разбиение по сравнению с операциями groupBy - PullRequest
0 голосов
/ 25 сентября 2018

Я собираюсь выполнить около пяти различных методов суммирования для большого количества данных.Как правило, я рассчитываю рассчитать среднее, минимальное, максимальное, стандартное отклонение и сумму по определенным временным окнам и другим измерениям.

Вот такой воспроизводимый пример, который я могу сделать:

import random
import string
import datetime

from pyspark.sql import SparkSession, functions as func
from pyspark.conf import SparkConf
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql.window import Window

########## Setting up DataFrame ##########
def random_date(start, n):
    current = start
    for _ in range(n):
        current = current + datetime.timedelta(seconds=random.randrange(60))
        yield current

start_date = datetime.datetime(2013, 9, 20, 13, 00)

n_records = 50000000

dates = list(random_date(start_date, n_records))

other_data = []
for d in dates:
    categorical_data = tuple(random.choice(string.ascii_lowercase) for _ in range(1))
    numerical_data = tuple(random.randrange(100) for _ in range(20))
    other_data.append(categorical_data + numerical_data + (d,))

categorical_columns = ['cat_{}'.format(n) for n in range(1)]
numerical_columns = ['num_{}'.format(n) for n in range(20)]
date_column = ['date']

columns = categorical_columns + numerical_columns + date_column

df = sc.parallelize(other_data).toDF(columns)

df = df.withColumn('date_window', func.window('date', '5 minutes'))

df.registerTempTable('df')

########## End DataFrame setup ##########

На сегодняшний день я попробовал два метода: один с использованием встроенного механизма DataFrame.groupBy;другой с использованием pyspark.sql.window.Window * orderBy и partitionBy методов.

Обычно разработанный мной конвейер выглядит так:

  1. Для каждого числового столбца, группа- по категориальным столбцам cat_0 и date_window и вычислите пять суммарных статистических данных, перечисленных ранее.
  2. Для подхода pyspark.sql.window.Window присоедините вычисленный столбец напрямую, используя df.withColumn.Для подхода DataFrame.groupBy отслеживайте каждый результирующий DataFrame (каждый с тремя столбцами: два для столбцов группировки и один для вычисляемого столбца) - в конце присоедините каждый DataFrame, выполнив в основном операцию сокращения со столбцами группировки в качестве ключей..

Я оставил некоторые из конвейерного кода ниже, но в первую очередь меня интересуют мнения о том, 1) является ли какой-либо из них "наилучшей практикой" для этого типа работы, и 2) если нет,я упускаю что-то важное в экосистеме Spark, которое могло бы помочь мне сделать это намного быстрее / с меньшим количеством ресурсов?

В настоящее время подход groupBy работает немного лучше, но довольно громоздко с наличиемотследить каждый сгруппированный DataFrame и объединить их в конце.Подход Window не очень хорош, хотя синтаксически он немного чище и удобнее в обслуживании, IMO.В любом случае мне приходится выделять огромное количество вычислений для выполнения задания и его записи на диск в конце (без перераспределения / объединения).

gb_cols = ['cat_0', 'date_window']
strategies = {'sum', 'mean', 'stddev', 'max', 'min'}

Xcols = [col for col in df.columns if col.startswith('num')]

for col in Xcols[:]:
    for s in strategies:
        new_col = '{}_{}'.format(col, s)
        Xcols.append(new_col)

        if s == 'mean':
            calc_col_series = func.mean(col)
        elif s == 'stddev':
            calc_col_series = func.stddev(col)
        elif s == 'max':
            calc_col_series = func.max(col)
        elif s == 'min':
            calc_col_series = func.min(col)
        elif s == 'sum':
            calc_col_series = func.sum(col)
        elif s == 'median':
            query = '''
                SELECT
                    PERCENTILE_APPROX({}, 0.5)
                FROM df
                GROUP BY {}
            '''.format(col, ','.join(gb_cols))
            calc_col_series = spark.sql(query)

        df = df.withColumn(new_col, calc_col_series.over(agg_window))

        # Differencing inputs
        for difference in range(1, 3 + 1):
            # Last period's datapoints... moved to the future
            led_series = func.lag(df[new_col], difference).over(agg_window.orderBy(window_cols['orderBy']))
            diff_series = df[new_col] - led_series

            new_col_diff = '{}_{}_diff'.format(new_col, difference)
            df = df.withColumn(new_col_diff, diff_series)
            Xcols.append(new_col_diff)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...