I'm trying to configure Spring Cloud Kafka with SASL_SSL, but I can't get it to work. I suspect my application.yml is not configured correctly, so any advice or help would be appreciated.
Here is my application.yml configuration:
spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: localhost:9090
          consumerProperties:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
          producerProperties:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";
      bindings:
        SINGAPOR_RECEIVER:
          binder: kafka
          destination: SINGAPOR_RECEIVER
          group: output-group-2
          content-type: text/plain
        SINGAPOR_RESPOND:
          binder: kafka
          destination: SINGAPOR_RESPOND
          group: output-group-1
          content-type: text/plain
        RESULT_RESPOND:
          binder: kafka
          destination: RESULT_RESPOND
          group: output-group-3
          content-type: text/plain
Here is what I get in the Spring console:
2020-02-04 16:58:20.687 INFO 35715 --- [ main] com.gdce.doca.ApplicationKt : Starting ApplicationKt on yourpc with PID 35715 (/home/yourpc/data/work-project/java/kafka-doc-a/build/classes/kotlin/main started by yourpc in /home/yourpc/data/work-project/java/kafka-doc-a)
2020-02-04 16:58:20.689 INFO 35715 --- [ main] com.gdce.doca.ApplicationKt : No active profile set, falling back to default profiles: default
2020-02-04 16:58:21.580 INFO 35715 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2020-02-04 16:58:21.586 INFO 35715 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2020-02-04 16:58:21.589 INFO 35715 --- [ main] faultConfiguringBeanFactoryPostProcessor : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2020-02-04 16:58:21.639 INFO 35715 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.642 INFO 35715 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.658 INFO 35715 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2020-02-04 16:58:21.980 INFO 35715 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 9088 (http)
2020-02-04 16:58:21.993 INFO 35715 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-02-04 16:58:21.993 INFO 35715 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.30]
2020-02-04 16:58:22.082 INFO 35715 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-02-04 16:58:22.083 INFO 35715 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1324 ms
2020-02-04 16:58:22.473 INFO 35715 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-02-04 16:58:22.722 INFO 35715 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService 'taskScheduler'
2020-02-04 16:58:22.878 INFO 35715 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.SINGAPORE_RESPOND' has 1 subscriber(s).
2020-02-04 16:58:22.880 INFO 35715 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-02-04 16:58:22.881 INFO 35715 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-02-04 16:58:22.881 INFO 35715 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-02-04 16:58:23.108 INFO 35715 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: SINGAPORE_RECEIVER
2020-02-04 16:58:23.110 INFO 35715 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [SASL_SSL://localhost:9090]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2020-02-04 16:58:23.213 INFO 35715 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
2020-02-04 16:58:23.214 INFO 35715 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
2020-02-04 16:58:23.214 INFO 35715 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1580810303212
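In the AdminClientConfig dump above, security.protocol is PLAINTEXT, sasl.mechanism is GSSAPI and sasl.jaas.config is null, so the SASL_SSL settings placed under consumerProperties and producerProperties do not seem to reach the admin client that the binder uses for topic provisioning. The Kafka binder also has a configuration map for client properties shared by the clients it creates. A minimal sketch of moving the security settings there, assuming the binder version in use supports it, might look like this. The truststore path and password are hypothetical placeholders, and whether a truststore is needed at all depends on how the broker certificate was issued.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9090
          # Common client properties; the binder should pass these to the clients it creates
          configuration:
            security.protocol: SASL_SSL
            sasl.mechanism: SCRAM-SHA-512
            sasl.jaas.config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="userkk" password="admin-secret";'
            # Hypothetical placeholders: a truststore holding the CA that signed the broker certificate
            ssl.truststore.location: /path/to/client-truststore.jks
            ssl.truststore.password: changeit
```

If this takes effect, the AdminClientConfig values printed at startup should show security.protocol = SASL_SSL and sasl.mechanism = SCRAM-SHA-512 instead of the defaults.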
Here is the error I get in the Kafka console:
[2020-02-04 16:12:27,471] INFO [SocketServer brokerId=0] Failed authentication with test.local/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
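The broker log only reports that the SSL handshake failed, not why. A minimal sketch of raising the Kafka client log level on the Spring side through Spring Boot's logging.level properties, which may surface the underlying handshake or authentication error in the application console. The choice of logger names is an assumption about where the useful messages are emitted.

```yaml
logging:
  level:
    # Kafka client networking and security packages (SSL handshake, SASL authentication)
    org.apache.kafka.common.network: DEBUG
    org.apache.kafka.common.security: DEBUG
```

This goes in the same application.yml, at the top level next to the spring key.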
I tried producing and consuming messages with the Kafka console tools (kafka-console-producer.sh and kafka-console-consumer.sh), and that works fine with the configuration and commands below:
- kafka_client_jaas.conf
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="userkk"
    password="admin-secret";
};
export KAFKA_OPTS="-Djava.security.auth.login.config=$KAFKA/config/kafka_client_jaas.conf"
$KAFKA/bin/kafka-console-producer.sh --broker-list localhost:9090 --topic test --producer.config $KAFKA/config/producer.properties
$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server localhost:9090 --topic test --from-beginning --consumer.config $KAFKA/config/consumer.properties
Edit
And here is the Kafka server.properties configuration I'm using:
# PLAINTEXT
#listeners=SASL_PLAINTEXT://localhost:9090
#security.inter.broker.protocol=SASL_PLAINTEXT
#sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
#sasl.enabled.mechanisms=SCRAM-SHA-512
# SSL + SASL/SCRAM
listeners=SASL_SSL://localhost:9090
advertised.listeners=SASL_SSL://localhost:9090
advertised.host.name=localhost
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
ssl.keystore.location=/home/yourpc/ssl-generator-tmp/ssl/server/server.p12
ssl.keystore.password=changeit
ssl.key.password=changeit
# ssl.truststore.location=/home/yourpc/server-truststore.jks
# ssl.truststore.password=123123
# If any of the SASL authentication mechanisms are enabled for a given listener, then SSL client authentication is disabled even if ssl.client.auth=required is configured, and the broker will authenticate clients only via SASL on that listener
ssl.client.auth=required
# topic control
auto.create.topics.enable=true
# delete.topic.enable=true
# zookeeper communication
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
# NETWORK MANAGEMENT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=10485760
num.partitions=1
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=30000000
log.flush.interval.ms=1800000
log.retention.minutes=30
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ENABLE ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:user-a;User:Bob;User:Alice
ssl.endpoint.identification.algorithm=