Прежде всего, я довольно новичок в спарке, поэтому извиняюсь, если мне не хватает очевидного!
Я разрабатываю POC, используя Spark, который использует поток данных из Apache Kafka. Моей первой целью были общие скользящие средние, которые были просты с помощью функции «окна» в Spark и вычисляли некоторые средние на основе некоторых ключей.
Моя следующая цель - вычислить «дельту» с момента последнего окна. Таким образом, если у меня есть вызов параметра «noise», функция «window» вычисляет avg (noise). Но я также хочу включить дельту avg (шума) между текущим окном и предыдущим окном.
Я пытался использовать функцию lag
, однако она не выглядит так, как предполагается:
Non-time-based windows are not supported on streaming DataFrames/Datasets
Мой вопрос заключается в том, предоставляет ли Spark Structure Streaming какой-то способ рассчитать это из коробки? Я обдумывал использование MapGroupsWithStateFunction
, которое, я думаю, могло бы работать, но если есть встроенный подход, который явно предпочтительнее.
Мой код для этого:
WindowSpec w = Window.partitionBy(functions.col("window.start"), functions.col("keyName")).orderBy(functions.col("window.start"));
Dataset<Row> outputDS = inputDataset.withWatermark("timeStamp", "1 days")
.groupBy(functions.col("keyName"), functions.window(functions.col("timeStamp"), "1 hours", "1 hours"))
.avg("noise").withColumn("delta", functions.lag("avg(noise)", 1).over(w));