Возможность использования опережающих и запаздывающих функций вместе с groupBy в Spark. - PullRequest
0 голосов
/ 29 апреля 2019

Мне интересно, есть ли способ использовать опережение \ отставание, чтобы сосчитать что-то вроде этого

Первый шаг: у меня есть фрейм данных

+----+-----------+------+
| id | timestamp | sess |
+----+-----------+------+
| xx | 1         | A    |
+----+-----------+------+
| yy | 2         | A    |
+----+-----------+------+
| zz | 1         | B    |
+----+-----------+------+
| yy | 3         | B    |
+----+-----------+------+
| tt | 4         | B    |
+----+-----------+------+

И я хочу собрать идентификаторы, которыепредшествует определенному разделению идентификатора с помощью session_id

+----+---------+
| id | id_list |
+----+---------+
| yy | [xx,zz] |
+----+---------+
| xx | []      |
+----+---------+
| zz | []      |
+----+---------+
| tt | [yy]    |
+----+---------+

1 Ответ

1 голос
/ 29 апреля 2019

Вы можете создать window над столбцом sess и lag идентификаторы, как вы упомянули в вопросе.Затем вы можете использовать groupBy с агрегатной функцией collect_list, чтобы получить вывод.

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy($"sess").orderBy($"timestamp")
val df1 = df.withColumn("lagged", lag($"id", 1).over(w))
 df1.select("id", "lagged").groupBy($"id").agg(collect_list($"lagged").as("id_list")).show

//+---+--------------------+
//| id|             id_list|
//+---+--------------------+
//| tt|                [yy]|
//| xx|                  []|
//| zz|                  []|
//| yy|            [zz, xx]|
//+---+--------------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...