Заметно, что буферизует элементы, когда нет подписчиков, затем генерирует их, когда на него подписаны, а затем очищает буфер, когда отписывается? - PullRequest
2 голосов
/ 17 мая 2019

Я хочу заметить, что:

  1. Может генерировать предметы по требованию и никогда не завершится (горячая наблюдаемая?)
  2. Знает, когда у него есть подписчики или нет
  3. Если нет подписчиков, он будет буферизовать элементы, которые я говорю ему испускать
  4. При подписке он будет генерировать буферизованные элементы по порядку, затем очищать буфер, а затем продолжать позволять мне излучать большеitems
  5. Когда отписаться (подписчик удален?), он вернется к буферизации.

Также:

  1. Ожидается только ожидаемоебыть одним подписчиком за один раз

  2. Это не должно быть потокобезопасным

Вот что-то вроде псевдокода того, о чем я думаю -- У меня нет необходимых обратных вызовов, хотя, чтобы сделать это правильно.Также было бы хорошо, если бы я мог обернуть все это в Наблюдаемый или Предмет.

class RxEventSender {
    private val publishSubject = PublishSubject.create<Action>()

    val observable: Observable<Action> = publishSubject

    private val bufferedActions = arrayListOf<Action>()

    private var hasSubscribers = false

    fun send(action: Action) {
        if (hasSubscribers) {
            publishSubject.onNext(action)
        } else {
            bufferedActions.add(action)
        }
    }

    //Subject was subscribed to -- not a real callback
    fun onSubscribed() {
        hasSubscribers = true
        bufferedActions.forEach {action ->
            publishSubject.onNext(action)
        }
        bufferedActions.clear()
    }

    //Subject was unsubscribed -- not a real callback
    fun onUnsubscribed() {
        hasSubscribers = false
    }
}

Ответы [ 2 ]

0 голосов
/ 20 мая 2019

Потратив некоторое время, я думаю, что это работает достаточно хорошо для моих нужд. Он не очень хорошо обернут в тему или в Observable, но все, что мне было нужно, это отправлять предметы и подписываться.

class RxEventSender<T> {
    private val bufferedEvents = arrayListOf<T>()

    private val publishSubject = PublishSubject.create<T>()

    val observable: Observable<T> = publishSubject
            .mergeWith(Observable.fromIterable(bufferedEvents))
            .doOnDispose {
                bufferedEvents.clear()
            }

    fun send(event: T) {
        if (publishSubject.hasObservers()) {
            publishSubject.onNext(event)
        } else {
            bufferedEvents.add(event)
        }
    }
}
0 голосов
/ 20 мая 2019

Используйте ReplaySubject. У него есть неограниченные и ограниченные версии, если вас беспокоит слишком большой размер буфера.

...