У меня есть бесконечный поток (от kafka, использующего реактор-kafka) событий, которые я пытаюсь записать в виде пакетов в базу данных, прежде чем перейти к фактической обработке событий.Моя проблема состоит в том, чтобы заставить это работать с надлежащим противодавлением.
windowTimeout
и bufferTimeout
показались хорошими кандидатами, поскольку они позволили мне указать как максимальный размер, так и ограничить время ожидания в случае низкого «трафика».
Сначалаoff был windowTimeout
, из которого массовые записи были сделаны в БД.Это, однако, быстро оказалось проблематичным: реактор.кор. Исключения $ OverflowException: приемник переполнен большим количеством сигналов, чем ожидалось (ограниченная очередь ...)) .
Затем я переключаюсь наbufferTimeout
, но не удалось с ошибкой processor.core.Exceptions $ OverflowException: не удалось создать буфер из-за отсутствия запросов .
Я надеюсь, что следующее иллюстрирует поток, который япосле:
flux.groupBy(envelope -> envelope.partition)
.flatMap(partitionFlux -> {
final Flux<ConsumedEnvelope> elasticFlux = partitionFlux.publishOn(Schedulers.elastic());
final Flux<List<ConsumedEnvelope>> batchFlux = partitionFlux.bufferTimeout(100, Duration.ofMillis(500))
.concatMap(batch -> {
final ConsumedEnvelope last = batch.get(batch.size() - 1);
return repository.persist(batch) // a)
.then(last.acknowledge()) // b)
.thenReturn(batch);
});
return processing(batchFlux);
})
.subscribe(result -> {
// ...
});
a) repository.persist
внутренне ничего не делает, а выполняет итерацию пакета для создания операции вставки, а затем возвращает Mono<Void>
.
b) ConsumedEnvelope.acknowledge ()для смещения Кафки, что я хочу сделать только после успешного сохранения пакета.Все это заключено в concatMap
, чтобы обрабатывать только один пакет за раз для каждого раздела.
Как упоминалось выше, это приводит к исключению переполнения.Есть ли идиоматические способы достижения того, что я пытался описать?Мне кажется, что это не должно быть слишком необычной задачей, но я новичок в реакторе и хотел бы получить совет.
/ d
РЕДАКТИРОВАТЬ Я понял, что просто добавивonBackpressureBuffer
на самом деле решает это ОК для меня.Но в целом, есть ли лучшие способы сделать это?
РЕДАКТИРОВАТЬ 2 ... выше, конечно, вызвало проблемы из-за несвязанного спроса, который я почему-то пропустил.Итак, вернемся к исходной проблеме или, возможно, к тому, чтобы onBackpressureBuffer НЕ запрашивал несвязанный запрос, а только перенаправлял то, что запрашивается из нисходящего потока.