Моя цель - собрать / накопить список строк в потоковом контексте с помощью операции со скользящим окном.Я не могу использовать простой ListAccumulator<String>
, так как результаты доступны только после завершения работы.
Я имею в виду использование агрегатной функции, которая использует ListAccumulator, например:
AggregateFunction<Tuple2<String, Integer>,ListAccumulator<String>,List<String>>
Однако я бы хотел сбросить ListAccumulator
на пустой при запуске события Windows, будетAggregateFunction
вызывает функцию createAccumulator()
или resetAccumulator()
для каждого триггера
DataStream<Tuple2<String, Integer>> ips = ...
ips.keyBy(0).timeWindow(Time.seconds(WINDOW_DURATION.getSeconds()), Time.seconds(SLIDE_DURATION.getSeconds())).aggregate(new RFAgg());
private static class RFAgg implements AggregateFunction<Tuple2<String, Integer>,ListAccumulator<String>,List<String>> {
...
}