Итерации для столбца массива с динамическим размером c в Spark Scala Dataframe - PullRequest
0 голосов
/ 21 апреля 2020

Мне знаком этот подход - пример из примера Как получить среднее значение для столбца типа массива в scala -проблески по всем записям строк на запись?

val array_size = 3
val avgAgg = for (i <- 0 to array_size -1) yield avg($"value".getItem(i))
df.select(array(avgAgg: _*).alias("avg_value")).show(false)

Однако в реальности жестко закодировано 3.

Как бы я ни старался не использовать UDF, я не могу делать такие вещи динамически в зависимости от размера столбца массива. уже присутствует в кадре данных. Например:

...
val z =  for (i <- 1 to size($"sortedCol")   ) yield array (element_at($"sortedCol._2", i), element_at($"sortedCol._3", i) )
...
...
.withColumn("Z", array(z: _*)  )

Я смотрю, как это можно сделать, применяя к существующему массиву col переменной длины. преобразовать, expr? Не уверен.

Полный код по запросу:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class abc(year: Int, month: Int, item: String, quantity: Int)

val df0 = Seq(abc(2019, 1, "TV", 8), 
              abc(2019, 7, "AC", 10),  
              abc(2018, 1, "TV", 2),  
              abc(2018, 2, "AC", 3), 
              abc(2019, 2, "CO", 10)).toDS()

val df1 = df0.toDF()
// Gen some data, can be done easier, but not the point.

val itemsList= collect_list(struct("month", "item", "quantity"))

// This nn works.
val nn = 3
val z =  for (i <- 1 to nn) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )
// But want this.
//val z =  for (i <- 1 to size($"sortedCol")   ) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )


val df2 = df1.groupBy($"year")
   .agg(itemsList as "items")
   .withColumn("sortedCol", sort_array($"items", asc = true))  
   .withColumn("S", size($"sortedCol")) // cannot use this either
   .withColumn("Z", array(z: _*)  )
   .drop("items")
   .orderBy($"year".desc)
df2.show(false)
// Col Z is the output I want, but not the null value Array 

UPD

В apache искра SQL, как удалить дубликаты строк при использовании collect_list в оконной функции? там я решаю с очень простым UDF, но я искал способ без UDF и, в частности, динамический c параметр to value в for loop. Ответ доказывает, что определенные конструкции невозможны, что является проверкой.

1 Ответ

0 голосов
/ 21 апреля 2020

Если я правильно понимаю ваши потребности, вы можете просто использовать функцию transform следующим образом:

val df2 = df1.groupBy($"year")
             .agg(itemsList as "items")
             .withColumn("sortedCol", sort_array($"items", asc = true))


val transform_expr = "transform(sortedCol, x -> array(x.item, x.quantity))"

df2.withColumn("Z", expr(transform_expr)).show(false)

//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|year|items                                 |sortedCol                             |Z                            |
//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|2018|[[1, TV, 2], [2, AC, 3]]              |[[1, TV, 2], [2, AC, 3]]              |[[TV, 2], [AC, 3]]           |
//|2019|[[1, TV, 8], [7, AC, 10], [2, CO, 10]]|[[1, TV, 8], [2, CO, 10], [7, AC, 10]]|[[TV, 8], [CO, 10], [AC, 10]]|
//+----+--------------------------------------+--------------------------------------+-----------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...