Spring Kafka с десериализатором Confluent Kafka Avro
0 голосов
/ 08 июля 2019

Я пытаюсь использовать Spring Kafka вместе с Confluent Schema Registry и KafkaAvroDeserializer. Однако в документации Spring Kafka об этом ничего не сказано.

Вот что я делаю для своего потребителя Spring Kafka:

/**
 * Spring configuration for a Kafka consumer with {@code String} keys and Avro-encoded
 * {@code Customer} values decoded by {@code KafkaAvroDeserializer}.
 *
 * <p>Broker and schema-registry addresses are hard-coded to local endpoints.
 */
@Configuration
@EnableKafka
public class KafkaConfiguration {

  /**
   * Builds the listener container factory on top of {@link #consumerFactory()}.
   *
   * @return a container factory for {@code String}/{@code Customer} records
   */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
  }

  /**
   * Creates the consumer factory from the properties in {@link #consumerConfigs()}.
   *
   * <p>The deserializers are passed as class references in the property map, so they are
   * instantiated by the Kafka client rather than by this configuration class.
   *
   * @return a consumer factory for {@code String}/{@code Customer} records
   */
  @Bean
  public ConsumerFactory<String, Customer> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  /**
   * Assembles the raw Kafka consumer properties.
   *
   * @return a mutable map of consumer properties
   */
  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    // Connection and (de)serialization.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put("schema.registry.url", "http://127.0.0.1:8081");
    // Ask the Avro deserializer for generated specific-record classes instead of generic records.
    props.put("specific.avro.reader", "true");

    // NOTE(review): no group.id is set here; presumably it is supplied by the listener
    // (e.g. the @KafkaListener id) — confirm before reusing this map elsewhere.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit of offsets
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // at most 100 records per poll()
    return props;
  }
}

и

/**
 * Kafka listener for the {@code kafka-demo-avro} topic: logs where each record came from,
 * converts the Avro {@code Customer} payload to a {@code CustomerModel} and logs the result.
 */
@Component
@Slf4j
public class KafkaConsumerService {

  // Injected once through the constructor and never reassigned.
  private final CustomerConverter customerConverter;

  /**
   * @param customerConverter converter used to turn the Avro payload into the model type
   */
  public KafkaConsumerService(CustomerConverter customerConverter) {
    this.customerConverter = customerConverter;
  }

  /**
   * Handles one record from {@code kafka-demo-avro}.
   *
   * <p>Only the first element of each header list is logged.
   *
   * @param customer deserialized record value
   * @param partitions value(s) of the received-partition header
   * @param topics value(s) of the received-topic header
   * @param offsets value(s) of the offset header
   */
  @KafkaListener(id = "demo-consumer-1st-group", topics = "kafka-demo-avro")
  public void process(@Payload Customer customer,
                      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                      @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                      @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    // NOTE(review): LOGGER is not declared in this class; presumably it is generated by @Slf4j
    // with a customized field name (Lombok's default is `log`) — confirm the Lombok config.
    LOGGER.info("topic: {}, partition: {}, offset: {}", topics.get(0), partitions.get(0), offsets.get(0));
    CustomerModel customerModel = customerConverter.convertToModel(customer);
    LOGGER.info("customer: {}", customerModel);
  }
}

Мой продюсер без проблем отправляет сообщения Avro. Но на стороне потребителя я получаю следующую ошибку:

Method [public void kafkademo.kafka.KafkaConsumerService.process(kafkademo.generated.avro.Customer)]
Bean [kafkademo.kafka.KafkaConsumerService@238258a0]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [kafkademo.generated.avro.Customer] to [kafkademo.generated.avro.Customer] for GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}], failedMessage=GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [kafkademo.generated.avro.Customer] to [kafkademo.generated.avro.Customer] for GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}], failedMessage=GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, 
kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1324) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1229) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1200) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1120) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:935) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:751) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [kafkademo.generated.avro.Customer] to [kafkademo.generated.avro.Customer] for GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}], failedMessage=GenericMessage [payload={"id": 7, "key": "key-789", "name": "kafka"}, headers={kafka_offset=6, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7e280b0e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=key-789, kafka_receivedPartitionId=0, kafka_receivedTopic=kafka-demo-avro, kafka_receivedTimestamp=1562593667254}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:292) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1275) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1258) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1219) ~[spring-kafka-2.2.7.RELEASE.jar:2.2.7.RELEASE]
    ... 8 common frames omitted

1 Ответ

0 голосов
/ 08 июля 2019

Однако в документации Spring Kafka об этом ничего не сказано.

Здесь нечего документировать: это просто ещё один сериализатор/десериализатор.

Невозможно преобразовать из [kafkademo.generated.avro.Customer] в [kafkademo.generated.avro.Customer] для ...

Это похоже на проблему с загрузчиком классов.

Попробуйте вместо этого использовать следующее:

  /**
   * Creates a consumer factory with explicitly constructed deserializer instances instead of
   * deserializer class names in the property map.
   *
   * <p>The Avro value deserializer is configured by hand from {@code consumerConfigs()} before
   * it is handed to the factory.
   * NOTE(review): the intent appears to be that the deserializer is created by the same class
   * loader as the listener — confirm this resolves the conversion error.
   *
   * @return a consumer factory for {@code String} keys and Avro-decoded values
   */
  @Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    // Second argument `false` marks this instance as a value (not key) deserializer.
    KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer();
    valueDeserializer.configure(consumerConfigs(), false);

    StringDeserializer keyDeserializer = new StringDeserializer();
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), keyDeserializer, valueDeserializer);
  }

чтобы слушатель и десериализатор были созданы в одном загрузчике классов.

...