Как использовать Flink Temporal Tables? - PullRequest
0 голосов
/ 30 января 2019

Новые временные таблицы во Флинке выглядят потрясающе, но я пока не смог заставить их работать.Поскольку я не могу найти никаких рабочих примеров, мне интересно, заставил ли это кто-нибудь еще работать и может указать, что я делаю неправильно.

Вот небольшой контекст:

query:

SELECT s.id FROM sitemembership AS m, LATERAL TABLE (site(m.ts)) AS s WHERE m.siteId = s.id

Настройка:

// { "streamName": "sitemembership", "key": "siteId" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
table.printSchema();
tableEnv.registerTable(streamName, table);

// { "streamName": "site", "key": "id" }
Table table = tableEnv.fromDataStream(stream, String.join(",", rowTypeInfo.getFieldNames()) + ",ts.rowtime");
TemporalTableFunction temporalTable = table.createTemporalTableFunction("ts", key);
tableEnv.registerFunction(streamName, temporalTable);

Я не получаю никаких строк и ошибок.Я попытался перевернуть запрос, изменив таблицу, которую я зарегистрировал как временную, но безуспешно.Я также посмотрел на столбец "ts" и получил даты, которые заставляют меня поверить, что я должен получить хотя бы несколько строк.

Любая помощь приветствуется.

PS Я запускаю это наисторические данные из кафки, разделенные на «id», который также является ключом строки

1 Ответ

0 голосов
/ 30 января 2019

Вы можете найти полностью рабочий код «примеров» в виде тестов здесь (Содержание этих двух тестов (время обработки и даже время) более или менее повторяется в документации здесь и здесь или здесь ).Вы можете начать с этих примеров, а затем пошагово преобразовать их в конкретный вариант использования / сценарий.Возможно, было бы полезно сначала начать с заранее определенного набора данных и только потом переключиться на чтение из Kafka.

Что касается вашей проблемы, из вашего фрагмента кода неясно, что является неправильным, некоторые из потенциальных проблем:

  • водяные знаки не назначаются / не увеличиваются (assignTimestampsAndWatermarks() вызов в связанном testEventTimeInnerJoin()).Оператор временного соединения генерирует данные только по водяному знаку.
  • время строки между этими двумя таблицами, к которым вы пытаетесь присоединиться, не синхронизировано.Если в site нет строки, достаточно старой для объединения с записями sitemembership, результат будет пустым.Как, например, если все записи из site имеют поля времени из года 2019, тогда как sitemembership имеют только записи из 2018.
...