Что эквивалентно ReplaySubject (RxJava) в Reactor для весеннего webflux?.
Мне нужен горячий наблюдаемый канал, который может иметь N подписчиков и сохранять предыдущие опубликованные элементы, такие как ReplaySubject, в rxJava.
Спасибо за ваши комментарии, я использую Spring webflux.
Я не могу найти ничего подобного ReplaySubject в Reactor.
Если нет реализации ReplaySubject, как я могу это реализовать ?, у меня есть такой код:
class ReplayFlux<T> {
private val elements: List<T> = emptyList()
private val currentSubscriptions: List<FluxSink<T>> = emptyList()
fun publish(element: T){
elements.plus(element)
currentSubscriptions.forEach {
it.next(element)
}
}
fun subscribe(): Flux<T>{
return Flux.create {fluxSink ->
currentSubscriptions.plus(fluxSink)
elements.forEach {element ->
fluxSink.next(element)
}
}
}
fun finish(){ currentSubscriptions.forEach { it.complete() } }
}
Очевидно, это не потокобезопасно, у кого-то есть лучший чертенок. Если вы думаете, что это не так уж плохо, какой потокобезопасный структуры данных вы должны использовать в этом случае?