ПРИСОЕДИНИТЬСЯ к одной таблице flink tableapi двумя столбцами - PullRequest
0 голосов
/ 27 июня 2018

У меня есть таблица с данными, и мне нужно объединить два поля.

Я написал запрос, но он не работает

SELECT * 
FROM Data t1 
JOIN Data t2 ON t1.s = t2.o

код

val csvTableSource = CsvTableSource
  .builder
  .path("src/main/resources/data.dat")
  .field("s", Types.STRING)
  .field("p", Types.STRING)
  .field("o", Types.STRING)
  .field("TIMESTAMP", Types.STRING)
  .fieldDelimiter(",")
  .ignoreFirstLine
  .ignoreParseErrors
  .commentPrefix("%")
  .build()
tableEnv.registerTableSource("Data", csvTableSource)

val query = "SELECT * FROM Data t1 JOIN Data t2 ON t1.s = t2.o"
val table = tableEnv.sqlQuery(query)

Я получаю следующее исключение

Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: 

FlinkLogicalJoin(condition=[=($0, $6)], joinType=[inner])
  FlinkLogicalTableSourceScan(table=[[Data]], fields=[s, p, o, TIMESTAMP], source=[CsvTableSource(read fields: s, p, o, TIMESTAMP)])
  FlinkLogicalTableSourceScan(table=[[Data]], fields=[s, p, o, TIMESTAMP], source=[CsvTableSource(read fields: s, p, o, TIMESTAMP)])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.

1 Ответ

0 голосов
/ 28 июня 2018

Полагаю, вы пытаетесь выполнить этот запрос в потоковой среде. Неоконные соединения в потоковых таблицах были добавлены с помощью Flink 1.5.0.

Итак, вы пытаетесь использовать функцию, которая пока не поддерживается в Flink 1.4.2.

Вы можете переключиться на пакетную среду, которая должна быть возможной, если вы читаете файлы CSV, или обновиться до Flink 1.5.0.

...