Мне удалось решить эту проблему путем точного копирования CountTrigger и переопределения следующего:
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
Мне также не нужно было расширять ProcessingTimeSessionWindow, поскольку я могу просто использовать созданный пользовательский триггер. К сожалению, мы не можем расширить CountTrigger из-за его закрытого конструктора, иначе это было бы лучшим решением.
Итак, окончательный код выглядит примерно так:
consoleInput.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.trigger(ProcessingTimeCountTrigger.of(10L))
.process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
@Override
public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
List<String> alphaList = new ArrayList<>();
elements.forEach(alphaList::add);
out.collect("Time is " + new Date().toString());
out.collect("Total " + alphaList.size() + " elements in window");
}
})
Это отправляет данные в цепочке вниз по течению, если у нас есть 10 элементов ИЛИ прошло 10 секунд с тех пор, как мы последний раз видели элемент.
Код пользовательского триггера ниже:
public class ProcessingTimeCountTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private ProcessingTimeCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
ctx.mergePartitionedState(stateDesc);
}
@Override
public String toString() {
return "ProcessingTimeCountTrigger(" + maxCount + ")";
}
/**
* Creates a trigger that fires once the number of elements in a pane reaches the given count.
*
* @param maxCount The count of elements at which to fire.
* @param <W> The type of {@link Window Windows} on which this trigger can operate.
*/
public static <W extends Window> ProcessingTimeCountTrigger<W> of(long maxCount) {
return new ProcessingTimeCountTrigger<>(maxCount);
}
private static class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}