Flink Streaming - сохранение / печать пропущенных сообщений - PullRequest
0 голосов
/ 12 ноября 2019

Я новичок, чтобы моргать и в потоковом режиме. Я использую скользящее окно размером 5Sec с 1Sec slide для подсчета количества сообщений (проверьте код ниже), но мне нужна помощь в сохранении (или распечатке) пропущенных сообщений, которые поступили поздно, я пытался использовать sideOutputLateData, но у меня это не работает

val sensorData = stream
    .assignTimestampsAndWatermarks(new SensorTimeAssigner)
    .map(x => (x.event_id, x.user_id, 1))
    .keyBy(x => (x._1, x._2))
    .timeWindow(Time.seconds(5), Time.seconds(1))
    .sum("_3")  

с sideOutputLateData:

val lateOutputTag = OutputTag[(Int, Int, Int)]("late")
val sensorData = stream
    .assignTimestampsAndWatermarks(new SensorTimeAssigner)
    .map(x => (x.event_id, x.user_id, 1))
    .keyBy(x => (x._1, x._2))
    .timeWindow(Time.seconds(5), Time.seconds(1))
    .sideOutputLateData(lateOutputTag)
    .sum("_3")

sensorData
    .getSideOutput(lateOutputTag)
    .print()

1 Ответ

0 голосов
/ 13 ноября 2019

Один вероятный ответ: на самом деле нет поздних событий. Это будет иметь место, если вы не установили характеристику времени на время события. Недостаточно использовать assignTimestampsAndWatermarks, вам также нужно либо использовать

.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(1)))

, чтобы заставить Flink использовать временные окна событий, либо установить, что временные окна событий являются значениями по умолчанию через

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

В противном случае timeWindow фактически создает SlidingProcessingTimeWindow, и в этом случае ничто не может быть запоздалым.

Другая возможность состоит в том, что ваш водяной знак не работает так, как вы ожидаете, и позволяет всем событиям бытьвовремя.

...