scala spark: собрать список в диапазоне в одну строку - PullRequest
1 голос
/ 30 марта 2020

Предположим, у меня есть кадр данных, как показано ниже

+----+----------+----+----------------+
|colA|      colB|colC|            colD|
+----+----------+----+----------------+
|   1|2020-03-24|  21|[0.0, 2.49, 3.1]|
|   1|2020-03-17|  20|[1.0, 2.49, 3.1]|
|   1|2020-03-10|  19|[2.0, 2.49, 3.1]|
|   2|2020-03-24|  21|[0.0, 2.49, 3.1]|
|   2|2020-03-17|  20|[1.0, 2.49, 3.1]|
+----+----------+----+----------------+

Я хочу собрать colD в отдельную строку, которая также собирает только список в пределах диапазона.

Output
+----+----------+----+----------------+------------------------------------------------------+
|colA|colB      |colC|colD            |colE                                                  |
+----+----------+----+----------------+------------------------------------------------------+
|1   |2020-03-24|21  |[0.0, 2.49, 3.1]|[[0.0, 2.49, 3.1], [1.0, 2.49, 3.1]]                  |
|1   |2020-03-17|20  |[1.0, 2.49, 3.1]|[[1.0, 2.49, 3.1], [2.0, 2.49, 3.1]]
|1   |2020-03-10|19  |[2.0, 2.49, 3.1]|[[2.0, 2.49, 3.1]]                  |
|2   |2020-03-24|21  |[0.0, 2.49, 3.1]|[[0.0, 2.49, 3.1], [1.0, 2.49, 3.1]]                  |
|2   |2020-03-17|20  |[1.0, 2.49, 3.1]|[[1.0, 2.49, 3.1]]                  |
+----+----------+----+----------------+------------------------------------------------------+

Я попробовал следующее, но выдает ошибку:

cannot resolve 'RANGE BETWEEN CAST((`colC` - 2) AS STRING) FOLLOWING AND CAST(`colC` AS STRING) FOLLOWING' due to data type mismatch: Window frame lower bound 'cast((colC#575 - 2) as string)' is not a literal.;;
val data = Seq(("1", "2020-03-24", 21,  List(0.0, 2.49,3.1)), ("1", "2020-03-17", 20,  List(1.0, 2.49,3.1)), ("1", "2020-03-10", 19,  List(2.0, 2.49,3.1)), ("2", "2020-03-24", 21,  List(0.0, 2.49,3.1)), 
                ("2", "2020-03-17", 20,  List(1.0, 2.49,3.1))
            )
val rdd = spark.sparkContext.parallelize(data)
val df = rdd.toDF("colA","colB", "colC", "colD")
df.show()

val df1 =   df
            .withColumn("colE", collect_list("colD").over(Window.partitionBy("colA")
                                  .orderBy("colB").rangeBetween($"colC" - lit(2), $"colC")))
            .show(false)
...