Передача элементов обратно во входной поток после обработки в Flink? - PullRequest
3 голосов
/ 20 февраля 2020

Сценарий:

У меня есть поток событий от датчика. Событие может иметь тип T-типа или J-типа .

  • События типа T с событием произошедшей отметки времени.
  • J Для событий типа «Тип» есть отметка времени начала и окончания.

На основе отметки времени начала и окончания события типа J примените агрегированные логики c ко всем событиям типа T, попадающим в промежуток времени. ранжировать и записать результат в БД.

Для этого я создал пользовательский триггер, который запускается при получении события J-типа. В моей пользовательской функции ProcessWindowFunction я выполняю агрегационную логику c и проверку времени.

Но может быть сценарий, когда событие T-типа не попадает во временной диапазон текущего J- Введите событие. В этом случае событие T-типа должно быть передано в следующее окно перед очисткой текущего окна.

Stream Window

Мысль о решениях:

  1. Pu sh необработанные события Т-типа в поток Kinesis (источник) в пользовательской функции обработки окна. (Наихудший вариант решения)

  2. Вместо FIRE_AND_PURGE используйте FIRE, чтобы поддерживать состояние в течение всего времени выполнения. Удалить обработанные элементы, используя элементы Iterator. (Не рекомендуется хранить бесконечное окно)

Хотелось бы узнать, есть ли способ напрямую обработать необработанные события обратно во входной поток (без кинезис). (Переставление в очередь)

Или

Есть ли способ сохранить состояние в контексте keyBy, чтобы мы выполняли вычисления для этих необработанных данных (до или) вместе с элементами окна .

1 Ответ

3 голосов
/ 21 февраля 2020

Вот два решения. Они более или менее эквивалентны по своему базовому поведению, но вам может показаться, что один или другой легче понять, поддержать или протестировать.

Что касается вашего вопроса, нет, нет способа l oop вернуть (переместить в очередь) неизрасходованные события, не возвращая их обратно в Kinesis. Но просто держаться за них, пока они не понадобятся, должно быть хорошо.

Решение 1. Используйте RichFlatMapFunction

По мере поступления событий T-типа добавляйте их к объекту ListState. Когда приходит событие J-типа, соберите на выходе все соответствующие события T-типа из списка и обновите список, чтобы сохранить только те события T-типа, которые будут принадлежать более поздним событиям J-типа.

Решение 2. Используйте Global Windows с пользовательским триггером и Evictor

В дополнение к тому, что вы уже сделали, внедрите Evictor, который (после того, как окно было FIREd) удаляет только событие типа J и все соответствующие события типа T из окна.

Обновление: состояние очистки для устаревших ключей / мертвых датчиков

С решением 1, Вы можете использовать состояние TTL для организации любого неактивного состояния, связанного с удалением мертвых ключей. Или вы можете использовать KeyedProcessFunction вместо RichFlatMapFunction и использовать таймеры для выполнения sh того же.

Управление состоянием для устаревших ключей с помощью оконного API может быть менее простым, но для решения 2 Я считаю, что вы можете расширить свой пользовательский триггер, включив тайм-аут, который очистит окно. И если вы использовали глобальное состояние в ProcessWindowFunction, вам нужно полагаться на состояние TTL, чтобы очистить его.

...