нужна ваша помощь и руководство по этому поводу.
Я использовал spring-kafka версии 2.2.X в моем текущем проекте.
Обработка ошибок, которую я создал, выглядит следующим образом:
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> eventKafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new SeekToCurrentErrorHandler(createDeadLetterPublishingRecoverer(), 3));
return factory;
}
public DeadLetterPublishingRecoverer createDeadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(getEventKafkaTemplate(),
(record, ex) -> new TopicPartition("topic-undelivered", -1));
}
А затем я обновил все мои версии зависимостей проекта, такие как spring -boot и spring-kafka, до последней версии: 2.5.4 RELEASE
Я обнаружил что некоторые из методов устарели и изменены.
SeekToCurrentErrorHandler
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) -> {
// recover after 3 failures, woth no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
Мой вопрос в том, как создать DLQ с этими конфигурациями:
EDITED
@Bean("kafkaConsumer")
public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaConsumer() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(consumerConcurrencyCount);
factory.setErrorHandler(errorHandler());
return factory;
}
public SeekToCurrentErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler(
deadLetterPublishingRecoverer(),
new FixedBackOff(0L, 2L)
);
}
public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer() {
return new DeadLetterPublishingRecoverer(
getEventKafkaTemplate(),
(record, ex) -> {
if (ex.getCause() instanceof BusinessException || ex.getCause() instanceof TechnicalException) {
return new TopicPartition("topic-undelivered", -1);
}
return new TopicPartition("topic-fail", -1);
});
}
public KafkaOperations<String, Object> getEventKafkaTemplate() { // producer to DLQ
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
}
Эти конфигурации работают, спасибо Гэри!
Заранее спасибо