Расчет продолжительности непрерывных событий во Флинке - PullRequest
0 голосов
/ 01 июня 2019

У меня есть поток некоторых изменений состояния устройства, например: case class DeviceState(ts: Long, state: Int).Устройство отправляет свое состояние только после его изменения.Так, например, это может быть так:

ts | state
----------
 0 | ONLINE
 3 | OFFLINE
11 | ONLINE
19 | OFFLINE

(в реальном коде ts это миллисекунды времени unix, я упростил это для примера) Я хочу разделить этот поток, переворачивая окно из 10отметьте галочки и рассчитайте общую продолжительность каждого состояния, например, если пунктуация была выбрана на отметке 45, результат должен быть таким:

 window | state   | duration
-----------------------------
 0 - 10 | ONLINE  | 3
 0 - 10 | OFFLINE | 7
10 - 20 | OFFLINE | 2
10 - 20 | ONLINE  | 8
20 - 30 | OFFLINE | 10
30 - 40 | OFFLINE | 10

Можно ли выполнить такие расчеты длительности во Flink?Я думаю, что это может быть сделано с помощью пользовательской функции Reduce, но я не могу понять, как генерировать последнее состояние, чтобы оно отображалось в каждом окне (в приведенном выше примере последнее состояние было на отметке 19, но оно все равно должно использоваться вокна 20-30, 30-40 и тд).

1 Ответ

0 голосов
/ 01 июня 2019

В оконном API Flink окно не существует, пока ему не назначено событие, что затрудняет задачу.

Одним из решений может быть использование ProcessFunction с таймером для добавления в ваш поток событий третьего типа, которые используются только для запуска окон, которые в противном случае были бы пустыми.

Другим решением было бы сделать всю работу по вычислению аналитики с помощью ProcessFunction (с некоторыми состояниями и таймерами), а не с окнами.

...