Я сгенерировал ваш фрейм данных так:
val df = Seq(("p1", "Parent", Seq("c1", "c2", "c3", "c4"), "Active"),
("c1", "Child", Seq(), "Inactive"),
("c2", "Child", Seq(), "Active"),
("c3", "Child", Seq(), "Active"))
.toDF("Col1", "Col2", "col3", "flag")
Затем я фильтрую только активных потомков в одном кадре данных, который является частью вашего вывода:
val active_children = df.where('flag === "Active").where('Col2 === "Child")
Я также генерирую сплющенный фрейм данных отношений родитель / потомок с explode
:
val rels = df.withColumn("child", explode('col3))
.select("Col1", "Col2", "flag", "child")
scala> rels.show
+----+------+------+-----+
|Col1| Col2| flag|child|
+----+------+------+-----+
| p1|Parent|Active| c1|
| p1|Parent|Active| c2|
| p1|Parent|Active| c3|
| p1|Parent|Active| c4|
+----+------+------+-----+
и фрейм данных с одним столбцом, соответствующим активным дочерним элементам, например:
val child_filter = active_children.select('Col1 as "child")
и используйте этот child_filter
фрейм данных для фильтрации (с объединением) интересующих вас родителей и использования groupBy для агрегирования строк обратно в ваш выходной формат:
val parents = rels
.join(child_filter, "child")
.groupBy("Col1")
.agg(first('Col2) as "Col2",
collect_list('child) as "col3",
first('flag) as "flag")
scala> parents.show
+----+------+--------+------+
|Col1| Col2| col3| flag|
+----+------+--------+------+
| p1|Parent|[c2, c3]|Active|
+----+------+--------+------+
Наконец, объединение дает ожидаемый результат:
scala> parents.union(active_children).show
+----+------+--------+------+
|Col1| Col2| col3| flag|
+----+------+--------+------+
| p1|Parent|[c2, c3]|Active|
| c2| Child| []|Active|
| c3| Child| []|Active|
+----+------+--------+------+