Spark датафрейм Scala сложно - PullRequest
       14

Spark датафрейм Scala сложно

0 голосов
/ 07 февраля 2019

Фрейм данных 1

Person  Work_order  email  
P1  111 123@gmail.com  
P2  222 123@gmail.com   
P3  111 123@gmail.com   
P4  -1  123@gmail.com   
P5  444 999@gmail.com   

val person = Seq(
 ("P1", "111", "123@gmail.com"),
("P2", "222", "123@gmail.com"),
("P3", "111", "123@gmail.com"),
("P4", "-1", "123@gmail.com"),
("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")

Фрейм данных 2

Work_order  email   
111 123@gmail.com   
222 123@gmail.com    
444 999@gmail.com 

val workOrder = Seq(
("111", "123@gmail.com"),
("222", "123@gmail.com"),
("444", "999@gmail.com")).toDF("work_order", "email")

Выход

Work_order email Count_excluding_the_self_work_order_id
111 123@gmail.com 2
222 123@gmail.com 3
444 999@gmail.com 0

Я хотел бы создать вывод, аналогичный описанному выше.Например, для первой строки: Количество должно исключать идентификатор заказа на самостоятельную работу, который присутствует в дампе [111].Также нам НЕ нужно рассчитывать идентификатор заказа на работу 444, так как он имеет другой адрес электронной почты.Заранее признателен за помощь.

1 Ответ

0 голосов
/ 07 февраля 2019

Использование функций фильтра Spark 2.4 ().Поскольку 444 следует исключить, я упомянул это в SQL.Надеюсь, шаги объяснительные

scala> val person = Seq(
     |  ("P1", "111", "123@gmail.com"),
     | ("P2", "222", "123@gmail.com"),
     | ("P3", "111", "123@gmail.com"),
     | ("P4", "-1", "123@gmail.com"),
     | ("P5", "444", "999@gmail.com")).toDF("person", "work_order_person", "email_person")
person: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 1 more field]

scala> val workOrder = Seq(
     | ("111", "123@gmail.com"),
     | ("222", "123@gmail.com"),
     | ("444", "999@gmail.com")).toDF("work_order", "email")
workOrder: org.apache.spark.sql.DataFrame = [work_order: string, email: string]

scala> val person_grp = person.groupBy().agg( collect_list('work_order_person) as "wo_group" )
person_grp: org.apache.spark.sql.DataFrame = [wo_group: array<string>]

scala> person.crossJoin(person_grp).show(false)
+------+-----------------+-------------+------------------------+
|person|work_order_person|email_person |wo_group                |
+------+-----------------+-------------+------------------------+
|P1    |111              |123@gmail.com|[111, 222, 111, -1, 444]|
|P2    |222              |123@gmail.com|[111, 222, 111, -1, 444]|
|P3    |111              |123@gmail.com|[111, 222, 111, -1, 444]|
|P4    |-1               |123@gmail.com|[111, 222, 111, -1, 444]|
|P5    |444              |999@gmail.com|[111, 222, 111, -1, 444]|
+------+-----------------+-------------+------------------------+


scala> val df = person.crossJoin(person_grp)
df: org.apache.spark.sql.DataFrame = [person: string, work_order_person: string ... 2 more fields]

scala> df.createOrReplaceTempView("ansip")

scala> spark.sql(" select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip ").show(false)
+------+-----------------+-------------------+
|person|work_order_person|res1               |
+------+-----------------+-------------------+
|P1    |111              |[222, -1]          |
|P2    |222              |[111, 111, -1]     |
|P3    |111              |[222, -1]          |
|P4    |-1               |[111, 222, 111]    |
|P5    |444              |[111, 222, 111, -1]|
+------+-----------------+-------------------+


scala> workOrder.createOrReplaceTempView("wo_tab")

scala> val df2 = spark.sql(" with t1 (select person, work_order_person, filter(wo_group, x -> x!=work_order_person and x!=444) res1 from ansip) select work_order_person
,res1 from t1 where work_order_person!=444 group by work_order_person, res1  ")
df2: org.apache.spark.sql.DataFrame = [work_order_person: string, res1: array<string>]

scala> df2.show(false)
+-----------------+---------------+
|work_order_person|res1           |
+-----------------+---------------+
|111              |[222, -1]      |
|222              |[111, 111, -1] |
|-1               |[111, 222, 111]|
+-----------------+---------------+

scala> df2.createOrReplaceTempView("ansib2")

scala> spark.sql(" select work_order, email, case when size(res1)>0 then size(res1) else 0 end res2 from wo_tab left join ansib2 on work_order=work_order_person ").show
(false)
+----------+-------------+----+
|work_order|email        |res2|
+----------+-------------+----+
|111       |123@gmail.com|2   |
|222       |123@gmail.com|3   |
|444       |999@gmail.com|0   |
+----------+-------------+----+


scala>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...