DeadLetterPublishingRecoverer не публикует «оригинальную полезную нагрузку» в теме .DLT - PullRequest
0 голосов
/ 17 октября 2019

Я использую spring-kafka-2.2.9.RELEASE и kafka_2.12-2.3.0. Я пытался получить original payload в теме .DLT, но все, что я получаю, это "ноль". Я уверен, что это можно сделать с помощью ErrorHandlingDeserializer2, SeekToCurrentErrorHandler и DeadLetterPublishingRecoverer, но я не уверен, чего мне не хватает.

Производитель и потребитель

Producer:
    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ProducerFactory<Object, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put("acks","all");

        return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<>());
        //return new DefaultKafkaProducerFactory<Object, Object>(props, new JsonSerializer<Object>(objectMapper), new JsonSerializer<Object>(objectMapper));
    }

    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate() {
        return new KafkaTemplate<Object, Object>(producerFactory());
    }

Consumer:
@Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
        jsonDeserializer.addTrustedPackages("*");
        ErrorHandlingDeserializer2<Object> errorHandlingDeserializerKey = new ErrorHandlingDeserializer2<>(jsonDeserializer);
        ErrorHandlingDeserializer2<Object> errorHandlingDeserializerValue = new ErrorHandlingDeserializer2<>(jsonDeserializer);     

        return new DefaultKafkaConsumerFactory<>(props, errorHandlingDeserializerKey, errorHandlingDeserializerValue);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), 3));

        return factory;
    }

Я вижу "null" в теме. DLT:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic.DLT --from-beginning
null
null

Заранее благодарен за любую помощь.

Ответы [ 2 ]

1 голос
/ 17 октября 2019

В настоящее время нельзя использовать записи DLT (опубликованные из-за исключения сериализации) с @KafkaListener в 2.2.x, поскольку контейнер обнаруживает заголовок DeserializationException, поэтому сама запись DLT отправляется обработчику ошибок. .

Я только что исправил это , и он будет доступен в 2.2.11.

Однако, если вы используете обычный KafkaConsumer, вы можете использовать этот код дляполучить исходное значение ...

Header exHeader = record.headers().lastHeader(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER);
DeserializationException ex = (DeserializationException) new ObjectInputStream(
        new ByteArrayInputStream(exHeader.value())).readObject();
System.out.println("DLT: " + new String(ex.getData()));

Примечание: если вы используете функцию, предложенную Артемом, запись будет отправлена ​​вашему главному слушателю, а не обработчику ошибок, поэтому вам придется иметь дело с ней там,Поэтому вам понадобится какое-то фиктивное значение, содержащее исходную полезную нагрузку.

Конечно, проще перейти на 2.3.1.

0 голосов
/ 17 октября 2019

В ErrorHandlingDeserializer2 есть логика, такая как:

private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
    if (this.failedDeserializationFunction != null) {
        FailedDeserializationInfo failedDeserializationInfo =
                new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
        return this.failedDeserializationFunction.apply(failedDeserializationInfo);
    }
    else {
        return null;
    }
}

Итак, если failedDeserializationFunction не предоставлено, данные десериализации возвращаются как null.

См. Документыдля получения дополнительной информации: https://docs.spring.io/spring-kafka/docs/2.3.1.RELEASE/reference/html/#error-handling-deserializer

Если делегату не удается десериализовать содержимое записи, ErrorHandlingDeserializer2 возвращает нулевое значение и DeserializationException в заголовке, который содержит причину и необработанные байты,

...