Получать сообщения от topi c и повторять с экспоненциальным откатом до тех пор, пока Spring-Kafka-2.3.0 и выше не будет успешным - PullRequest
0 голосов
/ 20 января 2020

Ниже приведены конфигурации

@Bean
    @Autowired
    public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> kafkaListContFactory(@Qualifier("retryTemplate") RetryTemplate retryTemplate, @Qualifier("batchErrorHandler") ErrorHandler errorHandler, @Qualifier("batchErrorHandler") BatchErrorHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setErrorHandler(errorHandler);
        factory.getContainerProperties().setAckOnError(false);
        factory.setStatefulRetry(true);
        factory.setRetryTemplate(retryTemplate);
    }

**Retry config**

 @Bean("retryTemplate")
    public RetryTemplate retryTemplate() {
        final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(delay);
        backOffPolicy.setMultiplier(multiplier);
        backOffPolicy.setMaxInterval(20000);
        final RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(new AlwaysRetryPolicy());
        template.setBackOffPolicy(backOffPolicy);
        return template;
    }

SeekToCurrentErrorHandler config
I do not want to recover and try to retry till it succeeds so I have given maxAttempts to -1
 @Bean("errorHandler")
    public ErrorHandler errorHandler() {
        final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {
            if (t != null && (t instanceof RetryServiceException || t.getCause() instanceof RetryServiceException)) {
                logger.error("SeekToCurrentErrorHandler recoverer failure", t.getMessage());
                throw new RetryServiceException("SeekToCurrentErrorHandler recoverer failure");
            }
        }, -1);
        return handler;
    }

И, наконец, я подтверждаю в методе @KafkaListener, когда исключение не происходит.

У меня вопрос, настроил ли я -1 как максимум попыток, и мой Обработчик ошибок позаботится о повторных попытках. Нужно ли мне retryTemplate? Но повторных попыток не происходит в течение бесконечного времени, и проблема в том, что если я получаю пакетную запись, я обрабатываю те же сообщения, если одно из сообщений не удается в опросе, все они будут обработаны повторно.

Мне нужно использовать batchErrorHandler и реализовать экспоненциальную стратегию отката, чтобы повторные попытки выполнялись с сохранением состояния и избегали повторной обработки одних и тех же успешных сообщений. Может ли кто-нибудь помочь с вышеупомянутой проблемой.

И мне нужно избежать перебалансировки раздела из-за неправильного использования max.poll.interval.ms

1 Ответ

1 голос
/ 20 января 2020

Вам необходимо использовать Stateful Retry , чтобы избежать перебалансировки; однако в современных версиях вам вообще не нужна повторная попытка на уровне слушателя, поскольку теперь вы можете выполнять повторные попытки и откатываться на уровне обработчика ошибок.

Использование конструктора take занимает BackOff и удаляет шаблон повторных попыток контейнера.

/**
 * Construct an instance with the provided recoverer which will be called after
 * the backOff returns STOP for a topic/partition/offset.
 * @param recoverer the recoverer; if null, the default (logging) recoverer is used.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {

Мне нужно использовать batchErrorHandler и реализовать экспоненциальную стратегию отката, чтобы повторные попытки выполнялись с учетом состояния и повторной обработки одних и тех же успешных сообщений. Может ли кто-нибудь помочь с вышеуказанной проблемой.

Платформа не может помочь с пакетными прослушивателями, поскольку она не знает, где (в пакете) произошла ошибка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...