При использовании повторения с отслеживанием состояния необходимо добавить SeekToCurrentErrorHandler
к фабрике контейнеров.
В текущей версии это будет
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));
См. документацию .
Вот полное приложение ...
@SpringBootApplication
public class So60210116Application {
private static final Logger LOG = LoggerFactory.getLogger(So60210116Application.class);
public static void main(String[] args) {
SpringApplication.run(So60210116Application.class, args);
}
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(context -> {
LOG.info(" In recovery callback method !!");
ConsumerRecord<?, ?> record = (ConsumerRecord) context
.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
Map conciseMap = (Map) record.value();
// publisher.sendMessage((String)conciseMap.get("messagePayload"));
((Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT))
.acknowledge();
return null;
});
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setStatefulRetry(true);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS)));
return factory;
}
RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
fixedBackOffPolicy.setBackOffPeriod(500L);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(3,
Collections.singletonMap(RuntimeException.class, true));
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@KafkaListener(id = "so60210116", topics = "so60210116")
public void listen(String in) {
throw new RuntimeException("test");
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so60210116").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so60210116", "foo");
};
}
}
В 2.2.x вместо
factory.setErrorHandler(new SeekToCurrentErrorHandler(Integer.MAX_VALUE));
.
С 2.3 и более поздние версии теперь можно исключить RetryTemplate
с соответствующей конфигурацией BackOff и recoverer в обработчике ошибок.