У меня есть сценарий поддержки конфигурации во Flink, который я не знаю, как с этим справиться.
Допустим, у меня где-то есть конфигурация, и она мне нужна для обработки. При инициализации задания Flink я хочу загрузить всю конфигурацию.
Эта конфигурация также может быть изменена во время выполнения задания Flink, поэтому я должен сохранить в памяти состояние этой конфигурации и обновить еепри необходимости. Обновления конфигурации доступны из KafkaSource.
Итак, вот что у меня есть:
У меня есть функция, которая загружает всю конфигурацию, сохраняет ее в состоянии и связывает с моими даннымиstream:
public class MyConfiguration extends RichFlatMapFunction<Row, Row>{
private transient MapState<String, MyConfObject> configuration;
@Override
public void open(MyConfiguration config) throws Exception{
MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
"configuration",
BasicTypeInfo.STRING_TYPE_INFO,
...
);
configuration = getRuntimeContext().getMapState(descriptor);
configuration.putAll(...); // Load configuration from somewhere
}
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
MyConfObject conf = configuration.get(...);
... // Associate conf with data
out.collect(value);
}
}
И мой конвейер выглядит следующим образом:
DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject> streamConf =
env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
.map(...);
return dataStream
.assignTimestampsAndWatermarks(...)
.flatMap(new MyConfiguration())
... //Do some processing
.map(m -> {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(m);
return json.getBytes();
});
Я хочу использовать поток обновлений конфигурации streamConf
для обновления переменной State внутри MyConfiguration
функция плоской карты. Как я могу это сделать?