Доступ к kafka из весеннего загрузочного приложения, работающего на kubernetes - PullRequest
1 голос
/ 11 марта 2019

У меня есть инсталляция kafka в кластере kubernetes. У меня есть модуль, на котором запущено приложение весенней загрузки, которое использует конфигурацию bootstrap.servers по умолчанию (localhost: 9092), а не ту, которая была передана (bootstrap.kafka.svc.cluster.local: 9092). Модуль не запускается, так как kafka не работает на localhost.

Вот моя весенняя конфигурация загрузки

spring:
  kafka:
    consumer:
      group-id: spring-template
      auto-offset-reset: earliest
      bootstrap-servers: "bootstrap.kafka.svc.cluster.local:9092"
    producer:
      bootstrap-servers: "bootstrap.kafka.svc.cluster.local:9092"
    bootstrap-servers: "bootstrap.kafka.svc.cluster.local:9092"

При настройке потребителей при запуске идентификатор группы и автоматический сброс смещения корректно передаются из вышеуказанной конфигурации. Однако конфигурация загрузочных серверов отсутствует, и приложение фактически использует localhost: 9092 согласно журналу ниже

2019-03-11 07:34:36.826  INFO 1 --- [  restartedMain] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = spring-template
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2019-03-11 07:34:36.942  INFO 1 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2019-03-11 07:34:36.945  INFO 1 --- [  restartedMain] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14705e51bd2ce5
2019-03-11 07:34:37.149  WARN 1 --- [  restartedMain] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=spring-template] Connection to node -1 could not be established. Broker may not be available.

У меня есть служба kubernetes, которая называется bootstrap и работает в пространстве имен kafka в кластере kubernetes. Вот фрагмент файла журнала. Почему приложение весенней загрузки не получает настроенную конфигурацию bootstrap.servers

1 Ответ

1 голос
/ 11 марта 2019

Свойство bootstrap-servers ожидает список адресов сервера Kafka. Поместив кавычки вокруг значения, вы указали, что значение является строкой, и, следовательно, создаете конфликт типов.

Чтобы решить эту проблему, вы должны либо удалить кавычки, либо явно указать значение в виде списка. например:

spring:
  kafka:
    consumer:
      bootstrap-servers: ["bootstrap.kafka.svc.cluster.local:9092"]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...