Flink AssignerWithPeriodicWatermarks getCurrentWatermark никогда не вызывается - PullRequest
1 голос
/ 06 марта 2020

Я пытаюсь выполнить простую агрегацию скользящего окна на основе источника Kafka.

Все события в Kafka содержат элемент отметки времени и расположены в порядке возрастания. Я пробовал использовать разные водяные маркеры Periodi c (восходящие, ограниченные и пользовательские, чтобы легче было отлаживать происходящее внутри). Я могу сказать, что метод extractTimestamp всегда вызывается, но метод getCurrentWatermark никогда не вызывается.

Я установил autoWatermarkInterval даже на 1 мс, а затем даже на водяной знак для каждой подзадачи. никогда не обновляется. Я проверил это с помощью пользовательского интерфейса Flink и посмотрел доступную метрику c.

Я прочитал довольно много похожих вопросов об этой топике c на SO, и большинство из них касалось окна никогда излучение по нескольким причинам. Я не смог определить причину, по которой он никогда не будет повышать уровень водяного знака.

Я также подтвердил, что никакие данные не выводятся на стороне как поздние данные.

Поток в нем Большинство основных c форма:

      val rfq = kafkaDataStream
        .assignAscendingTimestamps(_.timestamp.toEpochMilli)
        .keyBy("id")

      val lateTag = new OutputTag[RFQ]("late") {}

      val predictions: DataStream[RFQPrediction] = rfq
        .window(SlidingEventTimeWindows.of(5,3))
        .sideOutputLateData(lateTag)
        .aggregate(new PricePredictionsAggregate)
        .name("windowed-predictions")

Я убедился, что он работает нормально с AssignerWithPunctuatedWatermarks.

Что может быть причиной того, что метод getCurrentWatermark никогда не получит называется, хотя интервал установлен в 1 мс?

В тестовых данных, через которые я передаю данные, используется ограниченный список идентификаторов, для которых события постоянно генерируются с постоянно увеличивающейся отметкой времени.

Большое спасибо!

...