Spark Strucutured Streaming Window в столбце без метки времени - PullRequest
0 голосов
/ 12 сентября 2018

Я получаю поток данных в форме:

+--+---------+---+----+
|id|timestamp|val|xxx |
+--+---------+---+----+
|1 |12:15:25 | 50| 1  |
|2 |12:15:25 | 30| 1  |
|3 |12:15:26 | 30| 2  |
|4 |12:15:27 | 50| 2  |
|5 |12:15:27 | 30| 3  |
|6 |12:15:27 | 60| 4  |
|7 |12:15:28 | 50| 5  |
|8 |12:15:30 | 60| 5  |
|9 |12:15:31 | 30| 6  |
|. |...      |...|... |

Меня интересует применение оконной операции к столбцу xxx, точно так же, как оконная операция над временной меткой доступна в потоковой передаче Spark с некоторым размером окна и шагом скольжения.

Позвольте в groupBy с оконной функцией ниже, lines представляет потоковый фрейм данных с размером окна: 2 и шагом скольжения: 1.

val c_windowed_count = lines.groupBy(
  window($"xxx", "2", "1"), $"val").count().orderBy("xxx")

Итак, вывод должен быть следующим:

+------+---+-----+
|window|val|count|
+------+---+-----+
|[1, 3]|50 |  2  |
|[1, 3]|30 |  2  |
|[2, 4]|30 |  2  |
|[2, 4]|50 |  1  |
|[3, 5]|30 |  1  |
|[3, 5]|60 |  1  |
|[4, 6]|60 |  2  |
|[4, 6]|50 |  1  |
|...   |.. | ..  |

Я пытался использовать partitionBy, но он не поддерживается в Spark Structured Streaming.

Я использую Spark Structured Streaming 2.3.1.

Спасибо!

Ответы [ 2 ]

0 голосов
/ 21 сентября 2018

Новое в Spark 2.2 - это произвольные операции с состоянием

Вариант использования - управление пользовательскими сессиями, «пользовательское окно»

прокрутите половину страницы вниз, чтобы увидеть пример

Если умное решение Шайдо работает для вас, тогда я предлагаю остаться с этим. Для более сложных требований произвольные операции с состоянием выглядят как путь.

0 голосов
/ 20 сентября 2018

В настоящее время невозможно использовать окна для столбцов без меток времени, используя Spark Structured Streaming. Однако вы можете преобразовать столбец xxx в столбец отметки времени , выполнить groupBy и count, а затем преобразовать обратно.

from_unixtime может использоваться для преобразования количества секунд с 1970-01-01 в метку времени. Используйте столбец xxx в качестве секунд, и можно создать поддельную отметку времени для использования в окне:

lines.groupBy(window(from_unixtime($"xxx"), "2 seconds", "1 seconds"), $"val").count()
  .withColumn("window", struct(unix_timestamp($"window.start"), unix_timestamp($"window.end")).as("window"))
  .filter($"window.col1" =!= 0)
  .orderBy($"window.col1")

Выше, группировка выполняется по преобразованной временной метке, и следующая строка преобразует ее обратно в исходное число. Фильтр выполняется, поскольку первые две строки будут одним окном [0,2] (то есть только в строках с xxx равно 1), но их можно пропустить.

Результирующий вывод вышеуказанного ввода:

+------+---+-----+
|window|val|count|
+------+---+-----+
| [1,3]| 50|    2|
| [1,3]| 30|    2|
| [2,4]| 30|    2|
| [2,4]| 50|    1|
| [3,5]| 30|    1|
| [3,5]| 60|    1|
| [4,6]| 60|    2|
| [4,6]| 50|    1|
| [5,7]| 30|    1|
| [5,7]| 60|    1|
| [5,7]| 50|    1|
| [6,8]| 30|    1|
+------+---+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...