Смещения Kafka Broker / сохранение журнала и сброса потребителей в самом раннем режиме - PullRequest
0 голосов
/ 19 декабря 2018

Описание проблемы:

Наш потребитель Kafka (разработанный в Spring Boot 2.x) работает в течение нескольких дней.Когда мы перезапускаем этого потребителя, все сообщения темы потребляются снова, но только при определенных условиях.

Условия:

Мы предлагаем комбинацию брокер / тему конфигурации ( log.retention. *, offsets.retention. *) и потребительский конфиг ( auto.offset.reset = самое раннее ) вызываютэто поведение.
Очевидно, что мы не можем установить потребителя на "последний" , потому что, если потребитель остановлен и приходят новые сообщения, когда потребитель запускается снова, эти сообщения не будут использованы.

Вопрос:

Как правильно настроить, чтобы избежать этой ситуации?
В последнем выпуске Kafka Broker (2.x) значения по умолчанию для log.retention.* и offsets.retention. * одинаковы (https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days)

Может ли эта новая настройка конфигурации решить проблему?

Конфигурация потребителя (auto.commit , делегированный в Spring Cloud Stream Framework):

           auto.commit.interval.ms = 100
           auto.offset.reset = earliest
           bootstrap.servers = [server1:9092]
           check.crcs = true
           client.id = 
           connections.max.idle.ms = 540000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = consumer_group1
           heartbeat.interval.ms = 3000
           interceptor.classes = null
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 305000
           retry.backoff.ms = 100
           value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Brконфигурация okers:

           log.retention.ms = 86400000
           log.retention.minutes = 10080
           log.retention.hours = 168
           log.retention.bytes = -1

           offsets.retention.ms = 864000000
           offsets.retention.minutes = 14400
           offsets.retention.hours = 240 

           unclean.leader.election.enable = false
           log.cleaner.enable = true
           auto.leader.rebalance.enable = true
           leader.imbalance.check.interval.seconds = 300
           log.retention.check.interval.ms = 300000
           log.cleaner.delete.retention.ms = 604800000

Спасибо и всего наилучшего

Ответы [ 2 ]

0 голосов
/ 19 декабря 2018

Если вы поддерживаете приложение работающим 24x7 (например, в выходные дни, когда данных нет), одним из вариантов будет установка idleInterval и добавление ApplicationListener (или @EventListener) для прослушивания ListenerContainerIdleEvents.

Затем, если свойство idleTime приближается к сохранению вашего журнала, вы можете повторно зафиксировать смещения, используя Consumer в событии - получить назначенные разделы, найти их текущие position() изатем повторите.

0 голосов
/ 19 декабря 2018

Вы правы, у вас возникла эта проблема из-за различных значений log.retention.* и offsets.retention.* (7 дней и 1 день соответственно) для версий Kafka до 2.0, пожалуйста, проверьте описание здесь .это происходит из-за редких сообщений, приходящих в вашу тему, и данные о смещении уже истекли.

это не совсем правильно в отношении вашей фразы Obviously we can't set consumer to "latest"если вы получили последние сообщения менее чем за 1 день до этого (как несколько часов назад), вы можете смело обновлять значение auto.offset.reset до latest и с тем же идентификатором группы (или application.id).в таком случае вы не потеряете сообщения.

В качестве другого варианта вы можете изменить значение срока хранения журнала для определенной темы на 1 день.Также вы можете обновить значение offsets.retention.*, но с этим вам нужно протестировать его с точки зрения производительности, оно может ухудшиться.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...