Вы можете сгруппировать DataFrame по ID
для агрегирования Category
, используя collect_set
, чтобы собрать массивы категорий, и создать новый столбец на основе содержимого в массивах категорий, используя array_contains
:
import org.apache.spark.sql.functions._
val df = Seq(
(31898, "Transfer"),
(31898, "e-Transfer"),
(32614, "Transfer"),
(32614, "e-Transfer"),
(32614, "IMT"),
(33987, "Transfer"),
(34193, "e-Transfer")
).toDF("ID", "Category")
df.groupBy("ID").agg(collect_set("Category").as("CategorySet")).
withColumn( "Category",
when(array_contains($"CategorySet", "e-Transfer") && array_contains($"CategorySet", "IMT"),
"e-Transfer + IMT").otherwise(
when(array_contains($"CategorySet", "e-Transfer") && array_contains($"CategorySet", "Transfer"),
"e-Transfer").otherwise(
when($"CategorySet" === Array("e-Transfer") || $"CategorySet" === Array("MIT"),
$"CategorySet"(0)).otherwise(
when($"CategorySet" === Array("Transfer"), "Other")
)))
).
show(false)
// +-----+---------------------------+----------------+
// |ID |CategorySet |Category |
// +-----+---------------------------+----------------+
// |33987|[Transfer] |Other |
// |32614|[Transfer, e-Transfer, IMT]|e-Transfer + IMT|
// |34193|[e-Transfer] |e-Transfer |
// |31898|[Transfer, e-Transfer] |e-Transfer |
// +-----+---------------------------+----------------+
Возможно, ваши данные выборки не охватывали все случаи (например, [Transfer, MIT]
). Существующий пример кода будет генерировать значение категории null
для любых оставшихся случаев. Просто измените / разверните условную проверку, если выявлены дополнительные случаи.