Спасибо за ввод, не думал здраво, конечно, просто иметь несколько подписчиков, как это:
val flux = Flux.just("MyData1", "MyData2", "MyData3");
flux.doOnNext { println("Subscribing one$it") }.subscribe()
flux.doOnNext { println("Subscribing Two$it") }.subscribe()
Будет выводить:
Subscribing oneMyData1
Subscribing oneMyData2
Subscribing oneMyData3
Subscribing TwoMyData1
Subscribing TwoMyData2
Subscribing TwoMyData3
Как указано выше, есть Share , но этот API не позволяет устанавливать минимальное количество подписчиков, поэтому лучше вызывать нижележащие функции, так что в моем случае я хочу подождать, пока у нас не будет двух подписчиков. Документы заявляют
a Поток, который при первой подписке приводит к тому, что исходный Поток подписывается один раз, поэтому поздние подписчики могут пропустить элементы.
val flux = Flux.just("MyData1", "MyData2", "MyData3").publish().refCount(2)
Это приводит к следующемувывод, чтобы гарантировать, что сообщения не пропущены, если есть задержка при запуске второго абонента.
Subscribing oneMyData1
Subscribing TwoMyData1
Subscribing oneMyData2
Subscribing TwoMyData2
Subscribing oneMyData3
Subscribing TwoMyData3