Я новичок в Kafka и Spring Cloud и отправляю сообщение с ключом в тему kafka и пытаюсь прочитать его слушателем.Я получаю исключение, когда пытаюсь получить ключ Кафки в слушателе.
В продюсере я пишу в тему, как показано ниже:
@Slf4j
@Service
@EnableBinding(FirstStream.class)
public class FirstProducer {
private final FirstStream firstStream;
@Autowired
public FirstProducer(FirstStream firstStream){
this.firstStream= firstStream;
}
public void send(MyKey key, Record record) {
log.info("Sending {}", record);
MessageChannel messageChannel = firstStream.testOut();
messageChannel.send(MessageBuilder.withPayload(record).setHeader(KafkaHeaders.MESSAGE_KEY, key).build());
}
}
В слушателе я ловлюсообщение, но позже получено исключение:
@StreamListener(FirstStream.INPUT)
public void process(Message<?> message){
Record record = (Record ) message.getPayload(); // here I get exception
MyKey kafkaKey = message.getHeaders().get(KafkaHeaders.MESSAGE_KEY, MyKey.class);
log.info("Received message");
log.info("Processing: {} ", record );
......
}
org.springframework.messaging.MessagingException: исключение, выдаваемое при вызове rg.spring.adapters.MyProcessor # process [1 аргументов];вложенным исключением является java.lang.ClassCastException: класс java.lang.String не может быть приведен к классу rg.spring.dto.raw.Record (java.lang.String находится в модуле java.base загрузчика 'bootstrap'; rg.spring.dto.raw.Record находится в безымянном модуле загрузчика 'app') ....
Причина: java.lang.ClassCastException: класс java.lang.String не может быть приведен к классу rg.spring.dto.raw.Record (java.lang.String находится в модуле java.base загрузчика 'bootstrap'; rg.spring.dto.raw.Record находится в неназванном модуле загрузчика 'app')
Почему я получаю это исключение?Как я вижу, полезная нагрузка является формой строки json, но вместо этого она не десериализуется в объект, она выдает исключение classCastException.
Я использовал:
свойств-производителей: key.serializer: org.springframework.kafka.support.serializer.springframework.kafka.support.serializer.JsonDeserializer