Проблема с Spring Cloud Stream при использовании клиента реестра схемы по умолчанию вместо клиента реестра Avro Schema - PullRequest
0 голосов
/ 05 ноября 2018

Мы используем Kafka с Spring Cloud Stream, и нам нужно подключиться к реестру Confluent Schema в нашем компоненте Spring Boot, см. https://github.com/donalthurley/KafkaConsumeScsAndConfluent.

Мы добавили следующую конфигурацию для создания необходимого компонента ConfluentSchemaRegistryClient, см. https://github.com/donalthurley/KafkaConsumeScsAndConfluent/blob/master/src/main/java/com/example/kafka/KafkaConfig.java, который должен переопределить реестр схемы по умолчанию из Spring Cloud Stream.

Однако после некоторых развертываний мы периодически видим следующую ошибку.

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel

Основная причина показывает эту трассировку стека

Caused by: java.lang.NullPointerException
    at     org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:71)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:238)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:179)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:201)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:191)
    at org.springframework.messaging.converter.CompositeMessageConverter.toMessage(CompositeMessageConverter.java:83)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:322)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:351)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:611)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)

Тот факт, что DefaultSchemaRegistryClient вызывается AvroSchemaRegistryClientMessageConverter, указывает на то, что существует проблема с подключением нашего bean-объекта ConfluentSchemaRegistryClient.

Есть ли что-то еще, что требуется в нашей конфигурации, чтобы гарантировать, что bean-компонент ConfluentSchemaRegistryClient подключен правильно?

Ответы [ 2 ]

0 голосов
/ 06 ноября 2018

Теперь я переместил аннотацию @EnableSchemaRegistryClient в класс приложения проекта, см. https://github.com/donalthurley/KafkaConsumeScsAndConfluent/commit/b4cf5427d7ab0a4fed619fe54b042890f5ccb594, и повторно развернул ее, и это устранило проблему, возникшую у меня при развертывании в нашей среде.

Я аннотировал свои классы производителей и потребителей с помощью аннотации @EnableSchemaRegistryClient.

Во всех моих локальных тестах это работало против моего локального реестра схемы слияния докеров. Однако при развертывании в наших средах он работал большую часть времени, но иногда происходил сбой после некоторых развертываний.

Мне не удалось воспроизвести это локально.

Я также заметил, что при локальном тестировании я получаю ту же трассировку стека исключений нулевого указателя, если удаляю конфигурацию из реестра Confluent Schema.

Итак, проблема, которую, я думаю, я вижу, состоит в том, что компонент AvroSchemaRegistryClientMessageConverter не был связан с компонентом реестра сливающейся схемы, когда аннотация @EnableSchemaRegistryClient не присутствовала в классе приложения проекта.

Я не понимаю, почему именно это необходимо, но думаю, что это могло решить проблему.

0 голосов
/ 05 ноября 2018

Это сработало для меня. Вот что я сделал:

  • использовал класс конфигурации точно так же, как вы
  • использовал аннотацию @EnableSchemaRegistryClient в проекте
  • добавил авро-сериализатор в путь к классам: io.confluent:kafka-avro-serializer
  • установить свойства следующим образом:

    spring.cloud.stream.kafka.bindings.channel.consumer.configuration.schema.registry.url = адрес вашего реестра spring.cloud.stream.kafka.bindings.channel.consumer.configuration.specific.avro.reader = истина

где канал соответствует названию канала в вашем приложении.

Мне кажется, последнее свойство очень важно, чтобы сказать Spring, чтобы он использовал сериализатор Avro вместо сериализатора по умолчанию.

Я использую Spring Cloud Stream Elmhurst.RELEASE, поэтому имена свойств могут немного отличаться, если вы используете другую версию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...