Spring Kafka JsonDesirialization MessageConversionException не удалось разрешить имя класса Класс не найден - PullRequest
0 голосов
/ 14 февраля 2019

У меня есть две службы, которые должны общаться через Kafka.Давайте назовем первый сервис WriteService , а второй сервис QueryService .

На стороне WriteService у меня есть следующая конфигурация для производителей.

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Я пытаюсь отправить объект класса com.example.project.web.routes.dto.RouteDto

На стороне QueryService конфигурация потребителя определяется следующим образом.

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

Слушатель имеет следующее определение.Класс полезной нагрузки имеет полностью определенное имя - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                               @Payload RouteDto payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }

При отправке сообщения появляется следующая ошибка

Причина: org.springframework.messaging.converter.MessageConversionException: не удалось разрешить имя класса.Класс не найден [com.example.project.web.routes.dto.RouteDto];Вложенное исключение: java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto

Ошибка достаточно ясна.Однако я не могу понять, почему у него такое поведение по умолчанию.Я не ожидаю иметь один и тот же пакет в разных сервисах, что не имеет никакого смысла.

Я не нашел способа отключить это и использовать класс, предоставленный слушателю, помеченный @Payload

Как это можно решить, не настраивая преобразователь вручную?

1 Ответ

0 голосов
/ 14 февраля 2019

Если вы используете spring-kafka-2.2.x, вы можете отключить заголовок по умолчанию с помощью перегруженных конструкторов JsonDeserializer docs

Начиная с версии 2.2, вы можетеявно сконфигурируйте десериализатор для использования предоставленного целевого типа и игнорируйте информацию о типе в заголовках, используя один из перегруженных конструкторов с логическим значением useHeadersIfPresent (который по умолчанию имеет значение true).В следующем примере показано, как это сделать:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

Если с более низкой версией используйте MessageConverter (вы можете увидеть эту проблему в spring-kafka-2.1.x и выше)

Spring для Apache Kafka предоставляет абстракцию MessageConverter с реализацией MessagingMessageConverter и его настройками StringJsonMessageConverter и BytesJsonMessageConverter.Вы можете внедрить MessageConverter в экземпляр KafkaTemplate напрямую и с помощью определения bean-компонента AbstractKafkaListenerContainerFactory для свойства @ KafkaListener.containerFactory ().В следующем примере показано, как это сделать:

@Bean
 public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
  }

  @KafkaListener(topics = "jsonData",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void jsonListener(RouteDto dto) {
     ...
   }

Примечание: Вывод этого типа может быть достигнут только тогда, когда аннотация @KafkaListener объявлена ​​на уровне метода.При использовании класса @KafkaListener на уровне класса тип полезной нагрузки используется для выбора метода @KafkaHandler для вызова, поэтому он должен быть уже преобразован до того, как метод будет выбран.

...