У меня проблемы с настройкой подключения к Confluent при использовании spring-cloud-stream-binder-kafka
. Возможно, кто-то может увидеть, что не так?
Когда я использую пример из https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/ Тогда он работает нормально, и я могу видеть сообщения в Confluent Cloud
Однако, когда добавление тех же сведений о подключении с использованием конфигурации spring-cloud-stream-binder-kafka
, это возвращает неавторизованную ошибку.
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MySchema","namespace":"org.test","fields":[{"name":"value","type":"double"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
Моя конфигурация ниже дает указанную выше ошибку. Не уверены, что происходит не так?
cloud:
stream:
default:
producer:
useNativeEncoding: true
kafka:
binder:
brokers: myinstance.us-east1.gcp.confluent.cloud:9092
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
basic.auth.credentials.source: USER_INFO
schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
configuration:
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
security.protocol: SASL_SSL
bindings:
normals-out:
destination: normals
contentType: application/*+avro
Пример из Confluent, который работает нормально:
kafka:
bootstrap-servers:
- myinstance.us-east1.gcp.confluent.cloud:9092
properties:
ssl.endpoint.identification.algorithm: https
sasl.mechanism: PLAIN
request.timeout.ms: 20000
retry.backoff.ms: 500
sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
security.protocol: SASL_SSL
schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
basic.auth.credentials.source: USER_INFO
schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
template:
default-topic:
logging:
level:
root: info