Как получить доступ к серверу реестра конфликтующей схемы, защищенному паролем, с помощью облачного потока Spring? - PullRequest
1 голос
/ 16 апреля 2019

Я использую Spring Cloud Stream вместе с реестром схемы Aiven , который использует реестр схемы конфлюента . Реестр схемы Aiven защищен паролем. На основании этих инструкций эти два параметра конфигурации должны быть установлены для успешного доступа к серверу реестра схемы.

 props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

Все нормально, когда я использую только драйверы vanilla java kafka, но если я использую Spring cloud stream, я не знаю, как ввести эти два параметра. На данный момент я помещаю "basic.auth.user.info" и "basic.auth.credentials.source" в "spring.cloud.stream.kafka.binder.configuration" в файле application.yml.

Делая это, я получаю "401 Unauthorized" на линии, где схема хочет зарегистрироваться.

Обновление 1:

Основываясь на предложении Алина, я обновил способ, которым был сконфигурирован bean-компонент SchemaRegistryClient, чтобы ему стало известно о контексте SSL.

@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream(
            new File("path/to/client.keystore.p12")),
        "secret".toCharArray());

    final KeyStore trustStore = KeyStore.getInstance("JKS");
    trustStore.load(new FileInputStream(
            new File("path/to/client.truststore.jks")),
        "secret".toCharArray());

    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

    SSLContext sslContext = SSLContextBuilder
        .create()
        .loadKeyMaterial(keyStore, "secret".toCharArray())
        .loadTrustMaterial(trustStore, acceptingTrustStrategy)
        .build();

    HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
    ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
        httpClient);
    ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
        new RestTemplate(requestFactory));
    schemaRegistryClient.setEndpoint(endpoint);
    return schemaRegistryClient;
  } catch (Exception ex) {
    ex.printStackTrace();
    return null;
  }
}

Это помогло избавиться от ошибки при запуске приложения и зарегистрировало схему. Однако всякий раз, когда приложение хотело отправить сообщение Кафке, снова возникала новая ошибка. Наконец это также было исправлено ответом Ммельсена.

Ответы [ 3 ]

1 голос
/ 16 апреля 2019

Поскольку Aiven использует SSL для протокола безопасности Kafka, для аутентификации необходимо использовать сертификаты.

Вы можете просмотреть эту страницу , чтобы понять, как она работает.В двух словах, вам нужно выполнить следующую команду для генерации и импорта сертификатов:

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks

Затем вы можете использовать следующие свойства для использования сертификатов:

spring.cloud.stream.kafka.streams.binder:
  configuration:
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
1 голос
/ 01 мая 2019

Я столкнулся с той же проблемой, что и ситуация, в которой я находился, состоял в подключении к реестру защищенной схемы, размещенному в aiven и защищенному базовой аутентификацией.Чтобы заставить его работать, мне нужно было настроить следующие свойства:

spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password

другие свойства для моего связующего:

spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false

, что на самом деле происходит, это поток облака Springдобавит spring.kafka.properties.basic * в DefaultKafkaConsumerFactory и добавит конфигурацию в KafkaConsumer.В какой-то момент во время инициализации пружинной кафки создается CachedSchemaRegistryClient, которому предоставляются эти свойства.Этот клиент содержит метод с именем configureRestService, который проверяет, содержит ли карта свойств «basic.auth.credentials.source».Поскольку мы предоставляем это через spring.kafka.properties, оно найдет это свойство и позаботится о создании соответствующих заголовков при доступе к конечной точке реестра схемы.

надеюсь, что это сработает и для вас.

Я использую весеннюю облачную версию Greenwich.SR1, spring-boot-starter 2.1.4.RELEASE, avro-версию 1.8.2 и confluent.version 5.2.1

1 голос
/ 16 апреля 2019

Конфигурация связывателя обрабатывает только известные свойства потребителя и производителя.

Вы можете установить произвольные свойства на уровне привязки.

spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...