Мне нужно подсчитать события в переворачивающемся окне. Но я также хочу отправлять события со значением 0, если в окне не было событий.
Что-то вроде.
- windowCount: 5
- windowCount: 0
- windowCount: 0
- windowCount: 3
- windowCount: 0 ...
import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;
public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}
и используется здесь
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
TimeCharacteristics - ingestionTime
Я читал о функции TiggerFunction, которая может определять, получил ли агрегированный поток событие по прошествии x времени, но я не уверен, что это правильный способ сделать это.
Я ожидал, что агрегирование произойдет, даже если в окне вообще не будет событий. Может быть, есть настройка, о которой я не знаю?
Спасибо за любые подсказки.