Смещение вне диапазона сброса смещения 2-й группы потребителей - PullRequest
0 голосов
/ 16 мая 2019

Смещение сбрасывается, когда одна тема используется двумя разными группами потребителей.

Я использую Kafka версии 0.10.1.0 и Spring-Kafka версии 2.2.4. Я создаю сообщение в теме "topic_X", которое должно быть использовано двумя разными группами потребителей "consumerA" и "consumerB", по одному в каждой.

Допустим, «потребительB» выключен. Я создал 100 сообщений для «topic_X», а «потребитель А» уже запущен и уничтожил их все. Когда я снова поднимаю "consumberB", смещение устанавливается равным 100, а не с 0.

Я попытался, установив авто-смещение-сброс: самое раннее, но оно все еще не работает. Ниже приведены журналы, которые я получил с консоли.

Когда я запускаю "consumberB", я не хочу сбрасывать смещение, как это можно сделать?

2019-05-15 11:28:44.309  INFO 16152 --- [eted-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=consumer_group] (Re-)joining group
2019-05-15 11:28:44.536  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] Successfully joined group with generation 25
2019-05-15 11:28:44.539  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] Setting newly assigned partitions [topic_x-0]
2019-05-15 11:28:44.788  INFO 16152 --- [ment-Data-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic_x-0]
2019-05-15 11:28:45.906  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=consumer_group] Fetch offset 5411 is out of range for partition topic_x-0, resetting offset
2019-05-15 11:28:46.187  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=consumer_group] Resetting offset for partition topic_x-0 to offset 5651.
2019-05-15 11:28:47.864  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] Attempt to heartbeat failed since group is rebalancing
2019-05-15 11:28:48.142  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] Revoking previously assigned partitions [topic_x-0]
2019-05-15 11:28:48.142  INFO 16152 --- [ment-Data-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [topic_x-0]
2019-05-15 11:28:48.142  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] (Re-)joining group
2019-05-15 11:28:48.976  INFO 16152 --- [eted-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-4, groupId=consumer_group] Successfully joined group with generation 26
2019-05-15 11:28:48.976  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] Successfully joined group with generation 26
2019-05-15 11:28:48.976  INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=consumer_group] Setting newly assigned partitions [topic_x-0]
2019-05-15 11:28:48.976  INFO 16152 --- [eted-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-4, groupId=consumer_group] Setting newly assigned partitions [topic_y-0]
2019-05-15 11:28:49.246  INFO 16152 --- [ment-Data-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [topic_x-0]

Ниже приведен код, который я использую для создания потребителя. Другой потребитель из группы «customerA» уже работает в другом приложении. Когда я запускаю своего потребителя приложения в группе "customerB", смещение сбрасывается.

@KafkaListener(id="client-1", topics= "topic_x", groupId = "consumerB")
    public void completed(final byte[] bytes) throws IOException {

        //Handler code

    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...