конфигурация spring-cloud-stream-binder-kafka для несанкционированной ошибки реестра Confluent Cloud Schema - PullRequest
2 голосов
/ 26 мая 2020

У меня проблемы с настройкой подключения к Confluent при использовании spring-cloud-stream-binder-kafka. Возможно, кто-то может увидеть, что не так?

Когда я использую пример из https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/ Тогда он работает нормально, и я могу видеть сообщения в Confluent Cloud

Однако, когда добавление тех же сведений о подключении с использованием конфигурации spring-cloud-stream-binder-kafka, это возвращает неавторизованную ошибку.

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MySchema","namespace":"org.test","fields":[{"name":"value","type":"double"}]}

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

Моя конфигурация ниже дает указанную выше ошибку. Не уверены, что происходит не так?

  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: myinstance.us-east1.gcp.confluent.cloud:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
            basic.auth.credentials.source: USER_INFO
            schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
          configuration:
            ssl.endpoint.identification.algorithm: https
            sasl.mechanism: PLAIN
            request.timeout.ms: 20000
            retry.backoff.ms: 500
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
            security.protocol: SASL_SSL
      bindings:
        normals-out:
          destination: normals
          contentType: application/*+avro

Пример из Confluent, который работает нормально:

  kafka:
    bootstrap-servers:
      - myinstance.us-east1.gcp.confluent.cloud:9092
    properties:
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
      security.protocol: SASL_SSL
      schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
      basic.auth.credentials.source: USER_INFO
      schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      default-topic:
logging:
  level:
    root: info

1 Ответ

0 голосов
/ 27 мая 2020

Моя проблема заключалась только в том, что мне не хватало зависимости в моем pom.

Я должен удалить свой вопрос, но я оставляю его здесь как ссылку на то, что конфигурация действительно работает, как указано выше. *

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>5.3.0</version>
</dependency>
...