Приложение Spring загрузки с потоком kafka - PullRequest
0 голосов
/ 10 июля 2020

У кого-нибудь есть пример чтения сообщения в виде потока с использованием потока kafka и весенней загрузки.

Мой кластер kafka защищен SASL_ SSL. Итак, как мне подключить мое приложение Spring Boot Kafka Stream с. Что писать в файле application.properties.

Я не хочу использовать весенний облачный поток.

'server.port = 8084 topi c .name = test-topi c server .servlet.context-path = / api / v1 spring.application.name = kafkatest spring.kafka. bootstrap -servers = . com: 9093 spring.kafka.producer.key-serializer = org. apache .kafka.common.serialization.IntegerSerializer spring.kafka.producer.value-serializer = io.confluent.kafka.serializers.KafkaAvroSerializer spring.kafka.jaas .enabled = true spring.kafka.properties.security.protocol = SASL_SSL spring.kafka.properties.security.krb5.config = файл: /etc/krb5.conf spring.kafka.properties.sasl.mechanism = GSSAPI spring.kafka. properties.sasl.kerberos.service.name = kafka spring.kafka.properties.sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule требуется useTicketCache = false serviceName = "kafka" storeKey = true Principal = " "useKeyTab = true keyTab =" / home / api / con fig / kafkaclient.keytab "; spring.kafka.ssl.trust-store-location = файл: /home/api/config/truststore.p12 spring.kafka.ssl.trust-store-password = *********** ******** spring.kafka.ssl.trust-store-type = PKCS12

1 Ответ

0 голосов
/ 10 июля 2020

Я сделал так.

Добавьте конфигурацию sasl в свойствах.

>  spring:   
>   kafka:
>     client-id: ${spring.app.name}
>     bootstrap-servers: <cluster_url>:9092
>     properties:
>       ssl.endpoint.identification.algorithm: https
>       sasl.mechanism: PLAIN
>       sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule   required
> username="xxxxx"   password="xxxxxxx";
>       security.protocol: SASL_SSL

И затем создал bean-компонент, который инициализирует KafkaStreamsConfiguration

@Bean
public KafkaStreamsConfiguration streamsConfig(KafkaProperties kafkaProperties) {
    Map<String, Object> streamsProperties = kafkaProperties.buildStreamsProperties();
    streamsProperties.put(BOOTSTRAP_SERVERS_CONFIG, server);
    streamsProperties.put(APPLICATION_ID_CONFIG, applicationId);
    streamsProperties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsProperties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new KafkaStreamsConfiguration(streamsProperties);
}

Обратите внимание: я использую KafkaProperties.buildStreamsProperties() для получения конфигурации потоков из свойств

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...