Just a different way: groupBy, collect the statuses as a set, and then, if the set size is 1, the id is either ACTIVE or INACTIVE; otherwise it is both.
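Note that in spark-shell only spark.implicits._ is pre-imported; collect_set, size and when used below come from the functions object, so the session assumes:

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._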
scala> val df = Seq(("id1", "ACTIVE"), ("id1", "INACTIVE"), ("id1", "INACTIVE"), ("id2", "ACTIVE"), ("id3", "INACTIVE"), ("id3", "INACTIVE"), ("id4", "ACTIVE"), ("id5", "ACTIVE"), ("id6", "INACTIVE"), ("id7", "ACTIVE"), ("id7", "INACTIVE")).toDF("id", "status")
df: org.apache.spark.sql.DataFrame = [id: string, status: string]
scala> df.show(false)
+---+--------+
|id |status  |
+---+--------+
|id1|ACTIVE  |
|id1|INACTIVE|
|id1|INACTIVE|
|id2|ACTIVE  |
|id3|INACTIVE|
|id3|INACTIVE|
|id4|ACTIVE  |
|id5|ACTIVE  |
|id6|INACTIVE|
|id7|ACTIVE  |
|id7|INACTIVE|
+---+--------+
scala> val allstatusDF = df.groupBy("id").agg(collect_set("status") as "allstatus")
allstatusDF: org.apache.spark.sql.DataFrame = [id: string, allstatus: array<string>]
scala> allstatusDF.show(false)
+---+------------------+
|id |allstatus         |
+---+------------------+
|id7|[ACTIVE, INACTIVE]|
|id3|[INACTIVE]        |
|id5|[ACTIVE]          |
|id6|[INACTIVE]        |
|id1|[ACTIVE, INACTIVE]|
|id2|[ACTIVE]          |
|id4|[ACTIVE]          |
+---+------------------+
scala> allstatusDF.withColumn("status", when(size($"allstatus") === 1, $"allstatus".getItem(0)).otherwise("BOTH")).show(false)
+---+------------------+--------+
|id |allstatus         |status  |
+---+------------------+--------+
|id7|[ACTIVE, INACTIVE]|BOTH    |
|id3|[INACTIVE]        |INACTIVE|
|id5|[ACTIVE]          |ACTIVE  |
|id6|[INACTIVE]        |INACTIVE|
|id1|[ACTIVE, INACTIVE]|BOTH    |
|id2|[ACTIVE]          |ACTIVE  |
|id4|[ACTIVE]          |ACTIVE  |
+---+------------------+--------+
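If the intermediate allstatus column is not needed in the result, the same groupBy / collect_set / when steps can be chained into a single pipeline (just a sketch of the same logic; drop removes the helper column):

df.groupBy("id")
  .agg(collect_set("status") as "allstatus")
  .withColumn("status", when(size($"allstatus") === 1, $"allstatus".getItem(0)).otherwise("BOTH"))
  .drop("allstatus")
  .show(false)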