К вашему сведению, это обычно называется проблемой «неактивного источника». Это происходит потому, что всякий раз, когда оператор Flink имеет два или более входов, его водяной знак является минимумом водяных знаков от его входов. Если один из этих входных данных останавливается, его водяной знак больше не продвигается.
Если вы можете организовать его, лучшим решением будет включение в ваши источники данных событий поддержки активности. Это позволит вам уверенно продвигать свои водяные знаки, зная, что источник просто простаивает, а не, например, находится в автономном режиме.
Если это невозможно, и если у вас есть источники, которые не находятся в режиме ожидания, тогда вы могли бы поставить rebalance()
перед BoundedOutOfOrdernessTimestampExtractor
(и перед keyBy), чтобы каждый экземпляр продолжал получать некоторые события и мог продвигать свой водяной знак. Это происходит за счет дополнительной перестановки в сети.
Возможно, наиболее часто используемым решением является использование генератора водяных знаков, который обнаруживает простоя и искусственно увеличивает водяной знак на основе таймера времени обработки. ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor является примером этого.