Flink: DataStream to Table - PullRequest
       76

Flink: DataStream to Table

2 голосов
/ 28 апреля 2020

Usecase : чтение протобуф-сообщений от Kafka, десериализация их, применение некоторых преобразований (выравнивание некоторых столбцов) и запись в Dynamodb.

К сожалению, Kafka Flink Connector поддерживает только форматы csv, json и avro. Поэтому мне пришлось использовать API более низкого уровня (поток данных).

Проблема: Если я смогу создать таблицу из объекта потока данных, то смогу принять запрос для запуска этой таблицы. Это сделало бы часть преобразования бесшовной и обобщенной c. Можно ли выполнить SQL запрос к объекту потока данных?

1 Ответ

1 голос
/ 28 апреля 2020

Если у вас есть DataStream объектов, то вы можете просто зарегистрировать данную DataStream как таблицу, используя StreamTableEnvironment.

. Это будет выглядеть примерно так:

val myStream = ...
val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
tEnv.registerDataStream("myTable", myStream, [Field expressions])

Тогда Вы сможете запросить динамическую таблицу c, созданную из Your DataStream.

...