У меня есть процессор, который подписывается на издателей, которые прибывают в произвольное время.Для каждого нового подписчика на процессор я хочу отправить последний элемент от каждого издателя.
class PublishersState {
val outputProcessor = DirectProcessor.create<String>()
fun addNewPublisher(publisher: Flux<String>) {
publisher.subscribe(outputProcessor)
}
fun getAllPublishersState(): Flux<String> = outputProcessor
}
val publisher1 = Mono
.just("Item 1 publisher1")
.mergeWith(Flux.never())
val publisher2 = Flux
.just("Item 1 publisher2", "Item 2 publisher2")
.mergeWith(Flux.never())
val publishersState = PublishersState()
publishersState.getAllPublishersState().subscribe {
println("Subscriber1: $it")
}
publishersState.addNewPublisher(publisher1)
publishersState.addNewPublisher(publisher2)
publishersState.getAllPublishersState().subscribe {
println("Subscriber2: $it")
}
Мне нужно изменить приведенный выше код, чтобы он вывел следующее:
Subscriber1: Item 1 publisher1
Subscriber1: Item 1 publisher2
Subscriber1: Item 2 publisher2
// Subscriber2 subscribers here and receives the last item from each publisher
Subscriber2: Item 1 publisher1
Subscriber2: Item 2 publisher2
Есть ли простой способ кэшировать последний элемент для каждого издателя?