Мы используем много кода RxJava1 в нашем приложении Android. В последнее время мы начали получать много MissingBackpressureException
. Поэтому я попытался лучше понять механизм противодавления.
Я могу получить исключение противодавления
BehaviorSubject<Integer> subject = BehaviorSubject.create();
subject
.observeOn(Schedulers.computation())
.subscribe(x -> {
try {
logger.info("got " + x);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
for (int i = 0; true; ++i) {
logger.info("sending " + i);
subject.onNext(i);
}
Это здорово, я получаю MissingBackpressureException
, но когда я заставить действие subscribe
никогда не возвращаться, я больше не получаю MissingBackpressureException
, поэтому этот код:
subject
.observeOn(Schedulers.computation())
.subscribe(x -> {
while(true) {
try {
logger.info("got " + x);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Итак, у меня есть несколько вопросов здесь:
- Почему я Я не получаю
MissingBackpressureException
брошенного во 2-го подписчика? - Что происходит со всеми объектами? Я не вижу увеличения памяти, поэтому я предполагаю, что они выбрасываются? почему?
Когда я пытаюсь сделать это с RxJava2
, добавив toFlowable(BackpressureStrategy.ERROR)
к подписчику, я не получаю исключения ни в одном из случаев, что здесь происходит?
subject
.observeOn(Schedulers.computation())
.toFlowable(BackpressureStrategy.ERROR)
.subscribe(x -> {