Потребитель Spring kafka не фиксирует сервер kafka после смены лидера - PullRequest
0 голосов
/ 24 января 2019

Я использую spring-kafka 2.1.10. ВЫПУСК.У меня есть потребитель со следующими свойствами (скопировал почти все):

    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [kafka1.local:9093, kafka2.local:9093, kafka3.local:9093]
    check.crcs = true
    client.id = kafkaListener-0
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = kafkaLisneterContainer
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    max.poll.interval.ms = 300000
    max.poll.records = 50
    metadata.max.age.ms = 300000
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

Версия Apache Kafka на моем производстве - 2.11-1.0.0-0pan4.Существует кластер с 3 узлами кафки внутри:

enter image description here

Столкнулся с серьезной проблемой и даже не может воспроизвести ее локально.И вот что произошло:

  1. Я запустил свое приложение как с Кафкой, так и с Потребителем внутри.

  2. Все работало нормально, пока не был изменен узел-лидер для моей темы в 2019-01-17 06: 47: 39:

2019-01-17 / controller.2019-01-17-03.aaa-aa3.gz: 2019-01-17 06: 47: 39,365 +0000 [поток событий контроллера] [kafka.controller.KafkaController]ИНФОРМАЦИЯ [ID контроллера = 3] Новый руководитель и ISR для раздела topic_name-0: {"leader": 1, "leader_epoch": 3, "isr": [1,3]} (kafka.controller.KafkaController)

После этого мой потребитель прекратил передавать смещения на Кафку.Последний коммит произошел в тот же час и в ту же минуту, когда сменился лидер - 17 января 2019 г. 06:47.enter image description here

4) САМЫЕ ТАЙНЫЕ: в приложении все работает - все в порядке.Spring-Потребитель читает новые сообщения и отправляет их Кафке.Я вижу такие журналы.Похоже, пружинный потребитель сохраняет свое смещение в памяти и отправляет коммит удаленной кафке (без ошибок и т. Д.):

2019-01-23 14: 03: 20,975 +0000 [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId = kafkaListener-0, groupId = kafkaLisneterContainer] Извлечение READ_UNCOMMITTED со смещением 164871 для раздела aaa-1 вернуло данные выборки (ошибка = NONE, highWaterMark = 164871, lastStableOffset = -1, lastStableOffset = -1, lastStableOffset = -1abortedTransactions = null, recordsSizeInBytes = 0) 2019-01-23 14: 03: 20,975 + 0000
[externalbetting] [kafkaLisneterContainer-0-C-1] [Fetcher] DEBUG [Consumer clientId = kafkaListener-0, groupId = kafkaListainter] Добавлен запрос извлечения READ_UNCOMMITTED для раздела eaaa-1 со смещением 164871 на узел aaa-aa1.local: 9093 (id: 1 rack: null) 2019-01-23 14: 03: 20,975

5) Но все равно Лаг в Apache kafka растет.И если я перезапущу свое приложение, потребитель bean-компонента Spring будет воссоздан и потеряет сохраненное в памяти смещение.Он будет читать этот лаг из kafka и обрабатывать записи во второй раз.

Пожалуйста, помогите найти ключ!

1 Ответ

0 голосов
/ 24 января 2019

Когда вы включаете автоматическую фиксацию (по умолчанию Kafka), коммиты полностью управляются клиентами kafka, и Spring не контролирует их.

Установка его на false позволит контейнеру слушателя фиксировать смещения, которые он будет делать после каждой партии записей (результат опроса) по умолчанию или после каждой записи, если для свойства контейнера AckMode установлено значение RECORD .

Контейнер также будет надежно фиксировать любые ожидающие смещения, когда разделы отменены из-за перебалансировки.

Я обычно рекомендую не использовать автоматическую фиксацию.

...