Истощенные данными каналы задерживают водяные знаки - PullRequest
0 голосов
/ 18 мая 2018

У меня есть конвейер обработки времени-события. Что-то вроде:

env.keyBy(_.key).process(myProcessFunction)

Функция процесса обрабатывает только события с водяными знаками (это потому, что требуется упорядочить события и в зависимости от того, что упорядочивает, генерирует событие).).Такая функция процесса была показана много раз, первое, подобное тому, что я использую, я нашел в Google здесь .Вкратце это выглядит так:

class MyProcessFunction extends ProcessFunction[Input, Output] {
   def open(config: Configuration) {
      //setup an ordered queue here
   }

   def processElement(event: Input, ..., out: Collector[Output]) {
      updateQueue(_.enqueue(event))
      context.timerService().registerEventTimeTimer(even.timestamp)
   }

   def onTimer(timestamp: Long, ..., out: Collector[Output]) {
      val watermark = ctx.timerService().currentWatermark()
      updateQueue{ queue =>
         while(queue.headOption.exists(_.timestamp < watermark) { 
            val event = queue.dequeue()
            out.collect(event)
         }
      }
   }
}

Теперь, допустим, я использую параллелизм 8, поскольку в большинстве случаев через эту систему протекает много ключей.Я написал себе несколько интеграционных / модульных тестов, которые работают с небольшим набором данных.Допустим, в этих данных у нас всего 4 ключа.Я столкнулся с, возможно, не очень хорошо задокументированным поведением flink.

Если вы посмотрите на StreamInputProcessor.processInput, в частности на строку 189 , которая обрабатывает водяные знаки и следует в StatusWatermarkValve.inputWatermark и, наконец, StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels вы обнаружите, что водяной знак, который назначается потоку, является MIN всех «активных» каналов для этого потока.Во время отладки во время выполнения я обнаружил, что, поскольку эти каналы инициализируются как ACTIVE (конструктор StatusWatermarkValve), они будут устанавливать для водяного знака потоков значение Long.MIN_VALUE, пока по крайней мере один водяной знак не попадет в каждый канал.

Это приводит к тому, что если у вас есть моя программа обработки, описанная выше, и у вас есть только 4 ключа, они никогда не будут обработаны.

Я немного удивлен, что водяные знаки не распространяются в каждый канал (возможно, оптимизация памяти).Во-вторых, я ОЧЕНЬ удивлен, что каналы по умолчанию устанавливаются как АКТИВНЫЕ, даже до того, как в них передается какое-либо событие.

Мой вопрос: возможно ли как-то обойти это?Я хочу иметь возможность обрабатывать эти 4 ключа.

Я говорил об интеграционном тесте, поэтому решение может быть просто уменьшить параллелизм в тесте или добавить ключи.Однако я могу вспомнить времена, когда мне не нравилось это поведение во время живого исполнения: в течение дня у меня гораздо больше событий, чем ночью, поэтому я измеряю свой параллелизм таким образом, чтобы обрабатывать день, но ночью конвейер будет нуждаться в данных => Некоторые каналы будут неиспользованными и, следовательно, заблокируют водяные знаки других каналов.Это почти наводит меня на мысль, что это ошибка, но я не могу быть уверен, что для этого может быть причина.

...