Как мне использовать Flux.publish ()? - PullRequest
1 голос
/ 14 октября 2019

У меня есть поток данных, которые я хочу преобразовать как в изображениях, так и в отчетах. Поскольку я хочу повторно использовать один и тот же источник данных, я подумал об использовании метода publish для Flux и объединил результаты, как показано в приведенном ниже коде:

    @Test
    fun `inside a publish, I can concat multiple fluxes`() {
        data class Data(val d: String)
        data class Image(val i: String)
        data class Report(val r: String)

        val result = Flux.just(Data("some data"))
                .publish { fluxOfData ->
                    val fod = fluxOfData  //.cache()
                    val images = fod.flatMap { Flux.just(Image("my image")) }
                    val reports = fod.flatMap { Flux.just(Report("my report")) }

                    Flux.concat(images, reports)
                }
                .collectList()
                .block()

        Assertions.assertEquals(result, listOf(Image("my image"), Report("my report")))
    }

Однако приведенный выше код только отправляет[Image(i=my image)] так что поток данных fluxOfData используется в первый раз. Однако я ожидал, что смогу подписаться на fluxOfData несколько раз.

При кэшировании fluxOfData путем вызова fluxOfData.cache() результаты получаются, как и ожидалось.

Так что жевариант использования Flux.publish()?
Есть ли другой идиоматический способ преобразования данных в изображения и отчеты?

1 Ответ

1 голос
/ 14 октября 2019

publish() превращает Flux в горячий источник (а не в холодный источник). Это означает, что несколько подписчиков могут подписаться в любое время и увидеть все элементы в будущем, но они пропустят все элементы, отправленные заранее.

Так, каков случай использования Flux.publish ()?

Существует множество вариантов использования этого - возможно, это поток сообщений журнала, которые каким-то образом потребляются (и вас интересуют только сообщения журнала, которые появляются в режиме реального времени, а не предыдущие журналы). Возможно, это прямая трансляция трансляции спортивного матча. Возможно, это чат.

При кэшировании fluxOfData путем вызова fluxOfData.cache () результаты получаются, как и ожидалось.

Да, потому что тогда выЯвно кэширует все излучаемые сигналы, поэтому, несмотря на то, что они являются «горячим» источником, новые подписчики по-прежнему могут воспроизводить все существующие элементы в потоке.

Существует ли другой идиоматический способ преобразования данных в оба изображения иотчеты?

Рассмотрим replay() вместо publish():

Превратите этот поток в горячий источник и сохраните последние испущенные сигналы для дальнейшегоАбонент. Сохранит неограниченное количество сигналов onNext. Завершение и ошибка также будут воспроизведены.

В более общем смысле это также будет хорошим фоновым чтением.

...