Почему у Mono в Project Reactor нет оператора share ()? - PullRequest
2 голосов
/ 07 июля 2019

Я бы хотел «поделиться» Моно, как я делаю с Flux.

Пример использования потока () с Kotlin:

fun `test flux share`() {
        val countDownLatch = CountDownLatch(2)

        val originalFlux = Flux.interval(Duration.ofMillis(200))
                .map { "$it = ${Instant.now()}" }
                .take(7)
                .share()
                .doOnTerminate {
                    countDownLatch.countDown()
                }


        println("Starting #1...")

        originalFlux.subscribe {
            println("#1: $it")
        }

        println("Waiting ##2...")
        CountDownLatch(1).await(1000, TimeUnit.MILLISECONDS)
        println("Starting ##2...")

        originalFlux.subscribe {
            println("##2: $it")
        }

        countDownLatch.await(10, TimeUnit.SECONDS)

        println("End!")
    }

Мне не удалось найти оператор share () для Mono. Почему его не существует?

1 Ответ

1 голос
/ 24 июля 2019

Мне не удалось найти оператор share () для Mono.Почему его не существует?

Особое поведение share() не имеет особого смысла с Mono, но у нас есть cache(), что может быть тем, что вам нужно.

share() эквивалентно тому, что вы звоните publish().refcount() на свой Flux.В частности, publish() дает вам ConnectableFlux, или «горячий» поток.(refcount() просто автоматически подключает / останавливает поток на основе первого / последнего подписчика.)

«raison d'être» для ConnectableFlux позволяет нескольким подписчикам подписываться в любое время, пропуская данныеэто было выпущено, прежде чем они подписались.В случае Mono это не имеет большого смысла, так как по определению испускается только одно значение - поэтому, если вы пропустили его, значит, вы пропустили его.

Однако, у нас есть cache () в Mono, что также превращает его в «горячий» источник (где первоначальный поставщик не вызывается для каждой подписки, только один раз при первой подписке.) Очевидное отличие отВыше указано, что значение воспроизводится для каждого подписчика, но это почти наверняка то, что вы хотите.

(Sidenote, если вы тестируете вышеупомянутое - обратите внимание, что вам нужно использовать Mono.fromSupplier() вместо Mono.just(), так как последний просто возьмет значение один раз при создании экземпляра, поэтому cache() не имеет никакого значимого эффекта.)

...