У меня есть конвейер обработки времени-события. Что-то вроде:
env.keyBy(_.key).process(myProcessFunction)
Функция процесса обрабатывает только события с водяными знаками (это потому, что требуется упорядочить события и в зависимости от того, что упорядочивает, генерирует событие).).Такая функция процесса была показана много раз, первое, подобное тому, что я использую, я нашел в Google здесь .Вкратце это выглядит так:
class MyProcessFunction extends ProcessFunction[Input, Output] {
def open(config: Configuration) {
//setup an ordered queue here
}
def processElement(event: Input, ..., out: Collector[Output]) {
updateQueue(_.enqueue(event))
context.timerService().registerEventTimeTimer(even.timestamp)
}
def onTimer(timestamp: Long, ..., out: Collector[Output]) {
val watermark = ctx.timerService().currentWatermark()
updateQueue{ queue =>
while(queue.headOption.exists(_.timestamp < watermark) {
val event = queue.dequeue()
out.collect(event)
}
}
}
}
Теперь, допустим, я использую параллелизм 8, поскольку в большинстве случаев через эту систему протекает много ключей.Я написал себе несколько интеграционных / модульных тестов, которые работают с небольшим набором данных.Допустим, в этих данных у нас всего 4 ключа.Я столкнулся с, возможно, не очень хорошо задокументированным поведением flink.
Если вы посмотрите на StreamInputProcessor.processInput, в частности на строку 189 , которая обрабатывает водяные знаки и следует в StatusWatermarkValve.inputWatermark и, наконец, StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels вы обнаружите, что водяной знак, который назначается потоку, является MIN всех «активных» каналов для этого потока.Во время отладки во время выполнения я обнаружил, что, поскольку эти каналы инициализируются как ACTIVE (конструктор StatusWatermarkValve), они будут устанавливать для водяного знака потоков значение Long.MIN_VALUE, пока по крайней мере один водяной знак не попадет в каждый канал.
Это приводит к тому, что если у вас есть моя программа обработки, описанная выше, и у вас есть только 4 ключа, они никогда не будут обработаны.
Я немного удивлен, что водяные знаки не распространяются в каждый канал (возможно, оптимизация памяти).Во-вторых, я ОЧЕНЬ удивлен, что каналы по умолчанию устанавливаются как АКТИВНЫЕ, даже до того, как в них передается какое-либо событие.
Мой вопрос: возможно ли как-то обойти это?Я хочу иметь возможность обрабатывать эти 4 ключа.
Я говорил об интеграционном тесте, поэтому решение может быть просто уменьшить параллелизм в тесте или добавить ключи.Однако я могу вспомнить времена, когда мне не нравилось это поведение во время живого исполнения: в течение дня у меня гораздо больше событий, чем ночью, поэтому я измеряю свой параллелизм таким образом, чтобы обрабатывать день, но ночью конвейер будет нуждаться в данных => Некоторые каналы будут неиспользованными и, следовательно, заблокируют водяные знаки других каналов.Это почти наводит меня на мысль, что это ошибка, но я не могу быть уверен, что для этого может быть причина.