Как обрабатывать поздние элементы в окне Flink's Table API? - PullRequest
2 голосов
/ 19 июня 2019

В нашем потоковом приложении, которое использует Flink 1.55 и его API таблиц, мне нужно обнаруживать и обрабатывать поздние элементы. Я не могу найти альтернативу функциональности API DataStream .sideOutputLateData (...)

Я пытался искать в документации Flink https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/tableApi.html, много гугл и не нашел ничего полезного

Пример:

table
  .window(Tumble over windowLengthInMinutes.minutes on 'timeStamp as 'timeWindow)
  .groupBy(..fieds list)
  .select(..fields)

Приведенный код работает должным образом. Проблема состоит в том, что элементы, которые поступают поздно, как определено размером окна и допустимой задержкой, отбрасываются. Есть ли способ обработать эти поздние элементы с помощью Table API?

Ответы [ 2 ]

0 голосов
/ 27 июня 2019

Я нашел одно решение. В настоящее время я использовал BoundedOutOfOrdernessTimestampExtractor, который предоставляет информацию о отметке времени водяного знака. Я использовал эту информацию, чтобы разделить входной поток и обработать поздний поток отдельно.

0 голосов
/ 19 июня 2019

Начиная с Flink 1.8.0, API таблиц в настоящее время не поддерживает это напрямую. Один из способов обойти это - преобразовать вашу таблицу в DataStream[Row] и установить для нее побочный вывод:

val outputTag = OutputTag[String]("side-output")

val flink = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(flink)

// Make sure the source emits data to the selected side output
tableEnv.registerTableSource(...)
val table = tableEnv.sqlQuery("QUERY")

// Can also be toAppendStream, depending on the underlying table output
val dataStream = tableEnv.toRetractStream(table)
val sideOutputStream = dataStream.getSideOutput(outputTag)
...