Reactor UnicastProcessor сбрасывает на противодавление, когда очередь заполнена - PullRequest
0 голосов
/ 12 февраля 2020

Я пытаюсь использовать UnicastProcessor, чтобы разрешить определенное количество событий в очереди (для обработки всплесков нагрузки и чтобы иметь возможность проверить размер очереди). Проблема в том, что противодавление не работает должным образом.

В соответствии с docs поддерживается обратное давление:

Если эта очередь ограничена процессор может отклонить значение pu sh, когда буфер заполнен и недостаточно запросов от нисходящего потока получено.

В этом ограниченном случае вы также можете создать процессор с обратным вызовом, который вызывается на каждом отклоненном элементе, позволяя очистить эти отклоненные элементы.

Вот игрушечный пример:

    Flux<Long> interval = Flux.interval(Duration.ZERO, Duration.ofMillis(100));
    BlockingQueue<Long> queue = new LinkedBlockingQueue<>(10);
    UnicastProcessor<Long> processor = UnicastProcessor.create(queue,
            i -> {
                // only invoked once
                log.error("Dropping in UnicastProcessor i={}, QueueSize={}", i, queue.size());
            }, () -> {

            }
    );
    // FluxSink<Long> sink = processor.sink(FluxSink.OverflowStrategy.DROP); // using a sink facade doesn't seem to help either

    interval
        .publishOn(Schedulers.newSingle("sending"))
        .doOnNext(i -> log.error("Sent i={}, QueueSize={}", i, queue.size()))
        .subscribe(processor);
        // .doOnNext(sink::next)
        // .subscribe();

    processor
        // Adding backpressure here makes the queue size always zero
        // .onBackpressureDrop(i -> log.error("Dropping after UnicastProcessor i={}, QueueSize={}", i, queue.size()))
        .publishOn(Schedulers.newSingle("receiving"))
        .doOnNext(i -> log.error("Received i={}, QueueSize={}", i, queue.size()))
        .doOnNext(i -> {
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        })
        .subscribe();

    sleep(20000); // Keep running long enough to see the exception

Идеальное поведение - заполнить очередь, если подписчик не достаточно быстр и отбрасывает что-либо еще входящее.

Вот некоторые журналы (... указывает на похожий журнал с увеличенными значениями i и тем же QueueSize):

[sending-1] ERROR Test - Sent i=0, QueueSize=0  
[receiving-2] ERROR Test - Received i=0, QueueSize=0
[sending-1] ERROR Test - Sent i=1, QueueSize=0  
...
[sending-1] ERROR Test - Sent i=10, QueueSize=9 
[receiving-2] ERROR Test - Received i=1, QueueSize=9
[sending-1] ERROR Test - Sent i=11, QueueSize=9 
[sending-1] ERROR Test - Sent i=12, QueueSize=10
[sending-1] ERROR Test - Dropping in UnicastProcessor i=12, QueueSize=10
[sending-1] ERROR Test - Sent i=13, QueueSize=10
...
[sending-1] ERROR Test - Sent i=20, QueueSize=10
[receiving-2] ERROR Test - Received i=2, QueueSize=9
[sending-1] ERROR Test - Sent i=21, QueueSize=9
[sending-1] ERROR Test - Sent i=22, QueueSize=9
...
[receiving-2] ERROR Test - Received i=10, QueueSize=1
[sending-1] ERROR Test - Sent i=101, QueueSize=1
...
[sending-1] ERROR Test - Sent i=110, QueueSize=1
[receiving-2] ERROR Test - Received i=11, QueueSize=0
[sending-1] ERROR Test - Sent i=111, QueueSize=0
...
[sending-1] ERROR Test - Sent i=120, QueueSize=0
[receiving-2] ERROR reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception
    reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    Caused by: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:373) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_232]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
[sending-1] ERROR Test - Sent i=121, QueueSize=0
[sending-1] ERROR Test - Sent i=122, QueueSize=0
...

Возможно .subscribe(processor) на interval не реализует противодавление?

Для чего я на самом деле пытаюсь использовать это, для очереди входящих пакетов из Reactor Netty, поэтому я отметил это вопрос, если кто-то пытался сделать что-то подобное.

...