RxJava1 concatMap вызывает MissingBackpressureException - PullRequest
0 голосов
/ 10 мая 2018

Я пытаюсь преобразовать Observable, используя concatMap, так как порядок важен для моего случая.

@Test
fun load_data() {
    val sub = TestSubscriber<Long>()

    var s = BehaviorSubject.create<Long>()

    s.concatMap {
        Observable.timer(it, TimeUnit.MILLISECONDS)
    }
    .take(4)
    .subscribe(sub)

    s.onNext(5)
    s.onNext(6)
    s.onNext(7)
    s.onNext(8) //rx.exceptions.MissingBackpressureException

    sub.awaitTerminalEvent(500, TimeUnit.MILLISECONDS)
    sub.assertNoErrors()
}

Я изменил загрузку реальных данных на Observable.timer(), чтобы упростить пример и упростить воспроизведение.

Я использую вприложение BehaviorSubject для связи действий пользовательского интерфейса с rx

Из документации , особенно из мраморной диаграммы. Я ожидаю, что оно будет хранить элементы в очереди и преобразовывать их по одному,

Однако кажется, что concatMap имеет очередь с размером, установленным только в 2 элемента.Добавление дополнительных элементов вызывает MissingBackpressureException

Поэтому у меня возникают следующие вопросы:

  1. Почему concatMap имеет размер очереди 2 вместо RxRingBuffer.SIZE, как другие операторы?
  2. Должен ли я, как правило, добавлять операторы onBackpressure* перед вызовом concatMap, чтобы исключить исключение MissingBackpressureException?

1 Ответ

0 голосов
/ 11 мая 2018

Прежде чем я отвечу на вопросы, рассмотрите возможность перехода на RxJava 2, где это не будет проблемой с Observable.

Почему concatMap имеет размер очереди 2 вместо RxRingBuffer.SIZE, как и другиеоператоры имеют?

Оператор запускает по одному Observable за один раз, и нет никаких оснований для предварительной выборки более 1 заранее.

Должен ли я, как правило, добавитьлюбой из операторов onBackpressure * перед вызовом concatMap для предотвращения исключения MissingBackpressureException?

Да.

...