This uses Spark 2.4's filter() higher-order function. Since 444 has to be excluded, I handled that in the SQL itself. Hopefully the steps are self-explanatory:
scala> val person = Seq(
| ("P1", "111", "123@gmail.com"),
| ("P2", "222", "123@gmail.com"),
| ("P3", "111", "123@gmail.com"),
| ("P4", "-1", "123@gmail.com"),
| ("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")
person: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 1 more field]
scala> val workOrder = Seq(
| ("111", "123@gmail.com"),
| ("222", "123@gmail.com"),
| ("444", "999@gmail.com")).toDF("work_order", "email")
workOrder: org.apache.spark.sql.DataFrame = [work_order: string, email: string]
scala> val person_grp = person.groupBy().agg( collect_list('work_order_person) as "wo_group" )
person_grp: org.apache.spark.sql.DataFrame = [wo_group: array<string>]
scala> person.crossJoin(person_grp).show(false)
+------+-----------------+-------------+------------------------+
|person|work_order_person|email_person |wo_group |
+------+-----------------+-------------+------------------------+
|P1 |111 |123@gmail.com|[111, 222, 111, -1, 444]|
|P2 |222 |123@gmail.com|[111, 222, 111, -1, 444]|
|P3 |111 |123@gmail.com|[111, 222, 111, -1, 444]|
|P4 |-1 |123@gmail.com|[111, 222, 111, -1, 444]|
|P5 |444 |999@gmail.com|[111, 222, 111, -1, 444]|
+------+-----------------+-------------+------------------------+
scala> val df = person.crossJoin(person_grp)
df: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 2 more fields]
scala> df.createOrReplaceTempView("ansip")
scala> spark.sql(" select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip ").show(false)
+------+-----------------+-------------------+
|person|work_order_person|res1 |
+------+-----------------+-------------------+
|P1 |111 |[222, -1] |
|P2 |222 |[111, 111, -1] |
|P3 |111 |[222, -1] |
|P4 |-1 |[111, 222, 111] |
|P5 |444 |[111, 222, 111, -1]|
+------+-----------------+-------------------+
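Side note: the same filter step works without a temp view by going through expr(), since Spark 2.4 has no filter function in the Scala DataFrame API (that only arrived in 3.0). Untested sketch, the val name is my own:

import org.apache.spark.sql.functions.expr

// same higher-order filter as the SQL above, expressed through expr() on df
val filtered = df.select(
  'person,
  'work_order_person,
  expr("filter(wo_group, x -> x != work_order_person and x != 444)").as("res1")
)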
scala> workOrder.createOrReplaceTempView("wo_tab")
scala> val df2 = spark.sql(" with t1 (select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip) select work_order_person, res1 from t1 where work_order_person!=444 group by work_order_person, res1 ")
df2: org.apache.spark.sql.DataFrame = [work_order_person: string, res1: array<string>]
scala> df2.show(false)
+-----------------+---------------+
|work_order_person|res1 |
+-----------------+---------------+
|111 |[222, -1] |
|222 |[111, 111, -1] |
|-1 |[111, 222, 111]|
+-----------------+---------------+
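The group by above is only there to deduplicate the (work_order_person, res1) pairs; in the DataFrame API the equivalent step would be dropDuplicates. Untested sketch, reusing the hypothetical filtered from the previous snippet:

// drop the excluded 444 row and deduplicate the remaining pairs
val df2Alt = filtered
  .filter('work_order_person =!= "444")
  .select('work_order_person, 'res1)
  .dropDuplicates()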
scala> df2.createOrReplaceTempView("ansib2")
scala> spark.sql(" select work_order, email, case when size(res1)>0 then size(res1) else 0 end res2 from wo_tab left join ansib2 on work_order=work_order_person ").show(false)
+----------+-------------+----+
|work_order|email |res2|
+----------+-------------+----+
|111 |123@gmail.com|2 |
|222 |123@gmail.com|3 |
|444 |999@gmail.com|0 |
+----------+-------------+----+
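For reference, the whole thing can be chained into a single SQL statement once the ansip and wo_tab views are registered. Untested sketch that should mirror the steps above; the CASE WHEN is needed because the unmatched left-join row (444) yields a null res1, and size(null) returns -1 under Spark 2.4's defaults:

val result = spark.sql("""
  with t1 as (
    select person, work_order_person,
           filter(wo_group, x -> x != work_order_person and x != 444) as res1
    from ansip),
  t2 as (
    select work_order_person, res1
    from t1
    where work_order_person != 444
    group by work_order_person, res1)
  select work_order, email,
         case when size(res1) > 0 then size(res1) else 0 end as res2
  from wo_tab left join t2 on work_order = work_order_person
""")
result.show(false)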