Поток подписчика Reactor Flux остановлен при использовании Reduce на flatMap - PullRequest
0 голосов
/ 26 ноября 2018

Я хочу изменить свой код для одного абонента.Теперь у меня есть

auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120)).subscribe(
        s -> s.groupBy(Auction::getItem).subscribe( longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats )
));

Этот код работает правильно, метод сокращения очень прост.Я пытался изменить свой код для одного абонента

    auctionFlux.window(Duration.ofSeconds(120), Duration.ofSeconds(120))
        .flatMap(window -> window.groupBy(Auction::getItem))
        .flatMap(longAuctionGroupedFlux -> longAuctionGroupedFlux.reduce(new ItemDumpStats(), this::calculateStats))
        .subscribe(itemDumpStatsMono -> log.info(itemDumpStatsMono.toString()));

Это мой код, и этот код не работает.Нет ошибок и нет результатов.После отладки я обнаружил, что код застрял на втором flatMap, когда я уменьшил поток.Я думаю, что проблема слияния flatMap, застревание на разрешении Mono.Кто-нибудь сейчас, как решить эту проблему и использовать только один подписчик?

Как реплицировать, вы можете использовать другой класс или создать его.В небольших размерах работает, а на больших умирает

List<Auction> auctionList = new ArrayList<>();
for (int i = 0;i<100000;i++){
    Auction a = new Auction((long) i, "test");
    a.setItem((long) (i%50));
    auctionList.add(a);
}

Flux.fromIterable(auctionList).groupBy(Auction::getId).flatMap(longAuctionGroupedFlux ->
        longAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats)).collectList().subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()));

При таком подходе результат мгновенный, но я использую 3 подписчика

Flux.fromIterable(auctionList)
        .groupBy(Auction::getId)
        .subscribe(
                auctionIdAuctionGroupedFlux -> auctionIdAuctionGroupedFlux.reduce(new ItemDumpStats(), (itemDumpStats, auction) -> itemDumpStats).subscribe(itemDumpStats -> System.out.println(itemDumpStats.toString()
                )
        ));

Ответы [ 2 ]

0 голосов
/ 28 ноября 2018

Я думаю, что поведение, которое вы описали, связано с взаимодействием между groupBy, связанным с flatMap.Проверьте groupBy документацию.В нем говорится, что:

Для правильной работы groupBy необходимо слить и использовать группы вниз по течению.В частности, когда критерий создает большое количество групп, это может привести к зависанию, если группы не используются надлежащим образом в нисходящем направлении (например, из-за flatMap с параметром maxConcurrency, который установлен слишком низким).

По умолчанию для maxConcurrency (flatMap) установлено значение 256 (я проверил исходный код 3.2.2).Таким образом, выбор более 256 групп может привести к зависанию выполнения (особенно, когда все выполнение происходит в одном потоке).

Следующий код помогает понять, что происходит при соединении операторов groupBy и flatMap:

@Test
public void groupAndFlatmapTest() {
    val groupCount = 257;
    val groupSize = 513;
    val list = rangeClosed(1, groupSize * groupCount).boxed().collect(Collectors.toList());
    val source = Flux.fromIterable(list)
            .groupBy(i -> i % groupCount)
            .flatMap(Flux::collectList);
    StepVerifier.create(source).expectNextCount(groupCount).expectComplete().verify();
}

Выполнение этого кода зависает.Изменение groupCount на 256 или меньше приводит к прохождению теста (для каждого значения groupSize).

Итак, что касается исходной проблемы, очень возможно, что вы создаете большое количество групп с вашимключ-селектор Auction::getItem.

0 голосов
/ 28 ноября 2018

Добавление parallel исправлена ​​проблема, но я ищу ответ, почему резко уменьшите flatMap.

...