Я пытаюсь использовать Spring Kafka вместе с Confluent Schema Registry и KafkaAvroDeserializer. Однако в документации Spring Kafka такой сценарий не описан.
Вот конфигурация моего потребителя (consumer) Spring Kafka:
/**
 * Spring configuration that wires up the Kafka listener infrastructure for
 * {@code Customer} records with {@code String} keys.
 *
 * <p>Three beans are exposed: the raw consumer property map, a consumer factory
 * built from that map, and the listener container factory that uses the
 * consumer factory.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

    /**
     * Creates the listener container factory and hands it the consumer factory
     * defined in this class.
     *
     * @return a container factory backed by {@link #consumerFactory()}
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Customer> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, Customer> customerConsumerFactory = consumerFactory();
        containerFactory.setConsumerFactory(customerConsumerFactory);
        return containerFactory;
    }

    /**
     * Creates the consumer factory from the properties returned by
     * {@link #consumerConfigs()}.
     *
     * @return a {@code DefaultKafkaConsumerFactory} configured with those properties
     */
    @Bean
    public ConsumerFactory<String, Customer> consumerFactory() {
        Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * Assembles the Kafka consumer properties.
     *
     * <p>No {@code ConsumerConfig.GROUP_ID_CONFIG} entry is added to the map here.
     *
     * @return a mutable map of consumer properties
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> settings = new HashMap<>();

        // Broker and schema registry endpoints.
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        settings.put("schema.registry.url", "http://127.0.0.1:8081");

        // Keys are plain strings; values go through the Avro deserializer with the
        // "specific.avro.reader" flag switched on.
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        settings.put("specific.avro.reader", "true");

        // Offset handling: reset to "earliest" and turn auto commit off.
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Maximum poll records setting.
        settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        return settings;
    }
}
и сам слушатель:
/**
 * Kafka listener component that receives {@code Customer} records from the
 * {@code kafka-demo-avro} topic, logs where each record came from, converts it
 * to a {@code CustomerModel} and logs the result.
 */
@Component
@Slf4j
public class KafkaConsumerService {

    // Assigned exactly once in the constructor, so it is declared final.
    private final CustomerConverter customerConverter;

    /**
     * Creates the service.
     *
     * @param customerConverter converter used to turn a received {@code Customer}
     *                          into a {@code CustomerModel}
     */
    public KafkaConsumerService(CustomerConverter customerConverter) {
        this.customerConverter = customerConverter;
    }

    /**
     * Handles one message from the {@code kafka-demo-avro} topic.
     *
     * <p>Logs the first element of each header list, converts the payload via
     * the injected converter and logs the converted model.
     *
     * <p>NOTE(review): the header parameters are declared as lists and only
     * element 0 is read; the container factory in this file does not enable
     * batch listening, so scalar header types may be more appropriate — confirm.
     *
     * @param customer   the message payload
     * @param partitions value(s) of the received-partition header
     * @param topics     value(s) of the received-topic header
     * @param offsets    value(s) of the offset header
     */
    @KafkaListener(id = "demo-consumer-1st-group", topics = "kafka-demo-avro")
    public void process(@Payload Customer customer,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
            @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
            @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
        // NOTE(review): LOGGER is not declared in this class; presumably it is
        // generated by @Slf4j with a customized field name — confirm the Lombok
        // configuration.
        LOGGER.info("topic: {}, partition: {}, offset: {}", topics.get(0), partitions.get(0), offsets.get(0));
        CustomerModel customerModel = customerConverter.convertToModel(customer);
        LOGGER.info("customer: {}", customerModel);
    }
}
Мой производитель (producer) без проблем отправляет сообщения Avro. Но на стороне потребителя я получаю следующую ошибку:
Method [public void kafkademo.kafka.KafkaConsumerService.process(kafkademo.generated.avro.Customer)]
Bean [kafkademo.kafka.KafkaConsumerService@238258a0]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [kafkademo.generated.avro.Customer] to [kafkademo.generated.avro.Customer] for GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}], failedMessage=GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [kafkademo.generated.avro.Customer] to [kafkademo.generated.avro.Customer] for GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}], failedMessage=GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, 
kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1324) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1229) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1200) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1120) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:935) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:751) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [kafkademo.generated.avro.Customer] to [kafkademo.generated.avro.Customer] for GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}], failedMessage=GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1275) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1258) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1219) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
... 8 common frames omitted