Я пытаюсь использовать UnicastProcessor, чтобы разрешить определенное количество событий в очереди (для обработки всплесков нагрузки и чтобы иметь возможность проверить размер очереди). Проблема в том, что противодавление не работает должным образом.
В соответствии с docs поддерживается обратное давление:
Если эта очередь ограничена процессор может отклонить значение pu sh, когда буфер заполнен и недостаточно запросов от нисходящего потока получено.
В этом ограниченном случае вы также можете создать процессор с обратным вызовом, который вызывается на каждом отклоненном элементе, позволяя очистить эти отклоненные элементы.
Вот игрушечный пример:
Flux<Long> interval = Flux.interval(Duration.ZERO, Duration.ofMillis(100));
BlockingQueue<Long> queue = new LinkedBlockingQueue<>(10);
UnicastProcessor<Long> processor = UnicastProcessor.create(queue,
i -> {
// only invoked once
log.error("Dropping in UnicastProcessor i={}, QueueSize={}", i, queue.size());
}, () -> {
}
);
// FluxSink<Long> sink = processor.sink(FluxSink.OverflowStrategy.DROP); // using a sink facade doesn't seem to help either
interval
.publishOn(Schedulers.newSingle("sending"))
.doOnNext(i -> log.error("Sent i={}, QueueSize={}", i, queue.size()))
.subscribe(processor);
// .doOnNext(sink::next)
// .subscribe();
processor
// Adding backpressure here makes the queue size always zero
// .onBackpressureDrop(i -> log.error("Dropping after UnicastProcessor i={}, QueueSize={}", i, queue.size()))
.publishOn(Schedulers.newSingle("receiving"))
.doOnNext(i -> log.error("Received i={}, QueueSize={}", i, queue.size()))
.doOnNext(i -> {
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
})
.subscribe();
sleep(20000); // Keep running long enough to see the exception
Идеальное поведение - заполнить очередь, если подписчик не достаточно быстр и отбрасывает что-либо еще входящее.
Вот некоторые журналы (...
указывает на похожий журнал с увеличенными значениями i
и тем же QueueSize):
[sending-1] ERROR Test - Sent i=0, QueueSize=0
[receiving-2] ERROR Test - Received i=0, QueueSize=0
[sending-1] ERROR Test - Sent i=1, QueueSize=0
...
[sending-1] ERROR Test - Sent i=10, QueueSize=9
[receiving-2] ERROR Test - Received i=1, QueueSize=9
[sending-1] ERROR Test - Sent i=11, QueueSize=9
[sending-1] ERROR Test - Sent i=12, QueueSize=10
[sending-1] ERROR Test - Dropping in UnicastProcessor i=12, QueueSize=10
[sending-1] ERROR Test - Sent i=13, QueueSize=10
...
[sending-1] ERROR Test - Sent i=20, QueueSize=10
[receiving-2] ERROR Test - Received i=2, QueueSize=9
[sending-1] ERROR Test - Sent i=21, QueueSize=9
[sending-1] ERROR Test - Sent i=22, QueueSize=9
...
[receiving-2] ERROR Test - Received i=10, QueueSize=1
[sending-1] ERROR Test - Sent i=101, QueueSize=1
...
[sending-1] ERROR Test - Sent i=110, QueueSize=1
[receiving-2] ERROR Test - Received i=11, QueueSize=0
[sending-1] ERROR Test - Sent i=111, QueueSize=0
...
[sending-1] ERROR Test - Sent i=120, QueueSize=0
[receiving-2] ERROR reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Caused by: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:373) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
[sending-1] ERROR Test - Sent i=121, QueueSize=0
[sending-1] ERROR Test - Sent i=122, QueueSize=0
...
Возможно .subscribe(processor)
на interval
не реализует противодавление?
Для чего я на самом деле пытаюсь использовать это, для очереди входящих пакетов из Reactor Netty, поэтому я отметил это вопрос, если кто-то пытался сделать что-то подобное.