Повторная попытка сохранения состояния с DeadLetterPublishingRecoverer, вызывающая RetryCacheCapacityExceededException - PullRequest
1 голос
/ 02 июля 2019

Моя фабрика контейнеров имеет SeekToCurrentErrorHandler, который использует DeadLetterPublishingRecoverer для публикации в DLT определенных исключений типа NotRetryableException и продолжает искать одно и то же смещение для исключений другого типа бесконечное число раз.При такой настройке после определенного количества полезных нагрузок, которые приводят к исключениям, не подлежащим повторению, карта, в которой хранится контекст повтора - MapRetryContextCache (spring-retry), переполняется, создавая исключение RetryCacheCapacityExceededException.На первый взгляд, повторные контексты сообщений, которые должны обрабатываться средством восстановления DLT, не удаляются из MapRetryContextCache.Либо это, либо моя конфигурация неверна.

SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(kafkaTemplate),-1);
eh.addNotRetryableException(SomeNonRetryableException.class);
        eh.setCommitRecovered(true);
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = getContainerFactory();
        factory.setErrorHandler(eh);
        factory.setRetryTemplate(retryTemplate);
        factory.setStatefulRetry(true);

1 Ответ

0 голосов
/ 02 июля 2019

Чтобы очистить кэш, необходимо выполнить восстановление в шаблоне повторных попыток, а не в обработчике ошибок.

@SpringBootApplication
public class So56846940Application {

    public static void main(String[] args) {
        SpringApplication.run(So56846940Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicDLT() {
        return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            DeadLetterPublishingRecoverer recoverer) {

        factory.setRetryTemplate(new RetryTemplate());
        factory.setStatefulRetry(true);
        factory.setRecoveryCallback(context -> {
            recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
                    (Exception) context.getLastThrowable());
            return null;
        });

        return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
    }

    @KafkaListener(id = "so56846940", topics = "so56846940")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException();
    }

    @Bean
    public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
        return new DeadLetterPublishingRecoverer(template);
    }

    @Bean
    public SeekToCurrentErrorHandler eh() {
        return new SeekToCurrentErrorHandler(4);
    }

}

Обработчик ошибок должен повторять как минимум столько же раз, сколько шаблон повторных попыток, чтобычто повторные попытки исчерпаны, и мы очищаем кэш.

Вам также следует настроить RetryTemplate с теми же исключениями, не требующими повторного запуска, как и в обработчике ошибок.

Мы разъясним это в справочном руководстве.

...