Функция Flink Window getResult не срабатывает - PullRequest
0 голосов
/ 03 апреля 2020

Я пытаюсь использовать время события в моей работе Flink и использую BoundedOutOfOrdernessTimestampExtractor для извлечения метки времени и создания водяного знака. Но у меня есть некоторые входные данные Кафки с разреженным потоком, они могут долго не иметь данных, что делает getResult in AggregateFunction вообще не вызываемым. Я вижу данные, поступающие в функцию add.

Я установил getEnv().getConfig().setAutoWatermarkInterval(1000L); Я пытался

 eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(TumblingEventTimeWindows.of(Time.minutes(windowInMinutes)))
            .allowedLateness(WINDOW_LATENESS)
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))

также окно сеанса

eventsWithKey
            .keyBy(entry -> (String) entry.get(key))
            .window(EventTimeSessionWindows.withGap(Time.seconds(30)))
            .aggregate(new CountTask(basicMetricTags, windowInMinutes))

Все водяные знаки отмечены показывает No Watermark Как я могу позволить Флинку игнорировать эту вещь без водяных знаков?

1 Ответ

1 голос
/ 04 апреля 2020

К вашему сведению, это обычно называется проблемой «неактивного источника». Это происходит потому, что всякий раз, когда оператор Flink имеет два или более входов, его водяной знак является минимумом водяных знаков от его входов. Если один из этих входных данных останавливается, его водяной знак больше не продвигается.

Если вы можете организовать его, лучшим решением будет включение в ваши источники данных событий поддержки активности. Это позволит вам уверенно продвигать свои водяные знаки, зная, что источник просто простаивает, а не, например, находится в автономном режиме.

Если это невозможно, и если у вас есть источники, которые не находятся в режиме ожидания, тогда вы могли бы поставить rebalance() перед BoundedOutOfOrdernessTimestampExtractor (и перед keyBy), чтобы каждый экземпляр продолжал получать некоторые события и мог продвигать свой водяной знак. Это происходит за счет дополнительной перестановки в сети.

Возможно, наиболее часто используемым решением является использование генератора водяных знаков, который обнаруживает простоя и искусственно увеличивает водяной знак на основе таймера времени обработки. ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor является примером этого.

...