Suppose I have a DataFrame like the one below:
+----+----------+----+----------------+
|colA| colB|colC| colD|
+----+----------+----+----------------+
| 1|2020-03-24| 21|[0.0, 2.49, 3.1]|
| 1|2020-03-17| 20|[1.0, 2.49, 3.1]|
| 1|2020-03-10| 19|[2.0, 2.49, 3.1]|
| 2|2020-03-24| 21|[0.0, 2.49, 3.1]|
| 2|2020-03-17| 20|[1.0, 2.49, 3.1]|
+----+----------+----+----------------+
I want to collect colD into a separate column that, for each row, gathers only the lists whose colC values fall within a given range of the current row's colC.
Expected output:
+----+----------+----+----------------+------------------------------------------------------+
|colA|colB |colC|colD |colE |
+----+----------+----+----------------+------------------------------------------------------+
|1 |2020-03-24|21 |[0.0, 2.49, 3.1]|[[0.0, 2.49, 3.1], [1.0, 2.49, 3.1]] |
|1   |2020-03-17|20  |[1.0, 2.49, 3.1]|[[1.0, 2.49, 3.1], [2.0, 2.49, 3.1]]                  |
|1 |2020-03-10|19 |[2.0, 2.49, 3.1]|[[2.0, 2.49, 3.1]] |
|2 |2020-03-24|21 |[0.0, 2.49, 3.1]|[[0.0, 2.49, 3.1], [1.0, 2.49, 3.1]] |
|2 |2020-03-17|20 |[1.0, 2.49, 3.1]|[[1.0, 2.49, 3.1]] |
+----+----------+----+----------------+------------------------------------------------------+
I tried the following, but it throws this error:
cannot resolve 'RANGE BETWEEN CAST((`colC` - 2) AS STRING) FOLLOWING AND CAST(`colC` AS STRING) FOLLOWING' due to data type mismatch: Window frame lower bound 'cast((colC#575 - 2) as string)' is not a literal.;;
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(
  ("1", "2020-03-24", 21, List(0.0, 2.49, 3.1)),
  ("1", "2020-03-17", 20, List(1.0, 2.49, 3.1)),
  ("1", "2020-03-10", 19, List(2.0, 2.49, 3.1)),
  ("2", "2020-03-24", 21, List(0.0, 2.49, 3.1)),
  ("2", "2020-03-17", 20, List(1.0, 2.49, 3.1))
)
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF("colA", "colB", "colC", "colD")
df.show()
val df1 = df
.withColumn("colE", collect_list("colD").over(Window.partitionBy("colA")
.orderBy("colB").rangeBetween($"colC" - lit(2), $"colC")))
.show(false)
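The error message is the key: `rangeBetween` bounds must be literals, so Spark rejects a per-row expression such as `$"colC" - lit(2)`. One likely fix (an assumption, not verified here) is to order the window by `colC` itself and pass literal offsets, e.g. `Window.partitionBy("colA").orderBy($"colC".desc).rangeBetween(Window.currentRow, 1)`. Note that the expected output above actually spans a colC width of 1 (the current row plus the next-smaller colC), not 2. To pin down the frame semantics independently of Spark, here is a plain-Python model of that window; the helper name `with_colE` and the `width` parameter are hypothetical:

```python
from collections import defaultdict

# Sample data mirroring the DataFrame in the question.
rows = [
    ("1", "2020-03-24", 21, [0.0, 2.49, 3.1]),
    ("1", "2020-03-17", 20, [1.0, 2.49, 3.1]),
    ("1", "2020-03-10", 19, [2.0, 2.49, 3.1]),
    ("2", "2020-03-24", 21, [0.0, 2.49, 3.1]),
    ("2", "2020-03-17", 20, [1.0, 2.49, 3.1]),
]

def with_colE(rows, width=1):
    """For each row, collect colD from rows in the same colA partition
    whose colC lies in [colC - width, colC], largest colC first."""
    parts = defaultdict(list)
    for r in rows:
        parts[r[0]].append(r)          # partition by colA
    out = []
    for colA, colB, colC, colD in rows:
        frame = sorted(
            (r for r in parts[colA] if colC - width <= r[2] <= colC),
            key=lambda r: -r[2],       # descending colC, as in the output
        )
        out.append((colA, colB, colC, colD, [r[3] for r in frame]))
    return out

for r in with_colE(rows):
    print(r)
```

With `width=1` this reproduces the expected colE column exactly; with `width=2` each frame would also pull in the row two colC steps back, which is what `colC - 2` in the attempted window would have meant.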