У меня есть издатель, который может опубликовать sh быстрее, чем подписчик может обработать данные. Чтобы справиться с этим, я начал работать с противодавлением. Поскольку я не хочу отбрасывать какие-либо данные, я использую противодействующее противодавление. Я понял это как возможность подписчика сообщать издателю, когда публиковать sh больше данных, как описано в этом и следующих параграфах .
Издатель - это Flowable, который выполняет свою работу асинхронно параллельно и впоследствии объединяется в последовательный Flowable. Данные должны буферизоваться до 10 элементов, и когда этот буфер заполнен, Flowable не должен публиковать sh больше данных и ждать следующего запроса.
Подписчик является DisposableSubscriber, который запрашивает 10 элементов при запуске. Каждый потребляемый предмет требует некоторых вычислений, и после этого будет запрошен новый предмет.
Мой MWE выглядит следующим образом:
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Flowable.fromIterable(src)
.parallel(10, 1)
.runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
.flatMap(i -> Single.fromCallable(() -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
}).toFlowable())
.sequential(1)
.onBackpressureBuffer(10)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Я ожидаю, что этот код сделает следующее: подписчик запрашивает первые 10 элементов. Издатель публикует первые 10 статей. Затем подписчик выполняет вычисления в onNext
и запрашивает больше элементов, которые издатель опубликует sh.
Что на самом деле происходит: сначала издатель, кажется, безгранично публикует sh элементов. В какой-то момент, например, после 14 опубликованных элементов, подписчик обрабатывает свой первый элемент. Пока это происходит, издатель продолжает публиковать sh элементов. После примерно 30 опубликованных материалов выдается io.reactivex.exceptions.MissingBackpressureException: Buffer is full
и поток заканчивается.
Мой вопрос : что я делаю не так? Как я могу позволить подписчику контролировать, если и когда издатель публикует данные? Очевидно, я делаю что-то ужасно неправильно. В противном случае ожидание не будет таким отличным от реальности.
Пример вывода вышеуказанного MWE:
publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full