Я хочу изменить свой код для одного абонента.Теперь у меня есть
auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));
Этот код работает правильно, метод сокращения очень прост.Я пытался изменить свой код для одного абонента
auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
.flatMap(window -> window.groupBy(Auction::getItem))
.flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
.subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));
Это мой код, и этот код не работает.Нет ошибок и нет результатов.После отладки я обнаружил, что код застрял на втором flatMap, когда я уменьшил поток.Я думаю, что проблема слияния flatMap, застревание на разрешении Mono.Кто-нибудь сейчас, как решить эту проблему и использовать только один подписчик?
Как реплицировать, вы можете использовать другой класс или создать его.В небольших размерах работает, а на больших умирает
List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
Auction a = new Auction((long) i, "test");
a.setItem((long) (i%50));
auctionList.add(a);
}
Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));
При таком подходе результат мгновенный, но я использую 3 подписчика
Flux.fromIterable(auctionList)
.groupBy(Auction::getId)
.subscribe(
auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
)
));