Flink Table: обновлять таблицу конфигурации - PullRequest
0 голосов
/ 17 октября 2019

У меня есть некоторая конфигурация для использования и обслуживания для задания Flink.

Сейчас я делаю это, используя BroadcastedStream и KeyedBroadcastProcessFunction.

Взятие KeyedBroadcastProcessFunctionв качестве входных данных используется поток данных (SingleOutputStreamOperator) и поток обновлений конфигурации (BroadcastedStream).

Имеет

  • A MapState объект, хранящий конфигурацию
  • Функция open(), которая инициализирует конфигурацию
  • Функция processElement(), которая объединяет сообщения из потока данных с текущей конфигурацией и отправляет его в выходной поток
  • Aфункция processBroadcastElement(), которая использует сообщение из потока обновлений конфигурации для обновления сохраненной MapState

Теперь я хочу реализовать что-то немного другое, используя Flink SQL API.

У меня есть несколько различных запросов SQL, каждый из которых зависит от текущего состояния конфигурации. В идеале я хотел бы сделать что-то вроде этого:

public void process(
    StreamTableEnvironment tableEnv, 
    BroadcastStream<ConfUpdateObject> broadcastedConfigs    // The broadcasted stream of configuration Update
    ){

    Table configurationTable = ???;                     // TODO : A Table that should store the configurations and be updated by broadcastedConfigs
    tableEnv.registerTable("Configuration", configurationTable);

    Table dataTable1 = ...;                             // For example table created from a Kafka source
    tableEnv.registerTable("Data1", dataTable1);

    String query1 = "SELECT * from Data1 JOIN Configuration ON ..."

    Table dataTable2 = ...;                             // For example table created from a Kafka source
    tableEnv.registerTable("Data2", dataTable2);

    String query2 = "SELECT * from Data2 JOIN Configuration ON ..."

    Table result1 = tableEnv.sqlQuery(query1);
    Table result2 = tableEnv.sqlQuery(query2);

    ... // Insert result1 and result2 in some sinks
}

Возможно ли это сделать каким-либо образом?

Примечания:

  • Входные данные всеимеет rowtime поле
  • Конфигурация не имеет значений отметки времени
  • Обновление конфигурации может происходить из других объектов, кроме BroadcastStream, если необходимо
...