Проблема ( упрощенная ): у меня есть много событий, которые необходимо опубликовать в RabbitMQ, для эффективности я хочу пакетировать сообщения, а затем публиковать их.События находятся в нескольких потоках и создаются одновременно
с использованием реактора 3.2.8.release
eventProcessor = EmitterProcessor.create(batchingProperties.getOverflowBufferSize(), true);
eventSink = eventProcessor.sink(FluxSink.OverflowStrategy.LATEST);
eventProcessor
.bufferTimeout(batchingProperties.getBufferSize(),
Duration.ofSeconds(batchingProperties.getBufferSeconds()),
eventPublisherScheduler)
.parallel()
.runOn(eventPublisherScheduler)
.map(list -> batchFunction.apply(list))
.subscribe(this::processEventList, this::processEventError, this::processComplete);
, и я просто так публикуюсь из нескольких потоков
eventSink.next(event);
Это не работает, и я не уверен, как заставить это работать, потому что я получаю пропущенные сообщения, которые отбрасываются молча.
Вопрос 1: Как правильно распечатать пропущенные сообщения?
Если я добавлю
.onBackpressureBuffer(100, item -> {
LOGGER.error("dropping {}", item);
})
до bufferTimeout, тогда я получу ошибку:
реактор.core.Exceptions $ OverflowException: получатель переполнен большим количеством сигналов, чем ожидалось (ограниченная очередь ...)
Вопрос 2: Как правильно реализовать это, ясно, как я это делаю, не работает?
Вопрос 3: Как мне сделать так, чтобы он блокировал поток, вызывающий eventSink.next (событие) вместо получения исключения overflowException?