Чтобы очистить кэш, необходимо выполнить восстановление в шаблоне повторных попыток, а не в обработчике ошибок.
@SpringBootApplication
public class So56846940Application {
public static void main(String[] args) {
SpringApplication.run(So56846940Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so56846940").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topicDLT() {
return TopicBuilder.name("so56846940.DLT").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
DeadLetterPublishingRecoverer recoverer) {
factory.setRetryTemplate(new RetryTemplate());
factory.setStatefulRetry(true);
factory.setRecoveryCallback(context -> {
recoverer.accept((ConsumerRecord<?, ?>) context.getAttribute("record"),
(Exception) context.getLastThrowable());
return null;
});
return args -> IntStream.range(0, 5000).forEach(i -> template.send("so56846940", "foo"));
}
@KafkaListener(id = "so56846940", topics = "so56846940")
public void listen(String in) {
System.out.println(in);
throw new RuntimeException();
}
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, String> template) {
return new DeadLetterPublishingRecoverer(template);
}
@Bean
public SeekToCurrentErrorHandler eh() {
return new SeekToCurrentErrorHandler(4);
}
}
Обработчик ошибок должен повторять как минимум столько же раз, сколько шаблон повторных попыток, чтобычто повторные попытки исчерпаны, и мы очищаем кэш.
Вам также следует настроить RetryTemplate с теми же исключениями, не требующими повторного запуска, как и в обработчике ошибок.
Мы разъясним это в справочном руководстве.