Я пытаюсь выполнить простую агрегацию скользящего окна на основе источника Kafka.
Все события в Kafka содержат элемент отметки времени и расположены в порядке возрастания. Я пробовал использовать разные водяные маркеры Periodi c (восходящие, ограниченные и пользовательские, чтобы легче было отлаживать происходящее внутри). Я могу сказать, что метод extractTimestamp
всегда вызывается, но метод getCurrentWatermark
никогда не вызывается.
Я установил autoWatermarkInterval
даже на 1 мс, а затем даже на водяной знак для каждой подзадачи. никогда не обновляется. Я проверил это с помощью пользовательского интерфейса Flink и посмотрел доступную метрику c.
Я прочитал довольно много похожих вопросов об этой топике c на SO, и большинство из них касалось окна никогда излучение по нескольким причинам. Я не смог определить причину, по которой он никогда не будет повышать уровень водяного знака.
Я также подтвердил, что никакие данные не выводятся на стороне как поздние данные.
Поток в нем Большинство основных c форма:
val rfq = kafkaDataStream
.assignAscendingTimestamps(_.timestamp.toEpochMilli)
.keyBy("id")
val lateTag = new OutputTag[RFQ]("late") {}
val predictions: DataStream[RFQPrediction] = rfq
.window(SlidingEventTimeWindows.of(5,3))
.sideOutputLateData(lateTag)
.aggregate(new PricePredictionsAggregate)
.name("windowed-predictions")
Я убедился, что он работает нормально с AssignerWithPunctuatedWatermarks
.
Что может быть причиной того, что метод getCurrentWatermark
никогда не получит называется, хотя интервал установлен в 1 мс?
В тестовых данных, через которые я передаю данные, используется ограниченный список идентификаторов, для которых события постоянно генерируются с постоянно увеличивающейся отметкой времени.
Большое спасибо!