Будет ли это заново создавать нового потребителя кафки каждый раз, когда возникает ошибка? - PullRequest
0 голосов
/ 04 февраля 2020

Я новичок в реакторе ... и я хотел использовать его для потребления от Кафки (что с другой стороны, я хорошо знаю).

Итак, я хотел настроить конвейер, который повторяет попытку в случае ошибки. Я прочитал в документации, что ошибка в реакторе является терминальной операцией ... даже при использовании повторных попыток и отката двигатель будет повторно подписываться на поток. Я использую это, чтобы потреблять из topi c, следуя примеру потока на раздел:

Scheduler scheduler = Schedulers.newBoundedElastic(
        processingThreads,
        100,
        "KafkaProcessingPool",
        60);
this.receiverRecordFlux
        .groupBy(r -> r.partition() % processingThreads)
        .flatMap(groupedFlux -> {
            var consumer = makeConsumer();
            return groupedFlux.publishOn(scheduler)

                    .map(r -> {
                        consumer.accept(r);
                        return r.receiverOffset();
                    })
                    .concatMap(ReceiverOffset::commit);
        }).retryBackoff(Long.MAX_VALUE, Duration.ofMillis(25), Duration.ofSeconds(1))
        .doOnError(err -> LOG.warn("Error while consuming, will retry", err)).retry()
        .subscribe();

приводит ли это к тому, что потребитель создается заново каждый раз, когда возникает ошибка (это звучит для меня безумно ) ...

...