Пакетный издатель с Reactor или RxJava - PullRequest
0 голосов
/ 26 июня 2019

Проблема ( упрощенная ): у меня есть много событий, которые необходимо опубликовать в RabbitMQ, для эффективности я хочу пакетировать сообщения, а затем публиковать их.События находятся в нескольких потоках и создаются одновременно

с использованием реактора 3.2.8.release


eventProcessor = EmitterProcessor.create(batchingProperties.getOverflowBufferSize(), true);
eventSink = eventProcessor.sink(FluxSink.OverflowStrategy.LATEST);
eventProcessor
  .bufferTimeout(batchingProperties.getBufferSize(),
     Duration.ofSeconds(batchingProperties.getBufferSeconds()), 
     eventPublisherScheduler)
   .parallel()
   .runOn(eventPublisherScheduler)
   .map(list -> batchFunction.apply(list))
   .subscribe(this::processEventList, this::processEventError, this::processComplete);

, и я просто так публикуюсь из нескольких потоков

eventSink.next(event);

Это не работает, и я не уверен, как заставить это работать, потому что я получаю пропущенные сообщения, которые отбрасываются молча.

Вопрос 1: Как правильно распечатать пропущенные сообщения?

Если я добавлю

.onBackpressureBuffer(100, item -> {
  LOGGER.error("dropping {}", item);
 })

до bufferTimeout, тогда я получу ошибку:

реактор.core.Exceptions $ OverflowException: получатель переполнен большим количеством сигналов, чем ожидалось (ограниченная очередь ...)

Вопрос 2: Как правильно реализовать это, ясно, как я это делаю, не работает?

Вопрос 3: Как мне сделать так, чтобы он блокировал поток, вызывающий eventSink.next (событие) вместо получения исключения overflowException?

...