DeadLetterPublishingRecoverer - Публикация недоставленных писем завершилась ошибкой с исключением InvalidTopicException для имени topi c в TopicPartition, оканчивающемся на _ERR - PullRequest
0 голосов
/ 05 августа 2020

Я обнаружил ошибку при изменении DestionationResolver DeadLetterPublishingRecoverer.

Когда я использую:

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
        DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".ERR", cr.partition());

, он работает отлично.

Однако, если вы используете _ERR вместо из .ERR возникает ошибка:

2020-08-05 12:53:10,277 [kafka-producer-network-thread | producer-kafka-tx-group1.ABC_TEST_XPTO.0] WARN  o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId=kafka-tx-group1.ABC_TEST_XPTO.0] Error while fetching metadata with correlation id 7 : {ABC_TEST_XPTO_ERR=INVALID_TOPIC_EXCEPTION}

2020-08-05 12: 53: 10,278 [kafka-продюсер-сетевой-поток | Producer-kafka-tx-group1.ABC_TEST_XPTO.0] ERROR org. apache .kafka.clients.Metadata - [Производитель clientId = Producer-kafka-tx-group1.ABC_TEST_XPTO.0, transactionalId = kafka-tx-group1.ABC_TEST_XP .0] Ответ метаданных сообщил о недопустимых темах [ABC_TEST_XPTO_ERR] 2020-08-05 12:53: 10,309 [org.springframework.kafka.KafkaListenerEndpointContainer # 0-0- C -1] ОШИБКА osksLoggingProducerListener - Исключение при отправке сообщения с key = 'null' и payload = 'XPTOEvent (super = Event (id = CAPBA2548, destination = ABC_TEST_XPTO, he ...' 'to topi c ABC_TEST_XPTO_ERR и раздел 0: org. apache .kafka.common.errors. InvalidTopicException: недопустимые темы: [ABC_TEST_XPTO_ERR] 2020-08-05 12:53: 10,320 [org.springframework.kafka.KafkaListenerEndpointContainer # 0-0- C -1] ОШИБКА osklDeadLetterPublishingRecoverer публикации темы для Dead-letterRecord = ABC_TEST_XPTO_ERR, partition = 0, headers = RecordHeaders (headers = .. org.springframework.kafka.KafkaException: Ошибка отправки; вложенное исключение - это org. apache .kafka.common.errors.InvalidTopicException: недопустимые темы: [ABC_TEST_XPTO_ERR] в org.springframework.kafka.core.KafkaTemplate.doSend (KafkaTemplate. java: 573) в orrame.core.spring. .KafkaTemplate.send (KafkaTemplate. java: 388) по адресу org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publi sh (DeadLetterPublishingRecoverer. java: 290) по адресу org.springframework. DeadLetterPublishingRecoverer. java: 226) по адресу org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept (DeadLetterPublishingRecoverer. java: 54) по адресу org.springframework.kafka.lcistener.FailedRecoverer. сильный текст

В моих темах используется _ в середине имени, например ABC_TEST_XPTO, поэтому я хотел бы настроить topi для мертвой буквы c с _ERR, если возможно

Моя среда

  1. Spring Boot 2.3.2.RELEASE

  2. Spring-Kafka 2.5.3.RELEASE, но та же проблема возникает с 2.5.4.RELEASE

  3. Java 11

    private stati c final BiFunction DESTINATION_RESOLVER = (cr, e) -> new TopicPartition (cr.topi c () + "_ERR", cr.partition ());

    @ Класс компонента ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
                               ChainedKafkaTransactionManager<?, ?> tm,
                               KafkaTemplate<Object, Object> template) {
    
        factory.getContainerProperties().setTransactionManager(tm);
        DefaultAfterRollbackProcessor rollbackProcessor = new DefaultAfterRollbackProcessor((record, exception) -> {
        }, new FixedBackOff(0L, Long.valueOf(maxAttemps)), template, true);
        factory.setAfterRollbackProcessor(rollbackProcessor);
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(Collections.singletonMap(Object.class, template), DESTINATION_RESOLVER), new FixedBackOff(0L, Long.valueOf(maxAttemps)));
        errorHandler.setCommitRecovered(true);
        errorHandler.setAckAfterHandle(true);
        factory.setErrorHandler(errorHandler);
    }
    

    }

Спасибо DPG

1 Ответ

0 голосов
/ 05 августа 2020

У меня это отлично работает ...

@SpringBootApplication
public class So63270367Application {

    public static void main(String[] args) {
        SpringApplication.run(So63270367Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
    }

    @KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO_ERR")
    public void listen(String in) {
        System.out.println(in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

Возможно, у ваших брокеров есть какие-то правила относительно имен c topi; может быть, посмотрите журналы брокера?

EDIT

Как я сказал в своем комментарии; не имеет значения, откуда опубликована запись; это все еще работает для меня ...

@SpringBootApplication
public class So63270367Application {

    public static void main(String[] args) {
        SpringApplication.run(So63270367Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("ABC_TEST_XPTO").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicErr() {
        return TopicBuilder.name("ABC_TEST_XPTO_ERR").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("ABC_TEST_XPTO_ERR", "foo");
    }

    @KafkaListener(id = "so63270367", topics = "ABC_TEST_XPTO")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @KafkaListener(id = "so63270367err", topics = "ABC_TEST_XPTO_ERR")
    public void listenErr(String in) {
        System.out.println("From DLT:" + in);
    }

    @Bean
    public SeekToCurrentErrorHandler eh(KafkaOperations<String, String> template) {
        return new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(
                template,
                (cr, e) -> new TopicPartition(cr.topic() + "_ERR", cr.partition())),
                new FixedBackOff(0L, 0L));
    }

}
From DLT:foo
...