Все можно выполнить одним групповым вызовом, однако я бы посоветовал для (незначительного) выигрыша в производительности и для удобства чтения кода разделиться на 2 вызова:
import org.apache.spark.sql.functions.{col, size, collect_set, max, min, when, lit}
val res1DF = df.groupBy(col("ID_VISITE_CALCULE")).agg(
min(col("START")).alias("START"),
max(col("END")).alias("END"),
collect_set(col("EXTERNAL_PERSON_ID")).alias("EXTERNAL_PERSON_ID"),
collect_set(col("EXTERNAL_ORGANIZATION_ID")).alias("EXTERNAL_ORGANIZATION_ID")
)
val res2DF = res1DF.withColumn("EXTERNAL_PERSON_ID",
when(
size(col("EXTERNAL_PERSON_ID")) > 1,
lit("UNDEFINED")).otherwise(col("EXTERNAL_PERSON_ID").getItem(0)
)
).withColumn("EXTERNAL_ORGANIZATION_ID",
when(
size(col("EXTERNAL_ORGANIZATION_ID")) > 1,
lit("UNDEFINED")).otherwise(col("EXTERNAL_ORGANIZATION_ID").getItem(0)
)
)
Метод getItem
выполняет большинство условий в фоновом режиме. Если набор значений пуст, он вернет null
, а если имеется только одно значение, он вернет значение.