Условное агрегирование в scala на основе типа данных - PullRequest
1 голос
/ 12 марта 2019

Как вы агрегируете динамически в Scala Spark на основе типов данных?

Например:

SELECT ID, SUM(when  DOUBLE type)
, APPEND(when STRING), MAX(when BOOLEAN) 
from tbl  
GROUP BY ID

Пример данных

1 Ответ

1 голос
/ 12 марта 2019

Вы можете сделать это, получив соответствие схемы времени выполнения для типа данных, например:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val df =Seq(
  (1, 1.0, true, "a"),
  (1, 2.0, false, "b")
).toDF("id","d","b","s")

val dataTypes: Map[String, DataType] = df.schema.map(sf => (sf.name,sf.dataType)).toMap

def genericAgg(c:String) = {
  dataTypes(c) match {
    case DoubleType => sum(col(c))
    case StringType => concat_ws(",",collect_list(col(c))) // "append"
    case BooleanType => max(col(c))
  }
}

val aggExprs: Seq[Column] = df.columns.filterNot(_=="id") // use all
    .map(c => genericAgg(c))

df
  .groupBy("id")
  .agg(
    aggExprs.head,aggExprs.tail:_*
  )
  .show()

дает

+---+------+------+-----------------------------+
| id|sum(d)|max(b)|concat_ws(,, collect_list(s))|
+---+------+------+-----------------------------+
|  1|   3.0|  true|                          a,b|
+---+------+------+-----------------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...