Я попытался включить обработку ошибок spring-kafka с помощью аннотации.
Поэтому я создал бин kafkaListenerErrorHandler
.
@Bean
public KafkaListenerErrorHandler kafkaListenerErrorHandler(ErrorHandler errorHandler) {
KafkaListenerErrorHandler kafkaListenerErrorHandler = (message, exception) -> {
MessageHeaders headers = message.getHeaders();
ConsumerRecord<String, EventV1> consumerRecord = new ConsumerRecord<>(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class),
headers.get(KafkaHeaders.OFFSET, Long.class),
headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY, String.class),
(EventV1) message.getPayload());
errorHandler.handle(exception, Collections.singletonList(consumerRecord), headers.get(KafkaHeaders.CONSUMER, Consumer.class), messageListenerContainer);
return null;
};
return kafkaListenerErrorHandler;
}
и связал его с KafkaListener
на
@Autowired(required = false)
MessageListenerContainer messageListenerContainer;
@Autowired
private UserBO userBO;
@Bean
ErrorHandler errorHandler(KafkaTemplate kafkaTemplate) {
return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0, 2));
}
@KafkaListener(topics = "#{'${kafka.listener.topics}'.split(',')}", errorHandler = "kafkaListenerErrorHandler")
public void listen(ConsumerRecord<String, EventV1> message) {
log.info("Received Messasge: {}", message);
try {
EventV1 value = message.value();
userBO.getUserById(value.getCustomerId());
} catch (Exception e) {
throw new ListenerExecutionFailedException(e.getLocalizedMessage(), e);
}
}
Даже если кажется, что это работает, код кажется уродливым. kafkaListenerErrorHandler
создает новый ConsumerRecord для перенаправления его в ErrorHandler, а «messageListenerContainer» имеет значение null, потому что я не выяснил, как получить его в моем контексте.
Должно быть или должно быть больше элегантный способ соединить ErrorHandler с KafkaListenerErrorHandler
Я также добавил обработчик ошибок десериализации в мою настройку.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
Спасибо за любой совет.