Я обнаружил ошибку при изменении DestionationResolver DeadLetterPublishingRecoverer.
Когда я использую:
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".ERR", cr.partition());
, он работает отлично.
Однако, если вы используете _ERR вместо из .ERR возникает ошибка:
2020-08-05 12:53:10,277 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] WARN o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Error while fetching metadata with correlation id 7 : {ABC_TEST_XPTO_ERR=INVALID_TOPIC_EXCEPTION}
2020-08-05 12: 53: 10,278 [kafka-продюсер-сетевой-поток | Producer-kafka-tx-group1.ABC_TEST_XPTO.0] ERROR org. apache .kafka.clients.Metadata - [Производитель clientId = Producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId = kafka-tx-group1.ABC_TEST_XP .0] Ответ метаданных сообщил о недопустимых темах [ABC_TEST_XPTO_ERR] 2020-08-05 12:53: 10,309 [org.springframework.kafka.KafkaListenerEndpointContainer # 0-0- C -1] ОШИБКА osksLoggingProducerListener - Исключение при отправке сообщения с key = 'null' и payload = 'XPTOEvent (super = Event (id = CAPBA2548, destination = ABC_TEST_XPTO, he ...' 'to topi c ABC_TEST_XPTO_ERR и раздел 0: org. apache .kafka.common.errors. InvalidTopicException: недопустимые темы: [ABC_TEST_XPTO_ERR] 2020-08-05 12:53: 10,320 [org.springframework.kafka.KafkaListenerEndpointContainer # 0-0- C -1] ОШИБКА osklDeadLetterPublishingRecoverer публикации темы для Dead-letterRecord = ABC_TEST_XPTO_ERR, partition = 0, headers = RecordHeaders (headers = .. org.springframework.kafka.KafkaException: Ошибка отправки; вложенное исключение - это org. apache .kafka.common.errors.InvalidTopicException: недопустимые темы: [ABC_TEST_XPTO_ERR] в org.springframework.kafka.core.KafkaTemplate.doSend (KafkaTemplate. java: 573) в orrame.core.spring. .KafkaTemplate.send (KafkaTemplate. java: 388) по адресу org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publi sh (DeadLetterPublishingRecoverer. java: 290) по адресу org.springframework. DeadLetterPublishingRecoverer. java: 226) по адресу org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept (DeadLetterPublishingRecoverer. java: 54) по адресу org.springframework.kafka.lcistener.FailedRecoverer. сильный текст
В моих темах используется _ в середине имени, например ABC_TEST_XPTO, поэтому я хотел бы настроить topi для мертвой буквы c с _ERR, если возможно
Моя среда
Spring Boot 2.3.2.RELEASE
Spring-Kafka 2.5.3.RELEASE, но та же проблема возникает с 2.5.4.RELEASE
Java 11
private stati c final BiFunction DESTINATION_RESOLVER = (cr, e) -> new TopicPartition (cr.topi c () + "_ERR", cr.partition ());
@ Класс компонента ContainerFactoryConfigurer {
ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
ChainedKafkaTransactionManager<?, ?> tm,
KafkaTemplate<Object, Object> template) {
factory.getContainerProperties().setTransactionManager(tm);
DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {
}, new FixedBackOff(0L, Long.valueOf(maxAttemps)), template, true);
factory.setAfterRollbackProcessor(rollbackProcessor);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, template), DESTINATION_RESOLVER), new FixedBackOff(0L, Long.valueOf(maxAttemps)));
errorHandler.setCommitRecovered(true);
errorHandler.setAckAfterHandle(true);
factory.setErrorHandler(errorHandler);
}
}
Спасибо DPG