Что эквивалентно ReplaySubject (RxJava) в Reactor для весеннего webflux? - PullRequest
0 голосов
/ 08 января 2019

Что эквивалентно ReplaySubject (RxJava) в Reactor для весеннего webflux?.

Мне нужен горячий наблюдаемый канал, который может иметь N подписчиков и сохранять предыдущие опубликованные элементы, такие как ReplaySubject, в rxJava. Спасибо за ваши комментарии, я использую Spring webflux.

Я не могу найти ничего подобного ReplaySubject в Reactor.

Если нет реализации ReplaySubject, как я могу это реализовать ?, у меня есть такой код:

class ReplayFlux<T> {
private val elements: List<T> = emptyList()
private val currentSubscriptions: List<FluxSink<T>> = emptyList()

fun publish(element: T){
    elements.plus(element)
    currentSubscriptions.forEach { 
        it.next(element)
    }
}

fun subscribe(): Flux<T>{
    return Flux.create {fluxSink -> 
        currentSubscriptions.plus(fluxSink)
        elements.forEach {element -> 
            fluxSink.next(element)
        }
    }
}

fun finish(){ currentSubscriptions.forEach { it.complete() } }
}

Очевидно, это не потокобезопасно, у кого-то есть лучший чертенок. Если вы думаете, что это не так уж плохо, какой потокобезопасный структуры данных вы должны использовать в этом случае?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...