Spring Kafka - десериализация Pojos без информации о типе - PullRequest
0 голосов
/ 17 июня 2020

Я работаю над приложением распределенных микросервисов, которое использует Kafka для внутренней связи. Приложения обмениваются POJO по темам. Когда производитель отправляет сообщение потребителю, по умолчанию добавляется заголовок, указывающий имя пакета и имя класса объекта в полезной нагрузке. Затем приложение-потребитель использует эту информацию для десериализации полезной нагрузки. Но это требует от меня определения одного и того же класса в одном пакете в обоих приложениях, что не дает мне хорошего дизайна. Если я установил конфигурацию (JsonSerializer.ADD_TYPE_INFO_HEADERS) на стороне производителя, чтобы не отправлять тип в заголовке, это приведет к ошибке на стороне потребителя. Также я не хочу использовать тип по умолчанию в потребительском приложении, поскольку у него есть несколько слушателей, которые ожидают разные типы объектов. Почему kafkalistener не может просто десериализовать полезные данные json до типа объекта, указанного в аргументе, зачем ему заголовок?

Чтобы обойти это, я определил ConsumerFactory с 'BytesDeserialser' и KafkaListenerContainerFactory с ' BytesJsonMessageConverter 'в приложении-потребителе. При этом он работал на стороне потребителя, но я не уверен, как заставить эту работу на стороне производителя при использовании replyingKafkaTemplate и десериализации ответа от потребителя.

Ниже приведены мои конфигурации - // конфигурации производителя

@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
  return props;
}

@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> replyTemplate() {
  return new KafkaTemplate<>(replyProducerFactory());
}

//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
    ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
    new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
    replyingKafkaTemplate.setReplyTimeout(10000);
    replyingKafkaTemplate.setMessageConverter(converter());
    return replyingKafkaTemplate;
}

@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}

@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
    return props;
}

1 Ответ

0 голосов
/ 17 июня 2020

Вы можете использовать сопоставление типов .

Производитель сопоставляет com.acme.Foo с foo, а потребитель сопоставляет foo с com.other.Bar.

типы должны быть совместимы на уровне JSON.

Если вы получаете только один тип, вы можете настроить десериализатор на его использование вместо поиска заголовков с информацией о типе.

https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#serdes - json -config

  • JsonDeserializer.KEY_DEFAULT_TYPE: резервный тип для десериализации ключей, если информация заголовка отсутствует.

  • JsonDeserializer.VALUE_DEFAULT_TYPE: резервный тип для десериализации значений, если информация заголовка отсутствует.

Начиная с версии 2.5, вы можете добавить функцию, которая будет вызываться десериализатором, поэтому вы можете проанализировать данные, чтобы определить тип.

См. Использование методов для определения типов .

Это (и сопоставление типов) - единственный способ обрабатывать несколько типов в шаблон ответа. Со стороны потребителя мы можем определить тип на основе параметра метода (который является правильным механизмом для использования там - это не «обходной путь»).

...