Обработка Flink SQL на основе динамической конфигурации - PullRequest
0 голосов
/ 01 октября 2019

Я использую Flink SQL API для обработки потока данных из Kafka. Для обработки этих данных я использую некоторые конфигурации, загруженные из конечной точки HTTP один раз при инициализации.

Вот пример того, как работает мой код (здесь он возвращает только конфигурацию, связанную с идентификатором данных):

// Function that contains the processing part
public void process(StreamTableEnvironment tableEnv, StreamTableDescriptor sourceStream, UpsertStreamTableSink sink){
    tableEnv.registerFunction("StaticConf", new StaticConf());

    sourceStream.registerTableSource("Input");
    tableEnv.registerTableSink("SINK", sink);

    Table output = tableEnv.sqlQuery("SELECT * FROM Input, LATERAL TABLE(StaticConf(myId))");

    processed.insertInto("SINK");
}

Где StaticConf - это TableFunction, который загружаетКонфигурация в конструкторе и выполняет объединение в функции eval на основе атрибута значения id myId:

public class StaticConf extends TableFunction<Row>{

    // A Row contains the configuration associated to an ID
    List<Row> myConfig;

    public GetStaticData(){
        String confEndpoint = "http://MyStoredConf:1234"

        try (InputStream inputStream = new URL(confEndpoint).openStream()){
            myConfig = mapper.readValue(inputStream, TypeFactory.defaultInstance().constructParametricType(ArrayList.class, Row.class));
        }catch(Exception e){
            System.out.println("Error while loading file from {} : {}", confEndpoint, e.getMessage());
        }
    }

    public void eval(String id){
        for(Row r: myConfig){
            if(r.getField(0).equals(id)){ // For the example, let's say that the field 0 of the row is the ID
                collect(r);
            }
        }
    }

}

При таком решении конфигурация загружается только один раз при инициализации задания. Так что он хорошо работает со статической конфигурацией, но не обрабатывает динамические.

Каков наилучший способ обработки динамической конфигурации, без необходимости перезагружать конфигурацию в каждом сообщении? (Например, каждые X минут)

Примечания:

  • Я использую Flink 1.8.0
  • В полном сценарии использования я выполняю обработку временных окон на основевремя ввода входных данных (не знаю, повлияет ли это на решение)
...