Flink поддерживая состояние конфигурации - PullRequest
1 голос
/ 09 октября 2019

У меня есть сценарий поддержки конфигурации во Flink, который я не знаю, как с этим справиться.

Допустим, у меня где-то есть конфигурация, и она мне нужна для обработки. При инициализации задания Flink я хочу загрузить всю конфигурацию.

Эта конфигурация также может быть изменена во время выполнения задания Flink, поэтому я должен сохранить в памяти состояние этой конфигурации и обновить еепри необходимости. Обновления конфигурации доступны из KafkaSource.

Итак, вот что у меня есть:

У меня есть функция, которая загружает всю конфигурацию, сохраняет ее в состоянии и связывает с моими даннымиstream:

public class MyConfiguration extends RichFlatMapFunction<Row, Row>{
    private transient MapState<String, MyConfObject> configuration;

    @Override
    public void open(MyConfiguration config) throws Exception{
        MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
                "configuration",
                BasicTypeInfo.STRING_TYPE_INFO,
                ...
        );
        configuration = getRuntimeContext().getMapState(descriptor);
        configuration.putAll(...);   // Load configuration from somewhere
    }

    @Override
    public void flatMap(Row value, Collector<Row> out) throws Exception {
        MyConfObject conf = configuration.get(...);
        ...               // Associate conf with data
        out.collect(value);
    }
}

И мой конвейер выглядит следующим образом:

DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject> streamConf = 
     env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
        .map(...); 

return dataStream
    .assignTimestampsAndWatermarks(...)
    .flatMap(new MyConfiguration())

    ... //Do some processing

    .map(m -> {
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(m);
        return json.getBytes();
    });

Я хочу использовать поток обновлений конфигурации streamConf для обновления переменной State внутри MyConfiguration функция плоской карты. Как я могу это сделать?

1 Ответ

1 голос
/ 09 октября 2019

Я бы посоветовал вам написать источник, который читает информацию о конфигурации из Kafka, а затем транслирует изменения в конфигурацию через широковещательный поток в функцию отображения. Функция отображения будет хранить полную текущую конфигурацию в своем постоянном состоянии, а широковещательный поток означает, что все экземпляры функции отображения будут получать все изменения конфигурации.

...