У меня топи c "апельсины" с 10 перегородками, 2 потребителя в 1 группе потребителей. Я использую Spring Kafka.
Поскольку по какой-то причине мне нужно время от времени перечитывать данные, мне нужно сбрасывать смещения. Мой слушатель реализует ConsumerSeekAware
, а в onPartitionsAssigned()
я просто вызываю callback#seekToBeginning
. Это работает нормально, так как в журнале я вижу сообщения от Kafka Client API (2.3.1), в которых говорится:
Сброс смещения для раздела oranges-X to offset 0
. Это нормально для всех разделов.
Однако фактически сбрасывается только последний раздел (9) и время от времени, если мне повезет, и второй (1). Все остальные вообще не сбрасываются.
Что вызывает у меня настоящую головную боль: если я пропущу раздел 9 из списка разделов, которые нужно сбросить, все остальные разделы будут сброшены нормально, и все будет работать должным образом.
Код очень простой:
class ... implements ConsumerSeekAware {
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());
}
...
Логи:
19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.