Отправка Json данных в теме Kafka - PullRequest
1 голос
/ 14 июля 2020

Я использую Kafka topi c для межсервисного взаимодействия в приложении микросервиса. Я могу отправлять данные в topi c успешно, ниже приведен мой код производителя:

Код конфигурации

@Bean
public Map<String, Object> myProducerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    return props;
}

Код отправителя

        Gson gson = new Gson();
        String myJson = gson.toJson(myObject);
        log.info("Json data::" + myJson );
        myKafaTemplate.send("myTopic", "myKey",
                myJson.getBytes());

Конфигурация получателя

@Bean
public Map<String, Object> myConsumerConfigs() {
    Map<String, Object> props = new HashMap<String, Object>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

Код получателя

//other codes and annotations
public void receiver(ConsumerRecord<?, ?> consumerRecord) {
  Object jsonObject = consumerRecord.value();
  MyObjectClass myObj= gson.fromJson(String.valueOf(jsonObject), MyObjectClass.class);
 }

Я получаю сообщение об ошибке

java.lang.ClassCastException: java.lang.String cannot be cast to mypackage.myobject

Может у кого-нибудь есть решение этого ???

...