Прежде всего, вы используете poll
метод. Он не блокируется и возвращает null
, если очередь пуста. Вы зацикливаете коллекцию до первого null
(т. Е. while (next != null)
, поэтому ваш код выходит из цикла практически сразу, поскольку очередь при запуске пуста. Вы должны заменить poll
на take
, который блокирует и будет ждать, пока элемент не станет доступен.
Во-вторых, hookOnNext
вызывается, когда событие удаляется из очереди. Однако вы пытаетесь снова прочитать события, используя batchOfEvents.addAll(events);
. Кроме того, вы также очищаете все ожидающие события events.clear();
Советую удалить прямой доступ к коллекции events
из метода hookOnNext
.
Почему вы вообще здесь используете Flux? Кажется слишком сложным. Вы можете использовать простую нить здесь
@Autowired
public EventController(FirehosePutService firehosePutService) {
this.firehosePutService = firehosePutService;
Thread persister = new Thread(() -> {
List<String> batchOfEvents = new ArrayList<>(batchSize);
String next;
while (( next = events.take()) != null) {
batchOfEvents.add(value);
if (batchOfEvents.size() == batchSize) {
log.info("Consume {} elements. Size of batchOfEvents={}", consumed, batchOfEvents.size());
firehosePutService.saveBulk(batchOfEvents);
batchOfEvents.clear();
}
}
});
persister.start();
}