Я новичок в реакторе ... и я хотел использовать его для потребления от Кафки (что с другой стороны, я хорошо знаю).
Итак, я хотел настроить конвейер, который повторяет попытку в случае ошибки. Я прочитал в документации, что ошибка в реакторе является терминальной операцией ... даже при использовании повторных попыток и отката двигатель будет повторно подписываться на поток. Я использую это, чтобы потреблять из topi c, следуя примеру потока на раздел:
Scheduler scheduler = Schedulers.newBoundedElastic(
processingThreads,
100,
"KafkaProcessingPool",
60);
this.receiverRecordFlux
.groupBy(r -> r.partition() % processingThreads)
.flatMap(groupedFlux -> {
var consumer = makeConsumer();
return groupedFlux.publishOn(scheduler)
.map(r -> {
consumer.accept(r);
return r.receiverOffset();
})
.concatMap(ReceiverOffset::commit);
}).retryBackoff(Long.MAX_VALUE, Duration.ofMillis(25), Duration.ofSeconds(1))
.doOnError(err -> LOG.warn("Error while consuming, will retry", err)).retry()
.subscribe();
приводит ли это к тому, что потребитель создается заново каждый раз, когда возникает ошибка (это звучит для меня безумно ) ...