MissingBackpressureException: не удалось создать буфер из-за отсутствия запросов - PullRequest
0 голосов
/ 19 марта 2020

Я получил сообщение об ошибке, включая MissingBackpressureException: Could not emit buffer due to lack of requests для Rx Java Flowable, но я изо всех сил пытаюсь создать простой тестовый пример, демонстрирующий проблему (поддерживая структуру Flowable).

Вот тест, который я пытаюсь собрать, который поддерживает те же этапы в конвейере:

int inputEvents=10000;

CountDownLatch completed = new CountDownLatch(1);
Flowable<List<String>> flowable = Flowable.<String>create(e -> {

    System.out.println(Thread.currentThread().getName() + ": Will send");
    for (int counter = 0; counter < inputEvents; counter++) {
        e.onNext("" + counter);
        Thread.sleep(5);
    }
    System.out.println(Thread.currentThread().getName() + ": Completed sending");
    e.onComplete();
}, BackpressureStrategy.DROP)
    .onBackpressureDrop(s -> System.out.println("Backpressure, dropping " + Arrays.asList(s)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnNext(strings -> System.out.println("\t" + Thread.currentThread().getName() + ": Buffered: " + strings.size() + " items"))
    .observeOn(Schedulers.io(), false)
    .doOnNext(strings -> {
        System.out.println("\t" + "\t" + Thread.currentThread().getName() + ": Waiting: " + strings.size());
        Thread.sleep(5000);
    });

flowable
    .subscribe(s -> System.out.println("\t" + "\t" + "onNext: " + s.size()),
            error -> {
                throw new RuntimeException(error);
            },
            () -> {
                System.out.println("\t" + "\t" + "Complete");
                completed.countDown();
            });

completed.await();

В процессе производства мы получаем MissingBackpressureException: Could not emit buffer due to lack of requests со следующей трассировкой стека:

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
        at io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber.fastPathEmitMax(QueueDrainSubscriber.java:87)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableBufferTimed$BufferExactUnboundedSubscriber.run(FlowableBufferTimed.java:207)
        at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Так что я думаю, что это относится к нисходящей работе из буфера.

Однако, независимо от того, как долго я блокирую в doOnNext, я не могу воспроизвести проблему. Пример вывода:

main: Will send
    RxComputationThreadPool-1: Buffered: 197 items
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 197
        RxCachedThreadScheduler-1: Waiting: 196
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 196
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
...

Я ожидал, так как Thread.sleep(5000) занимает так много времени, мы получим обратное давление.

Есть ли способ смоделировать это, в идеале в тесте с использованием TestScheduler / TestSubscriber (чтобы избежать Thread.sleep() s)?

1 Ответ

1 голос
/ 20 марта 2020

Мне удалось воспроизвести исключение MissingBackpressureException, увеличив скорость, с которой отправляются ваши события, увеличив максимальное число событий и уменьшив скорость, с которой их обрабатывает потребитель.

Переполнение буфера буфер оператора observeOn(...) по умолчанию размером 128. Поскольку он получает новый список раз в секунду, обратное давление займет не менее пары минут, прежде чем он переполнится.

Обратите внимание, вы можете переопределить это размер буфера по умолчанию, передав его в виде аргумента observeOn(...).

Возвращаясь к обработке обратного давления, я думаю, что основной проблемой в вашем конвейере является оператор buffer(1, TimeUnit.SECONDS). Если вы посмотрите на javado c:

Противодавление: этот оператор не поддерживает противодавление, поскольку использует время. Он запрашивает Long.MAX_VALUE в восходящем направлении и не подчиняется нисходящим запросам.

В результате вышесказанного ваш onBackPressureDrop(...) никогда не вызывается. Я думаю, что вы исправите это, поместив onBackPressureDrop(...) после buffer(...). В результате вы получите сообщение Backpressure, dropping....

Вы можете проверить его с помощью: TestScheduler.advanceTimeBy(long, TimeUnit). Хотя я должен признать, я еще не пробовал.

...