Инициализировать содержимое MapState - PullRequest
1 голос
/ 09 октября 2019

Я реализовал Flink RichFunction, который имеет следующую структуру:

public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {

    private MapState<String, MyState> myState;              

    @Override
    public void open(Configuration conf)throws Exception{
        myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));
    }

    @Override
    public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
        MyState state = myState.get(value.ID());

        // Do things
    }

    @Override
    public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
        state.put(value.ID(), value.state());   // Update the mapState with value from broadcast
    }

    // retrieve all the state values and put them in the MapState
    private void initialState() throws Exception{
       Map<String, MyState> initialValues = ...;
       this.cameras.putAll(initialValues);
    }
}

Переменная mapState хранит несколько состояний, которые обновляются через BroadcastedStream. Обновление выполняется в функции processBroadcastElement().

В начале работы я хочу инициализировать mapState с помощью функции initialState().

Проблема заключается в том, чтоЯ не могу использовать его в функции open() (см. здесь почему)

Каков правильный способ инициализации mapState в этом случае? (И во всех случаях с RichFunctions)

Ответы [ 2 ]

1 голос
/ 09 октября 2019

Вы хотите реализовать org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

Когда вы сделаете это, вы реализуете два метода:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    // called when it's time to save state

    myState.clear();

        // Update myState with current application state 

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // called when things start up, possibly recovering from an error

    descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));

    myState = context.getKeyedStateStore().getMapState(descriptor);

    if (context.isRestored()) {

        // restore application state from myState  

    }       

}

Вы инициализируете переменную myStateв методе initializeState () вместо open ().

0 голосов
/ 10 октября 2019

Я не верю, что вы действительно можете инициализировать состояние широковещания в initializeState (). Единственный способ изменить состояние широковещания - через контекст чтения / записи, который вы получаете в методе processBroadcastElement.

Но что вы можете сделать, это использовать context.isRestored () в initializeState, чтобы определить, инициализируется ли KeyedBroadcastProcessFunction в самый первый раз, и установить временную локальную переменную для записи этой информации. И затем при первом вызове метода processBroadcastElement вы можете использовать эту информацию, чтобы решить, что хранить в состоянии широковещания. Но вам нужно будет что-то отправить в трансляции, чтобы начать это.

...