Поведение хранилища состояний с ключами в ProcessWindowFunction (Apache Flink Java) - PullRequest
1 голос
/ 18 июня 2020

У меня есть ProcessWindowFunction для обработки TumblingEventTime Windows, в котором я использую хранилище состояний для сохранения некоторых значений при нескольких перекатываниях windows. Моя проблема в том, что это хранилище состояний не сохраняется при переворачивании windows, т.е. если я сначала сохраню что-то в окне [0,999], а затем получу доступ к этому хранилищу из окна [1000,1999], хранилище будет пустым. Мне известно о глобальном состоянии и для каждого состояния окна указано здесь . Я хочу использовать глобальное состояние. Я также попытался создать минимальный рабочий пример, чтобы исследовать это:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;


public class twStateStoreTest {


    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);

        final DataStream<Element> elements = env.fromElements(
                Element.from(1, 500),
                Element.from(1, 1000),
                Element.from(1, 1500),
                Element.from(1, 2000),

                Element.from(99, 9999)
                ).
                assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
                    long w;
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(w);
                    }

                    @Override
                    public long extractTimestamp(Element element, long previousElementTimestamp) {
                        w = element.getTimestamp();
                        return w;
                    }
                });

        elements
                .keyBy(new KeySelector<Element, Integer>() {
                    @Override
                    public Integer getKey(Element element) throws Exception {
                        return element.value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
                .process(new MyProcessWindowFn()).
                print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
        MapState<Integer, Integer> stateStore;

        @Override
        public void open(Configuration parameters) throws Exception {
            stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
        }

        @Override
        public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {

            if (stateStore.get(key) == null) {
                stateStore.put(key, 1);
            }else {
                int previous = stateStore.get(key);
                stateStore.put(key, previous+1);
            }
            out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
                    + " for window : " + context.window());
        }
    }




    static class Element {
        private final long timestamp;
        private final int value;

        public Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public int getValue() {
            return value;
        }

        public static Element from(int value, long timestamp) {
            return new Element(timestamp, value);
        }
    }


}

Здесь я пытаюсь подсчитать, сколько раз функция process() вызывалась для ключа. В этом примере работает , и состояние действительно сохраняется при переворачивании windows. Я удостоверился, что этот пример точно отражает фактическую функцию processWindow, с удаленным другим ненужным кодом.

Но состояние не сохраняется в windows в фактической функции processWindowFunction!

Есть ли какие-то ошибки, которые мне явно не хватает? Есть ли какая-либо другая причина, по которой состояние не сохраняется в EventTimeTumbling Windows для функции processWindowFunction, которая использует MapState, определенный следующим образом:

private MapState<UserDefinedEnum, Boolean> activeSessionStore;

@Override
    public void open(Configuration parameters) throws Exception {
        activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
                                                "name", UserDefinedEnum.class, Boolean.class));
    }

Вот фактический класс с удаленным раздуванием и согласно @ David и @ ShemTov предложения:

public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {

    private final static MapStateDescriptor<IUEventType, Boolean> desc =  new MapStateDescriptor<IUEventType, Boolean>(
            "store", IUEventType.class, Boolean.class);
    private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);

    @Override
    public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
        ...

        MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);

        Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
        // even though I populated activeSessionStore with some values in the previous invocation of process()

        ... do something based on lastFeatureStates....

        activeSessionStore.put(...);
    }

    @Override
    public void clear(Context context) throws Exception {
        context.globalState().getMapState(desc).clear();
    }
}

И я вызываю его, используя:

inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey).
window(TumblingEventTimeWindows.of(Time.milliseconds(1000L))).
            process(new IUFeatureStateCombiner())

Проблема все еще есть, я получаю пустую итерацию при втором вызове process(), хотя я заполнил состояние в предыдущем вызове.

Изменить: проблема решена, метод clear () не должен вызываться, поскольку это глобальное состояние.

Ответы [ 3 ]

1 голос
/ 18 июня 2020

Вы хотите сделать что-то подобное. И имейте в виду, что это хранилища состояний для каждого ключа - для каждого ключа есть отдельная карта - так что где вы делаете stateStore.get(key), это действительно не имеет смысла. Возможно, все, что вам нужно, это ValueState, если вам нужно хранить только целое число для каждого ключа.

static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
    private final static MapStateDescriptor mapDesc = new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class);

    @Override
    public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {

        MapState<Integer, Integer> stateStore = context.globalState.getMapState(mapDesc);

        ...
    }
}

Обратите внимание, что глобальное хранилище состояний никогда не очищается. Поэтому, если у вас неограниченное пространство клавиш, вы в конечном итоге столкнетесь с проблемами. Вы можете настроить TTL состояния в дескрипторах состояния, чтобы справиться с этим.

1 голос
/ 18 июня 2020

Моя ошибка заключалась в том, что я неправильно использовал метод clear(). Поскольку это глобальное состояние, использование метода clear() очистит состояние, как только истечет время TumblingWindow. Как указал Дэвид, глобальное состояние никогда не очищается, и мы должны определить TTL для неограниченных ключевых потоков.

1 голос
/ 18 июня 2020

Насколько мне известно, вы не можете получить глобальное состояние из метода @override open.

Вам нужно получить его из функции процесса в ProcessWindowFunction:

context.globalState().getMapState(<your_Map_State_Descriptor>)
...