Задача
Мне нужно обновить эту строку в моем коде. Как я могу это сделать?
"case StringType => concat_ws(",",collect_list(col(c)))"
Добавлять только те строки, которых еще нет в существующем поле. В этом примере буква «b» не будет появляться дважды.
код
val df =Seq(
(1, 1.0, true, "a"),
(2, 2.0, false, "b")
(3, 2.0, false, "b")
(3, 2.0, false, "c")
).toDF("id","d","b","s")
val dataTypes: Map[String, DataType] = df.schema.map(sf =>
(sf.name,sf.dataType)).toMap
def genericAgg(c:String) = {
dataTypes(c) match {
case DoubleType => sum(col(c))
case StringType => concat_ws(",",collect_list(col(c)))
case BooleanType => max(col(c))
}
}
val aggExprs: Seq[Column] = df.columns.filterNot(_=="id")
.map(c => genericAgg(c))
df
.groupBy("id")
.agg(
aggExprs.head,aggExprs.tail:_*
)
.show()