Как исправить «MissingBackpressureException» - PullRequest
2 голосов
/ 15 мая 2019

Я использую RxJava2 Flowables, подписавшись на поток событий из PublishSubject. Он используется в приложении уровня предприятия, и у нас нет выбора удаления каких-либо событий. Я использую версию RxJava 2.2.8

Я использую BackpressureStrategy.BUFFER, так как не хочу терять ни одно из моих событий.

Кроме того, я снова буферизую на 50000 или на 3 минуты, в зависимости от того, что наступит раньше. Это я делаю так, как хочу консолидировать события и затем обрабатывать их.

Но через несколько минут моего запуска я получаю следующие ошибки

io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)

Я попытался увеличить размер буфера путем настройки, но в поведении ничего не изменилось.

System.setProperty("rx2.buffer-size", "524288");

Также, если я буферирую в течение более длительного времени вместо 3 минут, я получаю исключение через гораздо более длительное время, вероятно, потому что мой нисходящий поток работает лучше, когда события консолидируются больше. Тем не менее, у меня нет такого выбора, потому что это живые события, требующие немедленной обработки (через 3-5 минут).

Я также пытался использовать thread.sleep () перед вызовом «subscription.next» в случае ошибки, но все равно получал те же результаты.

keySubject.hide()
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() {

@Override
  public void onSubscribe(Subscription var1) {
   innerSubscription = var1;
innerSubscription.request(1L);
 }

@Override
public void onNext(List<String> logs) {
    Subscription.request(1L);

///   Do some logic here

}

Я хочу знать, как мне справиться с противодавлением, чтобы избежать этого исключения? Это исключение из-за метода ".buffer" Есть ли способ для меня, чтобы проверить состояние этих буферов. Кроме того, даже если я увеличу размер rx2.buffer, я все равно получу исключение за то же время. В идеале, система должна работать дольше с большим размером буфера, если исключение, потому что, если буфер заполняется.

Будет полезна любая помощь по причине этого сообщения «Не удалось создать буфер из-за отсутствия запросов в».

1 Ответ

0 голосов
/ 16 мая 2019

Дело в том, почему вы используете предмет, который не воспринимает противодавление? Вы используете это в качестве автобуса для бедного человека? Кроме того, предполагая, что e.getContents() - простой получатель, я полагаю, вы можете заменить весь этот блок

.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() { ... });

с

.flatMapIterable(e -> e.getContents())
.buffer(3,TimeUnit.MINUTES,50000)
.rebatchRequests(1)
.observeOn(Schedulers.computation())
.doOnNext(s -> /* Do some logic here */)
.subscribe();
...