Кажется, я не могу использовать сообщения в их конкретной реализации avro, я получаю следующее исключение:
class org.apache.avro.generic.GenericData$Record cannot be cast to class my.package.MyConcreteClass
Вот код (я использую Spring Boot
)
MyProducer. java
private final KafkaTemplate<String, MyConcreteClass> kafkaTemplate;
public PositionProducer(KafkaTemplate<String, MyConcreteClass> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(MyConcreteClass myConcreteClass) {
this.kafkaTemplate.send(topic, myConcreteClass);
}
MyConsumer. java
@KafkaListener(topics = "#{'${consumer.topic.name}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void listen(MyConcreteClass incomingMsg) {
//handle
}
Обратите внимание, что если я изменю все на GenericRecord
, десериализация работает должным образом, поэтому я знаю, что все конфигурации (не вставленные) настроены правильно.
Также может быть важно отметить, что я не зарегистрировал схему самостоятельно, и вместо этого позволил своему клиентскому коду сделать это для меня.
Есть идеи?
РЕДАКТИРОВАТЬ:
Конфиг:
@Bean
public ConsumerFactory<String, MyConcreteClass> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new DefaultKafkaConsumerFactory<>(props);
}