У меня есть некоторая конфигурация для использования и обслуживания для задания Flink.
Сейчас я делаю это, используя BroadcastedStream
и KeyedBroadcastProcessFunction
.
Взятие KeyedBroadcastProcessFunction
в качестве входных данных используется поток данных (SingleOutputStreamOperator
) и поток обновлений конфигурации (BroadcastedStream
).
Имеет
- A
MapState
объект, хранящий конфигурацию - Функция
open()
, которая инициализирует конфигурацию - Функция
processElement()
, которая объединяет сообщения из потока данных с текущей конфигурацией и отправляет его в выходной поток - Aфункция
processBroadcastElement()
, которая использует сообщение из потока обновлений конфигурации для обновления сохраненной MapState
Теперь я хочу реализовать что-то немного другое, используя Flink SQL API.
У меня есть несколько различных запросов SQL, каждый из которых зависит от текущего состояния конфигурации. В идеале я хотел бы сделать что-то вроде этого:
public void process(
StreamTableEnvironment tableEnv,
BroadcastStream<ConfUpdateObject> broadcastedConfigs // The broadcasted stream of configuration Update
){
Table configurationTable = ???; // TODO : A Table that should store the configurations and be updated by broadcastedConfigs
tableEnv.registerTable("Configuration", configurationTable);
Table dataTable1 = ...; // For example table created from a Kafka source
tableEnv.registerTable("Data1", dataTable1);
String query1 = "SELECT * from Data1 JOIN Configuration ON ..."
Table dataTable2 = ...; // For example table created from a Kafka source
tableEnv.registerTable("Data2", dataTable2);
String query2 = "SELECT * from Data2 JOIN Configuration ON ..."
Table result1 = tableEnv.sqlQuery(query1);
Table result2 = tableEnv.sqlQuery(query2);
... // Insert result1 and result2 in some sinks
}
Возможно ли это сделать каким-либо образом?
Примечания:
- Входные данные всеимеет
rowtime
поле - Конфигурация не имеет значений отметки времени
- Обновление конфигурации может происходить из других объектов, кроме
BroadcastStream
, если необходимо