Я реализовал потребителя Kafka с KafkaHandler
.Предполагается, что мой потребитель использует события, а затем отправляет запрос REST какой-то другой службе для каждого события.Я хочу повторить попытку, только если эта служба REST не работает.В противном случае я могу проигнорировать сбойное событие.
Моя фабрика контейнеров настроена следующим образом:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
Я использую ExceptionClassifierRetryPolicy
для установки исключений и соответствующих политик повторных попыток.
Все выглядит хорошо с повторной попыткой.Он повторяет попытку, когда я получаю ConnectException
, и игнорирует, когда я получаю IllegalArgumentException
.
Однако, в сценарии IllegalArgumentException
, SeekToCurrentErrorHandler
стремится вернуться к необработанному смещению (потому что он ищет обратно необработанноесообщения, в том числе сообщение об ошибке), которое заканчивается немедленной повторной попыткой сообщения об ошибке.Потребитель постоянно перемещается назад и вперед и повторяет попытки миллион раз.
Если бы у меня была возможность понять, какая запись не удалась в SeekToCurrentErrorHandler
, я бы создал пользовательскую реализацию SeekToCurrentErrorHandler
, чтобы проверить, нет ли сообщения об ошибке.повторяется или нет (с помощью поля thrownException
).Если это невозможно повторить, я бы удалил его из списка records
для поиска.
Есть идеи о том, как добиться этой функциональности?
Примечание: enable.auto.commit
установленоfalse
, auto.offset.reset
- earliest
.
Спасибо!