Я реализовал Flink RichFunction
, который имеет следующую структуру:
public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {
private MapState<String, MyState> myState;
@Override
public void open(Configuration conf)throws Exception{
myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));
}
@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
MyState state = myState.get(value.ID());
// Do things
}
@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
state.put(value.ID(), value.state()); // Update the mapState with value from broadcast
}
// retrieve all the state values and put them in the MapState
private void initialState() throws Exception{
Map<String, MyState> initialValues = ...;
this.cameras.putAll(initialValues);
}
}
Переменная mapState
хранит несколько состояний, которые обновляются через BroadcastedStream
. Обновление выполняется в функции processBroadcastElement()
.
В начале работы я хочу инициализировать mapState
с помощью функции initialState()
.
Проблема заключается в том, чтоЯ не могу использовать его в функции open()
(см. здесь почему)
Каков правильный способ инициализации mapState
в этом случае? (И во всех случаях с RichFunctions)