Смещение сбрасывается, когда одна тема используется двумя разными группами потребителей.
Я использую Kafka версии 0.10.1.0 и Spring-Kafka версии 2.2.4.
Я создаю сообщение в теме "topic_X", которое должно быть использовано двумя разными группами потребителей "consumerA" и "consumerB", по одному в каждой.
Допустим, «потребительB» выключен. Я создал 100 сообщений для «topic_X», а «потребитель А» уже запущен и уничтожил их все. Когда я снова поднимаю "consumberB", смещение устанавливается равным 100, а не с 0.
Я попытался, установив авто-смещение-сброс: самое раннее, но оно все еще не работает. Ниже приведены журналы, которые я получил с консоли.
Когда я запускаю "consumberB", я не хочу сбрасывать смещение, как это можно сделать?
2019-05-15 11:28:44.309 INFO 16152 --- [eted-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=consumer_group] (Re-)joining group
2019-05-15 11:28:44.536 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] Successfully joined group with generation 25
2019-05-15 11:28:44.539 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] Setting newly assigned partitions [topic_x-0]
2019-05-15 11:28:44.788 INFO 16152 --- [ment-Data-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic_x-0]
2019-05-15 11:28:45.906 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=consumer_group] Fetch offset 5411 is out of range for partition topic_x-0, resetting offset
2019-05-15 11:28:46.187 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-2, groupId=consumer_group] Resetting offset for partition topic_x-0 to offset 5651.
2019-05-15 11:28:47.864 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] Attempt to heartbeat failed since group is rebalancing
2019-05-15 11:28:48.142 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] Revoking previously assigned partitions [topic_x-0]
2019-05-15 11:28:48.142 INFO 16152 --- [ment-Data-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [topic_x-0]
2019-05-15 11:28:48.142 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] (Re-)joining group
2019-05-15 11:28:48.976 INFO 16152 --- [eted-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=consumer_group] Successfully joined group with generation 26
2019-05-15 11:28:48.976 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] Successfully joined group with generation 26
2019-05-15 11:28:48.976 INFO 16152 --- [ment-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=consumer_group] Setting newly assigned partitions [topic_x-0]
2019-05-15 11:28:48.976 INFO 16152 --- [eted-Data-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=consumer_group] Setting newly assigned partitions [topic_y-0]
2019-05-15 11:28:49.246 INFO 16152 --- [ment-Data-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [topic_x-0]
Ниже приведен код, который я использую для создания потребителя. Другой потребитель из группы «customerA» уже работает в другом приложении. Когда я запускаю своего потребителя приложения в группе "customerB", смещение сбрасывается.
@KafkaListener(id="client-1", topics= "topic_x", groupId = "consumerB")
public void completed(final byte[] bytes) throws IOException {
//Handler code
}