Я использую Spring Cloud Stream вместе с реестром схемы Aiven , который использует реестр схемы конфлюента . Реестр схемы Aiven защищен паролем. На основании этих инструкций эти два параметра конфигурации должны быть установлены для успешного доступа к серверу реестра схемы.
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
Все нормально, когда я использую только драйверы vanilla java kafka, но если я использую Spring cloud stream, я не знаю, как ввести эти два параметра. На данный момент я помещаю "basic.auth.user.info"
и "basic.auth.credentials.source"
в "spring.cloud.stream.kafka.binder.configuration"
в файле application.yml
.
Делая это, я получаю "401 Unauthorized"
на линии, где схема хочет зарегистрироваться.
Обновление 1:
Основываясь на предложении Алина, я обновил способ, которым был сконфигурирован bean-компонент SchemaRegistryClient, чтобы ему стало известно о контексте SSL.
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
Это помогло избавиться от ошибки при запуске приложения и зарегистрировало схему. Однако всякий раз, когда приложение хотело отправить сообщение Кафке, снова возникала новая ошибка. Наконец это также было исправлено ответом Ммельсена.