Я пытаюсь преобразовать Observable, используя concatMap
, так как порядок важен для моего случая.
@Test
fun load_data() {
val sub = TestSubscriber<Long>()
var s = BehaviorSubject.create<Long>()
s.concatMap {
Observable.timer(it, TimeUnit.MILLISECONDS)
}
.take(4)
.subscribe(sub)
s.onNext(5)
s.onNext(6)
s.onNext(7)
s.onNext(8) //rx.exceptions.MissingBackpressureException
sub.awaitTerminalEvent(500, TimeUnit.MILLISECONDS)
sub.assertNoErrors()
}
Я изменил загрузку реальных данных на Observable.timer()
, чтобы упростить пример и упростить воспроизведение.
Я использую вприложение BehaviorSubject
для связи действий пользовательского интерфейса с rx
Из документации , особенно из мраморной диаграммы. Я ожидаю, что оно будет хранить элементы в очереди и преобразовывать их по одному,
Однако кажется, что concatMap
имеет очередь с размером, установленным только в 2 элемента.Добавление дополнительных элементов вызывает MissingBackpressureException
Поэтому у меня возникают следующие вопросы:
- Почему
concatMap
имеет размер очереди 2 вместо RxRingBuffer.SIZE
, как другие операторы? - Должен ли я, как правило, добавлять операторы
onBackpressure*
перед вызовом concatMap
, чтобы исключить исключение MissingBackpressureException
?