Я использую RxJava2 Flowables, подписавшись на поток событий из PublishSubject. Он используется в приложении уровня предприятия, и у нас нет выбора удаления каких-либо событий.
Я использую версию RxJava 2.2.8
Я использую BackpressureStrategy.BUFFER, так как не хочу терять ни одно из моих событий.
Кроме того, я снова буферизую на 50000 или на 3 минуты, в зависимости от того, что наступит раньше. Это я делаю так, как хочу консолидировать события и затем обрабатывать их.
Но через несколько минут моего запуска я получаю следующие ошибки
io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
Я попытался увеличить размер буфера путем настройки, но в поведении ничего не изменилось.
System.setProperty("rx2.buffer-size", "524288");
Также, если я буферирую в течение более длительного времени вместо 3 минут, я получаю исключение через гораздо более длительное время, вероятно, потому что мой нисходящий поток работает лучше, когда события консолидируются больше. Тем не менее, у меня нет такого выбора, потому что это живые события, требующие немедленной обработки (через 3-5 минут).
Я также пытался использовать thread.sleep () перед вызовом «subscription.next» в случае ошибки, но все равно получал те же результаты.
keySubject.hide()
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() {
@Override
public void onSubscribe(Subscription var1) {
innerSubscription = var1;
innerSubscription.request(1L);
}
@Override
public void onNext(List<String> logs) {
Subscription.request(1L);
/// Do some logic here
}
Я хочу знать, как мне справиться с противодавлением, чтобы избежать этого исключения? Это исключение из-за метода ".buffer"
Есть ли способ для меня, чтобы проверить состояние этих буферов. Кроме того, даже если я увеличу размер rx2.buffer, я все равно получу исключение за то же время. В идеале, система должна работать дольше с большим размером буфера, если исключение, потому что, если буфер заполняется.
Будет полезна любая помощь по причине этого сообщения «Не удалось создать буфер из-за отсутствия запросов в».