У меня есть данные в следующем формате, отсортированном по отметке времени, каждая строка представляет событие:
+----------+--------+---------+
|event_type| data |timestamp|
+----------+--------+---------+
| A | d1 | 1 |
| B | d2 | 2 |
| C | d3 | 3 |
| C | d4 | 4 |
| C | d5 | 5 |
| A | d6 | 6 |
| A | d7 | 7 |
| B | d8 | 8 |
| C | d9 | 9 |
| B | d10 | 12 |
| C | d11 | 20 |
+----------+--------+---------+
Мне нужно собрать эти события в серии следующим образом:
1. Событие типа C помечаетконец серии
2. Если имеется несколько последовательных событий типа C, они попадают в одну серию, а последняя отмечает конец этой серии
3. Каждая серия может охватывать7 дней при макс. , даже если нет события C для его завершения
Обратите также внимание, что в один день может быть несколько серий.На самом деле столбцы отметок времени являются стандартными отметками времени UNIX, здесь цифры для простоты обозначают дни.
Поэтому желаемый результат будет выглядеть следующим образом:
+---------------------+--------------------------------------------------------------------+
|first_event_timestamp| events: List[(event_type, data, timestamp)] |
+---------------------+--------------------------------------------------------------------+
| 1 | List((A, d1, 1), (B, d2, 2), (C, d3, 3), (C, d4, 4), (C, d5, 5)) |
| 6 | List((A, d6, 6), (A, d7, 7), (B, d8, 8), (C, d9, 9)) |
| 12 | List((B, d10, 12)) |
| 20 | List((C, d11, 20)) |
+---------------------+--------------------------------------------------------------------+
Я пытался решить эту проблемуиспользуя функции окна, где я бы добавил 2 столбца, например:
1. Событие, помеченное столбцом семени, непосредственно после события типа C с использованием некоторого уникального идентификатора
2. SeriesId был заполнен значениями из столбца семени, используя last ()чтобы пометить все события в одной серии с одинаковым идентификатором
3. Затем я бы сгруппировал события по SeriesId
К сожалению, это не представляется возможным:
+----------+--------+---------+------+-----------+
|event_type| data |timestamp| seed | series_id |
+----------+--------+---------+------+-----------+
| A | d1 | 1 | null | null |
| B | d2 | 2 | null | null |
| C | d3 | 3 | null | null |
| C | d4 | 4 | 0 | 0 |
| C | d5 | 5 | 1 | 1 |
| A | d6 | 6 | 2 | 2 |
| A | d7 | 7 | null | 2 |
| B | d8 | 8 | null | 2 |
| C | d9 | 9 | null | 2 |
| B | d10 | 12 | 3 | 3 |
| C | d11 | 20 | null | 3 |
+----------+--------+---------+------+-----------+
- Я не могу проверить предыдущую строку на равенство, используя lag (), то есть следующий код:
df.withColumn(
"seed",
when(
(lag($"eventType", 1) === ventType.Conversion).over(w),
typedLit(DigestUtils.sha256Hex("some fields").substring(0, 32))
)
)
throws
org.apache.spark.sql.AnalysisException: Expression '(lag (eventType # 76, 1, null) = C)' не поддерживается в оконной функции.
Как видно из таблицы, она не срабатывает в случае, когда есть несколько последовательных событий C, а также не будет работать для первой и последней серии.
Я застрял здесь, любая помощь будет принята (использование Dataframe / dataset api предпочтительнее).