Check this out:
scala> val df = Seq(("a",75,"age","<",18,"Minor"),("b",10,"age","<",18,"Minor"),("c",30,"age","<",18,"Minor"),("a",75,"age",">=",18,"Major"),("b",10,"age",">=",18,"Major"),("c",30,"age",">=",18,"Major"),("a",75,"age",">",60,"Senior Citizen"),("b",10,"age",">",60,"Senior Citizen"),("c",30,"age",">",60,"Senior Citizen")).toDF("name","age","field","optr","value","rule")
df: org.apache.spark.sql.DataFrame = [name: string, age: int ... 4 more fields]
scala> df.show(false)
+----+---+-----+----+-----+--------------+
|name|age|field|optr|value|rule |
+----+---+-----+----+-----+--------------+
|a |75 |age |< |18 |Minor |
|b |10 |age |< |18 |Minor |
|c |30 |age |< |18 |Minor |
|a |75 |age |>= |18 |Major |
|b |10 |age |>= |18 |Major |
|c |30 |age |>= |18 |Major |
|a |75 |age |> |60 |Senior Citizen|
|b |10 |age |> |60 |Senior Citizen|
|c |30 |age |> |60 |Senior Citizen|
+----+---+-----+----+-----+--------------+
scala> val df2 = df.withColumn("condn", concat('field,'optr,'value))
df2: org.apache.spark.sql.DataFrame = [name: string, age: int ... 5 more fields]
scala> val condn_list=df2.groupBy().agg(collect_set('condn).as("condns")).as[(Seq[String])].first
condn_list: Seq[String] = List(age>60, age<18, age>=18)
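As an aside, the same distinct list can be collected without the empty groupBy/collect_set step. A minimal sketch that should give the same conditions, possibly in a different order, relying on the spark.implicits._ that the shell imports automatically:

// one row per distinct condition; the list is tiny, so collecting to the driver is cheap
val condn_list = df2.select('condn).distinct.as[String].collect.toSeq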
scala> val df_filters = condn_list.map{ x => df2.filter(s""" condn='${x}' and $x """) }
df_filters: Seq[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = List([name: string, age: int ... 5 more fields], [name: string, age: int ... 5 more fields], [name: string, age: int ... 5 more fields])
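Each filter string does double duty: condn='${x}' keeps only the rows tagged with that rule, and $x itself (e.g. age<18) actually evaluates the condition against the row. If you prefer Column syntax over a raw SQL string, the same filters can be built with expr; a minimal sketch:

import org.apache.spark.sql.functions.expr

val df_filters = condn_list.map { x =>
  // 'condn === x keeps the rows tagged with this rule;
  // expr(x) parses e.g. "age<18" into a Column and applies it
  df2.filter('condn === x).filter(expr(x))
}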
scala> df_filters(0).union(df_filters(1)).union(df_filters(2)).show(false)
+----+---+-----+----+-----+--------------+-------+
|name|age|field|optr|value|rule |condn |
+----+---+-----+----+-----+--------------+-------+
|b |10 |age |< |18 |Minor |age<18 |
|a |75 |age |> |60 |Senior Citizen|age>60 |
|a |75 |age |>= |18 |Major |age>=18|
|c |30 |age |>= |18 |Major |age>=18|
+----+---+-----+----+-----+--------------+-------+
scala>
To union them all, you can do something like
scala> var res = df_filters(0)
res: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, age: int ... 5 more fields]
scala> (1 until df_filters.length).map( x => { res = res.union(df_filters(x)) } )
res20: scala.collection.immutable.IndexedSeq[Unit] = Vector((), ())
scala> res.show(false)
+----+---+-----+----+-----+--------------+-------+
|name|age|field|optr|value|rule |condn |
+----+---+-----+----+-----+--------------+-------+
|b |10 |age |< |18 |Minor |age<18 |
|a |75 |age |> |60 |Senior Citizen|age>60 |
|a |75 |age |>= |18 |Major |age>=18|
|c |30 |age |>= |18 |Major |age>=18|
+----+---+-----+----+-----+--------------+-------+
scala>
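A more idiomatic way to fold the list is to reduce over union, which avoids both the mutable var and the throwaway Vector of Units; a minimal sketch, assuming df_filters is non-empty (reduce throws on an empty collection):

// fold all filtered Datasets into one by pairwise union
val res = df_filters.reduce(_ union _)
res.show(false)

Since every Dataset in the list came from df2, they all share the same schema, so the positional column matching that union performs is safe here.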