I think the window approach should be the best solution here, but before using the window functions you should repartition the dataframe by id. That shuffles the data only once, and all the window functions can then run on the already-shuffled dataframe. Hope this helps.
The code should look something like this:
// `window` is assumed to be the window spec already defined in the question,
// e.g. something like Window.partitionBy(col("id"))
val windowedDF = df.repartition(col("id"))
  .select(
    col("id"),
    col("date"),
    col("KPI_1"),
    col("KPI_2"),
    collect_list(struct(col("date"), col("KPI_1"))).over(window),
    collect_list(struct(col("date"), col("KPI_2"))).over(window)
  )
@Raphael Roth
Here we are aggregating over a single window, which is why you see the same execution plan. Please see the example below, where aggregation over multiple windows can be done from a single partitioning, i.e. with only one shuffle.
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.collect_list

val list = Seq(
  ("2",  null, 1, 11, 1, 1),
  ("2",  null, 1, 22, 2, 2),
  ("2",  null, 1, 11, 1, 3),
  ("2",  null, 1, 22, 2, 1),
  ("2",  null, 1, 33, 1, 2),
  (null, "3",  3, 33, 1, 2),
  (null, "3",  3, 33, 2, 3),
  (null, "3",  3, 11, 1, 1),
  (null, "3",  3, 22, 2, 2),
  (null, "3",  3, 11, 1, 3)
)
val df = spark.sparkContext.parallelize(list).toDF("c1", "c2", "batchDate", "id", "pv", "vv")
val c1Window = Window.partitionBy("batchDate", "c1")
val c2Window = Window.partitionBy("batchDate", "c2")
val agg1df = df.withColumn("c1List", collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))

val agg2df = df.repartition($"batchDate")
  .withColumn("c1List", collect_list("pv").over(c1Window))
  .withColumn("c2List", collect_list("pv").over(c2Window))
agg1df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#38], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, c2#15, 1)
+- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#28], [batchDate#16, c1#14]
+- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, c1#14, 1)
+- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
+- Scan ExternalRDDScan[obj#6]
agg2df.explain()
== Physical Plan ==
Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c2#15, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c2List#60], [batchDate#16, c2#15]
+- *Sort [batchDate#16 ASC NULLS FIRST, c2#15 ASC NULLS FIRST], false, 0
+- Window [collect_list(pv#18, 0, 0) windowspecdefinition(batchDate#16, c1#14, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS c1List#50], [batchDate#16, c1#14]
+- *Sort [batchDate#16 ASC NULLS FIRST, c1#14 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(batchDate#16, 1)
+- *Project [_1#7 AS c1#14, _2#8 AS c2#15, _3#9 AS batchDate#16, _4#10 AS id#17, _5#11 AS pv#18, _6#12 AS vv#19]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._1, true) AS _1#7, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple6, true])._2, true) AS _2#8, assertnotnull(input[0, scala.Tuple6, true])._3 AS _3#9, assertnotnull(input[0, scala.Tuple6, true])._4 AS _4#10, assertnotnull(input[0, scala.Tuple6, true])._5 AS _5#11, assertnotnull(input[0, scala.Tuple6, true])._6 AS _6#12]
+- Scan ExternalRDDScan[obj#6]
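The key difference between the two plans is in the Exchange steps: agg1df needs two Exchange hashpartitioning shuffles (one per window spec), while agg2df needs only the single exchange introduced by repartition($"batchDate"), which satisfies both window specs because they both partition by batchDate first. As a quick sanity check (countExchanges below is just a hypothetical helper for this sketch, not a Spark API), you can count the Exchange nodes in the physical plans:

// Hypothetical helper for this sketch (not part of the Spark API):
// counts Exchange nodes in a DataFrame's physical plan text.
def countExchanges(df: org.apache.spark.sql.DataFrame): Int =
  df.queryExecution.executedPlan.toString.split("\n").count(_.contains("Exchange"))

println(countExchanges(agg1df)) // should print 2: one shuffle per window spec
println(countExchanges(agg2df)) // should print 1: only the explicit repartition on batchDate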