Spring Cloud Stream SSL соединение с Кафкой - PullRequest
0 голосов
/ 29 августа 2018

У меня есть приложение Spring Cloud Stream, использующее Spring Boot 2.0.3.RELEASE и библиотеки Spring Cloud Stream под Finchley.SR1 . Со свойствами:

spring:
  kafka:
    ssl:
      protocol: SASL_SSL
      keystore-location: classpath:kafka.p12 // created from a PEM with private key and certificate
      keystore-password: passw0rd
      key-store-type: PKCS12
  cloud:
    stream:
      bindings:
      ...
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: myhost.mydomain.com

пытается установить SSL-соединение с Kafka со свойствами:

listeners=SASL_SSL://myhost.mydomain.com:9092
advertised.listeners=SASL_SSL://myhost.mydomain.com:9092

Но я получаю следующее исключение:

2018-08-29 17:49:13.421 ERROR [alef-data-connector,,,] 83300 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Cannot initialize Binder

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

2018-08-29 17:49:13.422 ERROR [alef-data-connector,,,] 83300 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create producer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:391) ~[spring-cloud-stream-binder-kafka-core-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:212) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:126) ~[spring-cloud-stream-binder-kafka-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:153) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:77) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:138) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:244) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:221) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:252) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:46) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[na:1.8.0_162]
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[na:1.8.0_162]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[na:1.8.0_162]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:47) ~[spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:29) [spring-cloud-stream-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:52) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:157) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:121) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:885) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.finishRefresh(ReactiveWebServerApplicationContext.java:83) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:61) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1255) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1243) ~[spring-boot-2.0.3.RELEASE.jar:2.0.3.RELEASE]
    at com.alefeducation.connector.ConnectorApplicationKt.main(ConnectorApplication.kt:13) ~[classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

Правильна ли моя конфигурация для приложения Spring Cloud Stream. Есть ли у нас документация Spring Cloud Stream, объясняющая SSL-соединение с Kafka?

1 Ответ

0 голосов
/ 29 августа 2018

При использовании поддержки нескольких связующих (через свойство environment) наследование корневых загрузочных свойств не происходит.

Вам необходимо либо переместить свойства spring.kafka.ssl в environment, либо, если у вас есть только один связыватель, удалить конфигурацию с несколькими связывателями и просто объявить свойства связывателя в корне.

EDIT

Это работает для меня ...

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: so52080276
          binder: kafka1
          group: so52080276
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      brokers: myhost.mydomain.com
                      configuration:
                        security.protocol: SASL_SSL
                        ssl.keystore.location: classpath:kafka.p12 // created from a PEM with private key and certificate
                        ssl.keystore.password: passw0rd
                        ssl.key.store.type: PKCS12
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...