How do I set up a Spring Cloud Kafka project with a SASL_SSL connection to Apache Kafka?
0 votes / February 4, 2020

I am trying to set up Spring Cloud Kafka with SASL_SSL, but I cannot get it to work. I believe my application.yml is not configured correctly, so please advise and help.

Here is my application.yml configuration:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: localhost:9090
          consumerProperties:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
          producerProperties:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
      bindings:
        SINGAPOR_RECEIVER:
          binder: kafka
          destination: SINGAPOR_RECEIVER
          group: output-group-2
          content-type: text/plain
        SINGAPOR_RESPOND:
          binder: kafka
          destination: SINGAPOR_RESPOND
          group: output-group-1
          content-type: text/plain
        RESULT_RESPOND:
          binder: kafka
          destination: RESULT_RESPOND
          group: output-group-3
          content-type: text/plain
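
(For context, the bindings above presumably correspond to a channels interface along the following lines. This is a hypothetical sketch in Kotlin; the real application code is not shown in the question, and the input/output directions are only inferred from the startup log below.)

import org.springframework.cloud.stream.annotation.Input
import org.springframework.cloud.stream.annotation.Output
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.SubscribableChannel

// Hypothetical sketch: channel names are taken from the bindings above; which of them
// are inputs vs. outputs is a guess and may differ from the real application code.
interface DocChannels {

    @Input("SINGAPOR_RESPOND")        // the startup log shows a subscriber on this channel
    fun singaporRespond(): SubscribableChannel

    @Output("SINGAPOR_RECEIVER")      // the startup log shows this one used as an outbound topic
    fun singaporReceiver(): MessageChannel

    @Output("RESULT_RESPOND")
    fun resultRespond(): MessageChannel
}

Such an interface would be activated with @EnableBinding(DocChannels::class) on the application class.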

Here is what I get in the Spring console:

2020-02-04 16:58:20.687  INFO 35715 --- [           main] com.gdce.doca.ApplicationKt              : Starting ApplicationKt on yourpc with PID 35715 (/home/yourpc/data/work-project/java/kafka-doc-a/build/classes/kotlin/main started by yourpc in /home/yourpc/data/work-project/java/kafka-doc-a)
2020-02-04 16:58:20.689  INFO 35715 --- [           main] com.gdce.doca.ApplicationKt              : No active profile set, falling back to default profiles: default
2020-02-04 16:58:21.580  INFO 35715 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-04 16:58:21.586  INFO 35715 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-04 16:58:21.589  INFO 35715 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-04 16:58:21.639  INFO 35715 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.642  INFO 35715 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.658  INFO 35715 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.980  INFO 35715 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port(s): 9088 (http)
2020-02-04 16:58:21.993  INFO 35715 --- [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2020-02-04 16:58:21.993  INFO 35715 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet engine: [Apache Tomcat/9.0.30]
2020-02-04 16:58:22.082  INFO 35715 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2020-02-04 16:58:22.083  INFO 35715 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1324 ms
2020-02-04 16:58:22.473  INFO 35715 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2020-02-04 16:58:22.722  INFO 35715 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
2020-02-04 16:58:22.878  INFO 35715 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application.SINGAPORE_RESPOND' has 1 subscriber(s).
2020-02-04 16:58:22.880  INFO 35715 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-04 16:58:22.881  INFO 35715 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-04 16:58:22.881  INFO 35715 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2020-02-04 16:58:23.108  INFO 35715 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: SINGAPORE_RECEIVER
2020-02-04 16:58:23.110  INFO 35715 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
        bootstrap.servers = [SASL_SSL://localhost:9090]
        client.dns.lookup = default
        client.id = 
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS

2020-02-04 16:58:23.213  INFO 35715 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-02-04 16:58:23.214  INFO 35715 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-02-04 16:58:23.214  INFO 35715 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1580810303212

Here is the error I get in the Kafka console:

[2020-02-04 16:12:27,471] INFO [SocketServer brokerId=0] Failed authentication with test.local/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

I tried producing and consuming a message with kafka-console-producer.sh and kafka-console-consumer.sh, and that works fine with the configuration and commands below.

kafka_client_jaas.conf:

KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="userkk"
   password="admin-secret";
};
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA/config/kafka_client_jaas.conf"
$KAFKA/bin/./kafka-console-producer.sh --broker-list localhost:9090 --topic test --producer.config $KAFKA/config/producer.properties
$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server localhost:9090 --topic test --from-beginning --consumer.config $KAFKA/config/consumer.properties
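
The producer.properties and consumer.properties referenced by these commands are not shown in the question; a minimal version that matches this broker setup would presumably look like the sketch below (the JAAS login comes from KAFKA_OPTS above; the truststore path and password are placeholders):

# producer.properties / consumer.properties (sketch)
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
# trust store containing the broker certificate; path and password are placeholders
ssl.truststore.location=/path/to/client-truststore.jks
ssl.truststore.password=changeit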

Edit

And here is the Kafka server.properties configuration I am using:

# PLAINTEXT
#listeners=SASL_PLAINTEXT://localhost:9090
#security.inter.broker.protocol=SASL_PLAINTEXT
#sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
#sasl.enabled.mechanisms=SCRAM-SHA-512


# SSL + SASL/SCRAM
listeners=SASL_SSL://localhost:9090
advertised.listeners=SASL_SSL://localhost:9090
advertised.host.name=localhost
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512


ssl.keystore.location=/home/yourpc/ssl-generator-tmp/ssl/server/server.p12
ssl.keystore.password=changeit
ssl.key.password=changeit
# ssl.truststore.location=/home/yourpc/server-truststore.jks
# ssl.truststore.password=123123


# If any of the SASL authentication mechanisms are enabled for a given listener, then SSL client authentication is disabled even if ssl.client.auth=required is configured, and the broker will authenticate clients only via SASL on that listener
ssl.client.auth=required


# topic control
auto.create.topics.enable=true
# delete.topic.enable=true


# zookeeper communication
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000


# NETWORK MANAGEMENT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=10485760


num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=30000000
log.flush.interval.ms=1800000
log.retention.minutes=30
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000


# ENABLE ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:user-a;User:Bob;User:Alice


ssl.endpoint.identification.algorithm=
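
For completeness, the SCRAM-SHA-512 credentials that this broker configuration expects are normally registered with kafka-configs.sh. Assuming the userkk / admin-secret pair from the JAAS file above and the zookeeper.connect address from server.properties, the command would look roughly like this (on Kafka 2.3, SCRAM credentials are still managed through ZooKeeper):

$KAFKA/bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'SCRAM-SHA-512=[password=admin-secret]' \
  --entity-type users --entity-name userkk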

1 Answer

0 votes / February 4, 2020

Try using the following config at a different property level:

spring.kafka.bootstrap-servers=${BROKERS}
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";

Remember to replace the ${...} placeholder variables.
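
Alternatively, if you want to keep the settings on the Spring Cloud Stream binder rather than on spring.kafka, the Kafka binder has a binder-wide configuration map that is passed to the clients it creates, and it should also reach the AdminClient used for topic provisioning (which, in the log above, still shows security.protocol = PLAINTEXT and sasl.jaas.config = null). A sketch reusing the broker and credentials from the question; the truststore entries are placeholders for whatever trust store holds the broker certificate:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9090
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
            # placeholders: point these at the trust store that contains the broker certificate
            ssl.truststore.location: /path/to/client-truststore.jks
            ssl.truststore.password: changeit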

...