Я собираюсь выполнить около пяти различных методов суммирования для большого количества данных.Как правило, я рассчитываю рассчитать среднее, минимальное, максимальное, стандартное отклонение и сумму по определенным временным окнам и другим измерениям.
Вот такой воспроизводимый пример, который я могу сделать:
import random
import string
import datetime
from pyspark.sql import SparkSession, functions as func
from pyspark.conf import SparkConf
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql.window import Window
########## Setting up DataFrame ##########
def random_date(start, n):
current = start
for _ in range(n):
current = current + datetime.timedelta(seconds=random.randrange(60))
yield current
start_date = datetime.datetime(2013, 9, 20, 13, 00)
n_records = 50000000
dates = list(random_date(start_date, n_records))
other_data = []
for d in dates:
categorical_data = tuple(random.choice(string.ascii_lowercase) for _ in range(1))
numerical_data = tuple(random.randrange(100) for _ in range(20))
other_data.append(categorical_data + numerical_data + (d,))
categorical_columns = ['cat_{}'.format(n) for n in range(1)]
numerical_columns = ['num_{}'.format(n) for n in range(20)]
date_column = ['date']
columns = categorical_columns + numerical_columns + date_column
df = sc.parallelize(other_data).toDF(columns)
df = df.withColumn('date_window', func.window('date', '5 minutes'))
df.registerTempTable('df')
########## End DataFrame setup ##########
На сегодняшний день я попробовал два метода: один с использованием встроенного механизма DataFrame.groupBy
;другой с использованием pyspark.sql.window.Window
* orderBy
и partitionBy
методов.
Обычно разработанный мной конвейер выглядит так:
- Для каждого числового столбца, группа- по категориальным столбцам
cat_0
и date_window
и вычислите пять суммарных статистических данных, перечисленных ранее. - Для подхода
pyspark.sql.window.Window
присоедините вычисленный столбец напрямую, используя df.withColumn
.Для подхода DataFrame.groupBy
отслеживайте каждый результирующий DataFrame (каждый с тремя столбцами: два для столбцов группировки и один для вычисляемого столбца) - в конце присоедините каждый DataFrame, выполнив в основном операцию сокращения со столбцами группировки в качестве ключей..
Я оставил некоторые из конвейерного кода ниже, но в первую очередь меня интересуют мнения о том, 1) является ли какой-либо из них "наилучшей практикой" для этого типа работы, и 2) если нет,я упускаю что-то важное в экосистеме Spark, которое могло бы помочь мне сделать это намного быстрее / с меньшим количеством ресурсов?
В настоящее время подход groupBy
работает немного лучше, но довольно громоздко с наличиемотследить каждый сгруппированный DataFrame и объединить их в конце.Подход Window
не очень хорош, хотя синтаксически он немного чище и удобнее в обслуживании, IMO.В любом случае мне приходится выделять огромное количество вычислений для выполнения задания и его записи на диск в конце (без перераспределения / объединения).
gb_cols = ['cat_0', 'date_window']
strategies = {'sum', 'mean', 'stddev', 'max', 'min'}
Xcols = [col for col in df.columns if col.startswith('num')]
for col in Xcols[:]:
for s in strategies:
new_col = '{}_{}'.format(col, s)
Xcols.append(new_col)
if s == 'mean':
calc_col_series = func.mean(col)
elif s == 'stddev':
calc_col_series = func.stddev(col)
elif s == 'max':
calc_col_series = func.max(col)
elif s == 'min':
calc_col_series = func.min(col)
elif s == 'sum':
calc_col_series = func.sum(col)
elif s == 'median':
query = '''
SELECT
PERCENTILE_APPROX({}, 0.5)
FROM df
GROUP BY {}
'''.format(col, ','.join(gb_cols))
calc_col_series = spark.sql(query)
df = df.withColumn(new_col, calc_col_series.over(agg_window))
# Differencing inputs
for difference in range(1, 3 + 1):
# Last period's datapoints... moved to the future
led_series = func.lag(df[new_col], difference).over(agg_window.orderBy(window_cols['orderBy']))
diff_series = df[new_col] - led_series
new_col_diff = '{}_{}_diff'.format(new_col, difference)
df = df.withColumn(new_col_diff, diff_series)
Xcols.append(new_col_diff)