Вот два решения. Они более или менее эквивалентны по своему базовому поведению, но вам может показаться, что один или другой легче понять, поддержать или протестировать.
Что касается вашего вопроса, нет, нет способа l oop вернуть (переместить в очередь) неизрасходованные события, не возвращая их обратно в Kinesis. Но просто держаться за них, пока они не понадобятся, должно быть хорошо.
Решение 1. Используйте RichFlatMapFunction
По мере поступления событий T-типа добавляйте их к объекту ListState
. Когда приходит событие J-типа, соберите на выходе все соответствующие события T-типа из списка и обновите список, чтобы сохранить только те события T-типа, которые будут принадлежать более поздним событиям J-типа.
Решение 2. Используйте Global Windows с пользовательским триггером и Evictor
В дополнение к тому, что вы уже сделали, внедрите Evictor
, который (после того, как окно было FIREd) удаляет только событие типа J и все соответствующие события типа T из окна.
Обновление: состояние очистки для устаревших ключей / мертвых датчиков
С решением 1, Вы можете использовать состояние TTL для организации любого неактивного состояния, связанного с удалением мертвых ключей. Или вы можете использовать KeyedProcessFunction
вместо RichFlatMapFunction
и использовать таймеры для выполнения sh того же.
Управление состоянием для устаревших ключей с помощью оконного API может быть менее простым, но для решения 2 Я считаю, что вы можете расширить свой пользовательский триггер, включив тайм-аут, который очистит окно. И если вы использовали глобальное состояние в ProcessWindowFunction
, вам нужно полагаться на состояние TTL, чтобы очистить его.