Mono.count блок на неопределенный срок - PullRequest
0 голосов
/ 12 апреля 2019

Я запустил это:

Mono<Void> mono = Mono.empty();

System.out.println("mono.block: " + mono.block());

, и он выдаст:

mono.block: null    

, как и ожидалось.Другими словами, вызов block вернется немедленно, если Mono уже завершен.

Еще один пример, напоминающий сценарий реального мира.У меня есть исходный поток, например:

Flux<Integer> ints = Flux.range(0, 2);

Я создаю подключаемый поток, который я буду использовать для разрешения нескольких подписчиков:

ConnectableFlux<Integer> publish = ints.publish();

Для этого примера, скажем, есть один реальный-работный подписчик:

publish
   .doOnComplete(() -> System.out.println("publish completed"))
   .subscribe();

и другой подписчик, который просто производит счетчик элементов:

Mono<Long> countMono = publish
   .doOnComplete(() -> System.out.println("countMono completed"))
   .count();

countMono.subscribe();

Я подключаю соединяемый поток и распечатываю счетчик элементов:

publish.connect();

System.out.println("block");

long count = countMono.block();

System.out.println("count: " + count);

Это печатает:

publish completed
countMono completed
block

Другими словами, оба подписчика подписываются успешно и завершают, но затем countMono.block() блокирует на неопределенный срок.

Почему это так и как мне заставить это работать?Моя конечная цель - получить количество элементов.

1 Ответ

1 голос
/ 13 апреля 2019

Вы можете заставить это работать, используя autoConnect или refCount вместо ручного вызова connect().

Например:

        Flux<Integer> ints = Flux.range(0, 2);
        Flux<Integer> publish = ints.publish()
                .autoConnect(2);  // new 
        publish
                .doOnComplete(() -> System.out.println("publish completed"))
                .subscribe();
        Mono<Long> countMono = publish
                .doOnComplete(() -> System.out.println("countMono completed"))
                .count();
        // countMono.subscribe();
        long count = countMono.block();
        System.out.println("count: " + count);

Почему ваш пример не работает?

Вот то, что я думаю, что происходит в вашем примере ... но это основано на моих ограниченных знаниях, и я не уверен на 100%, что это правильно.

  1. .publish() превращает источник вверх по течению в горячий поток
  2. Затем вы подписываетесь дважды (но они еще не запускают поток, поскольку подключаемый поток еще не подключен к восходящему потоку)
  3. .connect() подписывается на восходящий поток и запускает поток
  4. восходящий поток и две подписки, которые были зарегистрированы до завершения connect() (поскольку все это происходит в главном потоке)
  5. В этот момент ConnectableFlux больше не подключен к восходящему потоку, потому что восходящий поток завершен (Документы по реактору дают подробные сведения о том, что происходит с ConnectableFlux, когда новые подписки поступают после завершения восходящего источника, так что это то, что я Я не уверен на 100%.)
  6. block() создает новую подписку.
  7. Но поскольку ConnectableFlux больше не подключен, данные не передаются
  8. Если бы вам пришлось снова вызывать connect() (из другого потока, так как основной поток заблокирован), данные снова были бы переданы, и block() завершился бы. Однако это будет новая последовательность (не оригинальная последовательность, выполненная на шаге 4)

Почему мой пример работает?

Создаются только две подписки (вместо 3 в вашем примере), одна из вызовов .subscribe() и одна из .block(). ConnectableFlux автоматически подключается после 2 подписок, и поэтому подписка block() завершается. Обе подписки имеют одинаковую последовательность восходящего потока.

...