Когда вы назначаете пользовательский windows, размер состояния может быстро go из-под контроля. Это главным образом потому, что каждое окно должно содержать все записи, которые попадают в него, до тех пор, пока окно не будет агрегировано и в конечном итоге не будет удалено. В вашем коде также создается впечатление, что вы создаете огромное количество windows для каждой записи.
Вы не указали свой вариант использования, но я предполагаю, что вы действительно хотите рассчитать, сколько событий растягивается в определенный момент времени для каждого ключа с размером ячейки 10 мс. Если это так, то это не является прямым примером использования windows.
. То, что вы хотите сделать:
- Разделить ваше событие на более мелкие события.
- Группировка по ключу и корзине.
- Подсчитайте вашу корзину.
Грубый набросок в коде:
input.flatMap(element -> {
...
for (long start = lastStart; start > timestamp - size; start -= slide) {
emit(new KeyTime(key, start));
}
})
.keyBy(keyTime -> keyTime)
.count()
Вы можете применить windows после keyBy
для принудительного вызова определенных выходных свойств, таких как ожидание в течение нескольких минут, а затем вывод всего и игнорирование поздних событий.
Примечание: KeyTime - это простой POJO, содержащий ключ и время корзины.
редактировать: после вашего комментария решение на самом деле намного проще.
env.addSource(source)
.keyBy(element -> new Tuple2<>(element.getKey(), element.getTime()))
.count()
.addSink(new RedisCustomizeSink(redisProperties));