Kafka - Сброс смещения для раздела не работает - PullRequest
0 голосов
/ 19 июня 2020

У меня топи c "апельсины" с 10 перегородками, 2 потребителя в 1 группе потребителей. Я использую Spring Kafka.

Поскольку по какой-то причине мне нужно время от времени перечитывать данные, мне нужно сбрасывать смещения. Мой слушатель реализует ConsumerSeekAware, а в onPartitionsAssigned() я просто вызываю callback#seekToBeginning. Это работает нормально, так как в журнале я вижу сообщения от Kafka Client API (2.3.1), в которых говорится:

Сброс смещения для раздела oranges-X to offset 0. Это нормально для всех разделов.

Однако фактически сбрасывается только последний раздел (9) и время от времени, если мне повезет, и второй (1). Все остальные вообще не сбрасываются.

Что вызывает у меня настоящую головную боль: если я пропущу раздел 9 из списка разделов, которые нужно сбросить, все остальные разделы будут сброшены нормально, и все будет работать должным образом.

Код очень простой:

class ... implements ConsumerSeekAware {
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
...
        callback.seekToBeginning(topicPartition.topic(), topicPartition.partition());

}
...

Логи:

19 Jun 09:56:49.442] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-9 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-8 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-1 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-0 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-3 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-2 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-5 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-4 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-7 to offset 0.
[19 Jun 09:56:49.443] [INFO] [{}] [org.apache.kafka.clients.consumer.internals.SubscriptionState] - [Consumer clientId=orange-0, groupId=avaloq.fints.acpadapter] Resetting offset for partition orange-6 to offset 0.

1 Ответ

0 голосов
/ 24 июня 2020

Я не могу воспроизвести вашу проблему.

Вот мое тестовое приложение Spring Boot:

@SpringBootApplication
public class So62465345Application extends AbstractConsumerSeekAware {


    private static final Logger LOG = LoggerFactory.getLogger(So62465345Application.class);


    public static void main(String[] args) {
        SpringApplication.run(So62465345Application.class, args);
    }

    @KafkaListener(id = "so62465345", topics = "so62465345")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62465345").partitions(10).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> IntStream.range(0, 9).forEach(i -> template.send("so62465345", i, null,
                System.currentTimeMillis() + ":foo:" + i));
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        try {
            Thread.sleep(5000);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        LOG.info("Seeking on assignment");
        callback.seekToBeginning(assignments.keySet());
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        LOG.info("Seeking on idle");
        callback.seekToBeginning(assignments.keySet());
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=30000
spring.kafka.listener.poll-timeout=2000

Я установил точку останова в onIdleContainer и, используя kafka-console-consumer, я вижу, что смещения фактически не сбрасываются до следующего poll().

Seeking to EARLIEST offset of partition so62465345-1 появляется, когда мы выполняем поиск, но Resetting offset for partition so62465345-0 to offset 0 не появляется, пока мы не вызовем poll() снова (а затем смещения фактически сбрасываются).

Итак, я вижу, что поиски не происходят в текущем опросе, который возвращает 0 записей, но следующий опрос начинается с начала.

...