Я пытаюсь написать стойкого потребителя Кафки. Если при обработке сообщения в методе слушателя возникает исключение, я бы хотел повторить попытку. Я хотел бы повторить несколько раз для определенных исключений, всегда для определенных исключений и никогда для остальных. Я прочитал документы Spring, касающиеся SeekToCurrentErrorHandler, но не уверен на 100%, как его реализовать.
Я подклассифицировал ExceptionClassifierRetryPolicy и возвращаю соответствующую Политику повтора, основанную на исключениях, возникающих в методе Listener.
Я создал RetryTemplate и установил его RetryPolicy с моей собственной реализацией в подклассе.
Я установил retryTemplate для контейнера Kafka. Я установил обработчик ошибок как новый SeekToCurrentHandler, а также установил свойство повторения с сохранением состояния в true.
Метод прослушивателя
@KafkaListener(topics = "topicName", containerFactory = "containerFactory")
public void listenToKafkaTopic(@Payload Message<SomeAvroGeneratedClass> message, Acknowledgement ack){
SomeAvroGeneratedClass obj = message.getPayLoad();
type.processIncomingMessage();
ack.acknowledge();
}
Класс настраиваемой политики повтора
@Component
public class MyRetryPolicy extends ExceptionClassifierRetryPolicy
{
@PostConstruct
public void init(){
final SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(8);
this.setExceptionClassifier( classifiable ->
{
// Always Retry when instanceOf TransientDataAccessException
if( classifiable.getCause() instanceof TransientDataAccessException)
{
return new AlwaysRetryPolicy();
}
else if(classifiable.getCause() instanceOf NonTransientDataAccessException)
{
return new NeverRetryPolicy();
}
else
{
return simpleRetryPolicy;
}
} );
}}
Повторить попытку шаблона и контейнера
@Configuration
public class RetryConfig{
@Bean
public RetryTemplate retryTemplate(@Autowired ConcurrentKafkaListenerContainerFactory factory){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new MyRetryPolicy());
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy()
fixedBackOffPolicy.setBackOffPeriod(2000l);
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
factory.setRetryTemplate(retryTemplate);
factory.setAckOnError(false);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
factory.setStateFulRetry(true);
factory.setRecoveryCallback(//configure recovery after retries are exhausted and commit offset
);
}
}
Свойства прослушивателя:
1) AckMode = Manual 2) auto.offset.commit = false
Вопрос:
1) Могу ли я достичь своей логики повторения, определенной в MyRetryPolicy, без восстановления баланса потребителя при возврате AlwaysRetryPolicy? Если нет, пожалуйста, направьте меня по правильному пути.
2) Правильный ли мой подход в использовании обработчика ошибок и повторных попыток?