Я сделал так.
Добавьте конфигурацию sasl в свойствах.
> spring:
> kafka:
> client-id: ${spring.app.name}
> bootstrap-servers: <cluster_url>:9092
> properties:
> ssl.endpoint.identification.algorithm: https
> sasl.mechanism: PLAIN
> sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required
> username="xxxxx" password="xxxxxxx";
> security.protocol: SASL_SSL
И затем создал bean-компонент, который инициализирует KafkaStreamsConfiguration
@Bean
public KafkaStreamsConfiguration streamsConfig(KafkaProperties kafkaProperties) {
Map<String, Object> streamsProperties = kafkaProperties.buildStreamsProperties();
streamsProperties.put(BOOTSTRAP_SERVERS_CONFIG, server);
streamsProperties.put(APPLICATION_ID_CONFIG, applicationId);
streamsProperties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsProperties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
return new KafkaStreamsConfiguration(streamsProperties);
}
Обратите внимание: я использую KafkaProperties.buildStreamsProperties()
для получения конфигурации потоков из свойств