Scala Spark - преобразование нескольких групповых уровней с использованием массивов в качестве входных данных - PullRequest
0 голосов
/ 16 января 2019

В моей программе Scala я сталкиваюсь с проблемой объединения результатов нескольких уровней GroupBy. Набор данных, который я использую, довольно большой. В качестве небольшого примера у меня есть кадр данных, который выглядит следующим образом:

+---+---+----+-----+-----+
|  F|  L| Loy|Email|State|
+---+---+----+-----+-----+
| f1| l1|loy1| null|   s1|
| f1| l1|loy1|   e1|   s1|
| f2| l2|loy2|   e2|   s2|
| f2| l2|loy2|   e3| null|
| f1| l1|null|   e1|   s3|
+---+---+----+-----+-----+

Для первого уровня groupBy Я использую следующий скрипт, чтобы получить результат, основанный на тех же столбцах (F, L, Loy) :

df.groupBy("F", "L", "Loy").agg(collect_set($"Email").alias("Email"), collect_set($"State").alias("State")).show

Результат примерно такой:

+---+---+----+--------+-----+
|  F|  L| Loy|   Email|State|
+---+---+----+--------+-----+
| f1| l1|null|    [e1]| [s3]|
| f2| l2|loy2|[e2, e3]| [s2]|
| f1| l1|loy1|    [e1]| [s1]|
+---+---+----+--------+-----+

Проблема, с которой я сталкиваюсь, заключается в том, как я могу выполнить второй уровень groupBy , который основан на условии (F, L, Email) и принимает в качестве введите F и L как строку, а столбец Email как массив [String]. Этот groupBy должен возвращать результат следующим образом:

+---+---+----+--------+---------+
|  F|  L| Loy|   Email|    State|
+---+---+----+--------+---------+
| f1| l1|loy1|    [e1]| [s3, s1]|
| f2| l2|loy2|[e2, e3]|     [s2]|
+---+---+----+--------+---------+

Основная цель - максимально сократить количество записей, применяя groupBy на разных уровнях. Я довольно новичок в Scala, и любая помощь будет оценена :)

1 Ответ

0 голосов
/ 17 января 2019

Просто используйте concat_ws () с нулевым разделителем, который удалит массив состояний для простых элементов, а затем collect_set вернет вам массив в состояния. Проверьте это.

scala> val df = Seq( ("f1","l1","loy1",null,"s1"),("f1","l1","loy1","e1","s1"),("f2","l2","loy2","e2","s2"),("f2","l2","loy2","e3",null),("f1","l1",null,"e1","s3")).toDF("F","L","loy","email","state")
df: org.apache.spark.sql.DataFrame = [F: string, L: string ... 3 more fields]

scala> df.show(false)
+---+---+----+-----+-----+
|F  |L  |loy |email|state|
+---+---+----+-----+-----+
|f1 |l1 |loy1|null |s1   |
|f1 |l1 |loy1|e1   |s1   |
|f2 |l2 |loy2|e2   |s2   |
|f2 |l2 |loy2|e3   |null |
|f1 |l1 |null|e1   |s3   |
+---+---+----+-----+-----+


scala> val df2 = df.groupBy("F", "L", "Loy").agg(collect_set($"Email").alias("Email"), collect_set($"State").alias("State"))
df2: org.apache.spark.sql.DataFrame = [F: string, L: string ... 3 more fields]

scala> df2.show(false)
+---+---+----+--------+-----+
|F  |L  |Loy |Email   |State|
+---+---+----+--------+-----+
|f1 |l1 |null|[e1]    |[s3] |
|f2 |l2 |loy2|[e2, e3]|[s2] |
|f1 |l1 |loy1|[e1]    |[s1] |
+---+---+----+--------+-----+


scala> df2.groupBy("F","L","email").agg(max('loy).as("loy"),collect_set(concat_ws("",'state)).as("state")).show
+---+---+--------+----+--------+
|  F|  L|   email| loy|   state|
+---+---+--------+----+--------+
| f2| l2|[e2, e3]|loy2|    [s2]|
| f1| l1|    [e1]|loy1|[s3, s1]|
+---+---+--------+----+--------+


scala>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...