Я использую Kafka topi c для межсервисного взаимодействия в приложении микросервиса. Я могу отправлять данные в topi c успешно, ниже приведен мой код производителя:
Код конфигурации
@Bean
public Map<String, Object> myProducerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
return props;
}
Код отправителя
Gson gson = new Gson();
String myJson = gson.toJson(myObject);
log.info("Json data::" + myJson );
myKafaTemplate.send("myTopic", "myKey",
myJson.getBytes());
Конфигурация получателя
@Bean
public Map<String, Object> myConsumerConfigs() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
Код получателя
//other codes and annotations
public void receiver(ConsumerRecord<?, ?> consumerRecord) {
Object jsonObject = consumerRecord.value();
MyObjectClass myObj= gson.fromJson(String.valueOf(jsonObject), MyObjectClass.class);
}
Я получаю сообщение об ошибке
java.lang.ClassCastException: java.lang.String cannot be cast to mypackage.myobject
Может у кого-нибудь есть решение этого ???