Apache Flink: как применить пользовательскую логику к поздним событиям? - PullRequest
0 голосов
/ 20 февраля 2019

Несмотря на то, что Flink имеет некоторые встроенные инструменты для обработки поздних данных, например, допустимое опоздание , я бы хотел обработать поздние данные самостоятельно.Например, я хотел бы отслеживать последние события или просто сохранять их в базе данных.

Как я могу это сделать?

Ответы [ 2 ]

0 голосов
/ 20 февраля 2019

ProcessFunctions (ProcessFunction, KeyedProcessFunction и т. Д.) Обеспечивают доступ к метке времени события записи и TimerService через объект Context.TimerService дает доступ к текущему водяному знаку.

Вы можете определить поздние записи, сравнив отметку времени события и водяной знак.Если временная метка меньше или равна водяному знаку, событие опаздывает.

Вам решать, как вы хотите обрабатывать поздние события.Вы можете пометить их, вы можете отказаться от них, испустить их через побочный вывод или выполнить любые вычисления с ними.

0 голосов
/ 20 февраля 2019

Обычно задержка и водяные знаки используются в оконных операторах.И если вы используете оконный оператор, вы можете использовать sideoutput следующим образом:

val windowStream = eventStream.keyBy(output => output.rule)
  .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
  .sideOutputLateData(lateOutputTag)

И получить поздние элементы из sideoutput следующим образом:

windowStream.getSideOutput(lateOutputTag).print()
...