У меня есть две службы, которые должны общаться через Kafka
.Давайте назовем первый сервис WriteService , а второй сервис QueryService .
На стороне WriteService у меня есть следующая конфигурация для производителей.
@Configuration
public class KafkaProducerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Я пытаюсь отправить объект класса com.example.project.web.routes.dto.RouteDto
На стороне QueryService конфигурация потребителя определяется следующим образом.
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.groupid}")
private String serviceGroupId;
@Value("${spring.kafka.consumer.trusted-packages}")
private String trustedPackage;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Слушатель имеет следующее определение.Класс полезной нагрузки имеет полностью определенное имя - com.example.project.clientqueryview.module.routes.messaging.kafka.RouteDto
@KafkaListener(topics = "${spring.kafka.topics.routes}",
containerFactory = "kafkaListenerContainerFactory")
public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
@Payload RouteDto payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
}
private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}
При отправке сообщения появляется следующая ошибка
Причина: org.springframework.messaging.converter.MessageConversionException: не удалось разрешить имя класса.Класс не найден [com.example.project.web.routes.dto.RouteDto];Вложенное исключение: java.lang.ClassNotFoundException: com.example.project.web.routes.dto.RouteDto
Ошибка достаточно ясна.Однако я не могу понять, почему у него такое поведение по умолчанию.Я не ожидаю иметь один и тот же пакет в разных сервисах, что не имеет никакого смысла.
Я не нашел способа отключить это и использовать класс, предоставленный слушателю, помеченный @Payload
Как это можно решить, не настраивая преобразователь вручную?