У меня есть ProcessWindowFunction
для обработки TumblingEventTime Windows, в котором я использую хранилище состояний для сохранения некоторых значений при нескольких перекатываниях windows. Моя проблема в том, что это хранилище состояний не сохраняется при переворачивании windows, т.е. если я сначала сохраню что-то в окне [0,999], а затем получу доступ к этому хранилищу из окна [1000,1999], хранилище будет пустым. Мне известно о глобальном состоянии и для каждого состояния окна указано здесь . Я хочу использовать глобальное состояние. Я также попытался создать минимальный рабочий пример, чтобы исследовать это:
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
public class twStateStoreTest {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000L);
final DataStream<Element> elements = env.fromElements(
Element.from(1, 500),
Element.from(1, 1000),
Element.from(1, 1500),
Element.from(1, 2000),
Element.from(99, 9999)
).
assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
long w;
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(w);
}
@Override
public long extractTimestamp(Element element, long previousElementTimestamp) {
w = element.getTimestamp();
return w;
}
});
elements
.keyBy(new KeySelector<Element, Integer>() {
@Override
public Integer getKey(Element element) throws Exception {
return element.value;
}
})
.window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
.process(new MyProcessWindowFn()).
print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
MapState<Integer, Integer> stateStore;
@Override
public void open(Configuration parameters) throws Exception {
stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
}
@Override
public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
if (stateStore.get(key) == null) {
stateStore.put(key, 1);
}else {
int previous = stateStore.get(key);
stateStore.put(key, previous+1);
}
out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
+ " for window : " + context.window());
}
}
static class Element {
private final long timestamp;
private final int value;
public Element(long timestamp, int value) {
this.timestamp = timestamp;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public int getValue() {
return value;
}
public static Element from(int value, long timestamp) {
return new Element(timestamp, value);
}
}
}
Здесь я пытаюсь подсчитать, сколько раз функция process()
вызывалась для ключа. В этом примере работает , и состояние действительно сохраняется при переворачивании windows. Я удостоверился, что этот пример точно отражает фактическую функцию processWindow, с удаленным другим ненужным кодом.
Но состояние не сохраняется в windows в фактической функции processWindowFunction!
Есть ли какие-то ошибки, которые мне явно не хватает? Есть ли какая-либо другая причина, по которой состояние не сохраняется в EventTimeTumbling Windows для функции processWindowFunction, которая использует MapState, определенный следующим образом:
private MapState<UserDefinedEnum, Boolean> activeSessionStore;
@Override
public void open(Configuration parameters) throws Exception {
activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
"name", UserDefinedEnum.class, Boolean.class));
}
Вот фактический класс с удаленным раздуванием и согласно @ David и @ ShemTov предложения:
public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {
private final static MapStateDescriptor<IUEventType, Boolean> desc = new MapStateDescriptor<IUEventType, Boolean>(
"store", IUEventType.class, Boolean.class);
private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);
@Override
public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
...
MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);
Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
// even though I populated activeSessionStore with some values in the previous invocation of process()
... do something based on lastFeatureStates....
activeSessionStore.put(...);
}
@Override
public void clear(Context context) throws Exception {
context.globalState().getMapState(desc).clear();
}
}
И я вызываю его, используя:
inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey).
window(TumblingEventTimeWindows.of(Time.milliseconds(1000L))).
process(new IUFeatureStateCombiner())
Проблема все еще есть, я получаю пустую итерацию при втором вызове process()
, хотя я заполнил состояние в предыдущем вызове.
Изменить: проблема решена, метод clear () не должен вызываться, поскольку это глобальное состояние.