Java Reactor: Есть ли способ трансформировать Flux в Flux <T>без загрузки? - PullRequest
0 голосов
/ 09 мая 2020

У меня быстрый, но дорогой производитель (Spring WebClient) и очень медленный подписчик. Мне нужен способ соблюдать обратное давление во всей цепочке.

Во время реализации я понял, что flatMap, concatMap и другие используют активную выборку и, похоже, нет возможности отключить это поведение.

Использование спроса в подписчике без flatMap

Flux.defer(() -> Flux.range(1, 1000))
            .doOnRequest(i -> System.out.println("Requested: " + i))
            .doOnNext(v -> System.out.println("Emitted:   " + v))
            //.flatMap(Mono::just)
            .subscribe(new BaseSubscriber<Object>() {
                protected void hookOnSubscribe(final Subscription subscription) {
                    subscription.request(3);
                }

                protected void hookOnNext(final Object value) {
                    System.out.println("Received:  " + value);
                }
            });

.. выдает:

Requested: 3
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3

Использование того же запроса с flatMap (без комментариев) дает:

Requested: 256
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3
Emitted:   4
Emitted:   5
...
Emitted:   254
Emitted:   255
Emitted:   256

1 Ответ

1 голос
/ 12 мая 2020

Похоже, есть открытая проблема: https://github.com/reactor/reactor-core/issues/1397

В любом случае, я нашел решение для своей ситуации: block(). Имейте в виду, что эта операция разрешена только для потоков, которые не помечены как «только неблокирующие операции». (См. Также Project Blockhound )

Напомним, проблема в том, что в какой-то момент у меня есть Flux<Mono<T>> и .flatMap(...), .concatMap(...) и т.д. c. использовать какую-то нетерпеливую выборку. Flux<Mono<T>>, используемый для тестирования:

final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
() -> 0, 
(state, sink) -> {
    state += 1;
    sink.next(Mono.just(state));
    return state;
}).doOnRequest(i -> System.out.println("Requested: " + i))
  .doOnNext(v -> System.out.println("Emitted:   " + v));

Чтобы не было нетерпеливого извлечения, я теперь создаю блок внутри карты, и он работает на удивление хорошо:

monoFlux.map(Mono::block)
        .subscribe(new MySubscriber<>());

Результат:

Requested: 3
Emitted:   MonoJust
Received:  1
Emitted:   MonoJust
Received:  2
Emitted:   MonoJust
Received:  3
...