Project Reactor: кэшируйте последний элемент для каждого подписанного издателя - PullRequest
0 голосов
/ 17 марта 2019

У меня есть процессор, который подписывается на издателей, которые прибывают в произвольное время.Для каждого нового подписчика на процессор я хочу отправить последний элемент от каждого издателя.

    class PublishersState {
        val outputProcessor = DirectProcessor.create<String>()

        fun addNewPublisher(publisher: Flux<String>) {
            publisher.subscribe(outputProcessor)
        }

        fun getAllPublishersState(): Flux<String> = outputProcessor
    }

    val publisher1 = Mono
        .just("Item 1 publisher1")
        .mergeWith(Flux.never())

    val publisher2 = Flux
        .just("Item 1 publisher2", "Item 2 publisher2")
        .mergeWith(Flux.never())

    val publishersState = PublishersState()

    publishersState.getAllPublishersState().subscribe {
        println("Subscriber1: $it")
    }

    publishersState.addNewPublisher(publisher1)

    publishersState.addNewPublisher(publisher2)

    publishersState.getAllPublishersState().subscribe {
        println("Subscriber2: $it")
    }

Мне нужно изменить приведенный выше код, чтобы он вывел следующее:

Subscriber1: Item 1 publisher1
Subscriber1: Item 1 publisher2
Subscriber1: Item 2 publisher2
// Subscriber2 subscribers here and receives the last item from each publisher
Subscriber2: Item 1 publisher1
Subscriber2: Item 2 publisher2

Есть ли простой способ кэшировать последний элемент для каждого издателя?

Ответы [ 2 ]

0 голосов
/ 19 марта 2019

Я решил свой случай следующим образом:

class PublishersState {
  val publishersList = Collections.synchronizedList<Flux<String>>(mutableListOf()) // adding sync list for storing publishers 
  val outputProcessor = DirectProcessor.create<String>()

  fun addNewPublisher(publisher: Flux<String>) {
    val cached = publisher.cache(1) // caching the last item for a new publisher
    publishersList.add(cached)
    cached.subscribe(outputProcessor)
  }

  fun getAllPublishersState(): Flux<String> = publishersList
    .toFlux()
    .reduce(outputProcessor as Flux<String>) { acc, flux -> acc.mergeWith(flux.take(1)) } // merging the last item of each publisher with outputProcessor 
    .flatMapMany { it }
}
0 голосов
/ 19 марта 2019

Используйте ReplayProcessor вместо DirectProcessor:

val outputProcessor = ReplayProcessor.cacheLast()
...