Я публикую ниже исправление кода для моего пользовательского подхода Trigger
. Я не использовал Evictor
, как предложено @David, но использовал FIRE_AND_PURGE в onProcessingTime
. Этот подход, кажется, работает, протестировав несколько сценариев ios.
Конфигурация слота
private static final Map<String, Long> eventSlots = new HashMap<>();
static {
eventSlots.put("type22", 30000l);
eventSlots.put("type12", 15000l);
eventSlots.put("type9", 10000l);
eventSlots.put("type2", 20000l);
eventSlots.put("type7", 13000l);
}
обработка кода
SingleOutputStreamOperator<Event> sourceStream = ...
someSourceStream
.keyBy("type")
.window(GlobalWindows.create())
.trigger(new Trigger<Event, GlobalWindow>() {
@Override
public TriggerResult onElement(Event element, long timestamp, GlobalWindow window,
TriggerContext ctx) throws Exception {
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + eventSlots.get(element.getType()));
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
}).process(new ProcessWindowFunction<Event, Events, Tuple, Context> () {
@Override
public void process(Tuple arg0,
ProcessWindowFunction<Event, Events, Tuple, GlobalWindow>.Context ctx,
Iterable<Event> arg2, Collector<Event> arg3) throws Exception {
List<Event> events = new ArrayList<>();
arg2.forEach(events::add);
arg3.collect(new Events(events));
});