Я работаю над приложением распределенных микросервисов, которое использует Kafka для внутренней связи. Приложения обмениваются POJO по темам. Когда производитель отправляет сообщение потребителю, по умолчанию добавляется заголовок, указывающий имя пакета и имя класса объекта в полезной нагрузке. Затем приложение-потребитель использует эту информацию для десериализации полезной нагрузки. Но это требует от меня определения одного и того же класса в одном пакете в обоих приложениях, что не дает мне хорошего дизайна. Если я установил конфигурацию (JsonSerializer.ADD_TYPE_INFO_HEADERS) на стороне производителя, чтобы не отправлять тип в заголовке, это приведет к ошибке на стороне потребителя. Также я не хочу использовать тип по умолчанию в потребительском приложении, поскольку у него есть несколько слушателей, которые ожидают разные типы объектов. Почему kafkalistener не может просто десериализовать полезные данные json до типа объекта, указанного в аргументе, зачем ему заголовок?
Чтобы обойти это, я определил ConsumerFactory с 'BytesDeserialser' и KafkaListenerContainerFactory с ' BytesJsonMessageConverter 'в приложении-потребителе. При этом он работал на стороне потребителя, но я не уверен, как заставить эту работу на стороне производителя при использовании replyingKafkaTemplate и десериализации ответа от потребителя.
Ниже приведены мои конфигурации - // конфигурации производителя
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
return props;
}
@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> replyTemplate() {
return new KafkaTemplate<>(replyProducerFactory());
}
//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
replyingKafkaTemplate.setReplyTimeout(10000);
replyingKafkaTemplate.setMessageConverter(converter());
return replyingKafkaTemplate;
}
@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}
@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
return props;
}