Не удается выполнить агрегацию по многим столбцам в Pyspark. Существуют сотни логических столбцов, показывающих текущее состояние системы, и строка добавляется каждую секунду. Цель состоит в том, чтобы преобразовать эти данные, чтобы показать количество изменений состояния для каждого окна продолжительностью 10 секунд.
Я планировал сделать это в два этапа: сначала XOR логическое значение со значением предыдущей строки, затем вторая сумма по 10 секундное окно. Вот примерный код, который я придумал:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import types as T, functions as F
from datetime import datetime, timedelta
from random import random
import time
sc = pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]'))
spark = SparkSession(sc)
# create dataframe
num_of_cols = 50
df = spark.createDataFrame(
[(datetime.now() + timedelta(0, i), *[round(random()) for _ in range(num_of_cols)]) for i in range(10000)],
['Time', *[f"M{m+1}" for m in range(num_of_cols)]])
cols = set(df.columns) - set(['Time'])
# Generate changes
data_window = Window.partitionBy(F.minute('Time')).orderBy('Time')
# data_window = Window.orderBy('Time')
df = df.select('Time', *[F.col(m).bitwiseXOR(F.lag(m, 1).over(data_window)).alias(m) for m in cols])
df = df.groupBy(F.window('Time', '10 seconds')) \
.agg(*[F.sum(m).alias(m) for m in cols]) \
.withColumn('start_time', F.col('window')['start']) \
.drop('window')
df.orderBy('start_time').show(20, False)
# Keep UI open
time.sleep(60*60)
С data_window
, разделенным по минутам, Spark генерирует 52 этапа, каждый из которых зависит от последнего. Увеличение num_of_cols
также увеличивает количество этапов. Мне кажется, это должно быть смущающе распараллеливаемой проблемой. Сравните каждую строку с последней, а затем агрегируйте по 10 секунд. Удаление раздела data_window
позволяет выполнять его за один этап, но при этом все данные на одном разделе вызываются для его достижения.
Почему этапы зависят друг от друга, есть ли лучший способ записи это улучшить распараллеливание? Я думаю, что было бы возможно сделать несколько агрегаций по одному и тому же окну одновременно. В конечном счете, для этого потребуется масштабировать до сотен столбцов. Есть ли какие-то хитрости для повышения производительности в этот момент?