Spring-Kafka: При приостановке / возобновлении работы потребителя с использованием метода паузы / возобновления согласно документации, перебалансирование не должно происходить, когда используется автоматическое назначение, но оно не работает, происходит перебалансировка.Как приостановить / возобновить работу потребителя и сохранить опрос после периода без перебалансировки?
Вариант использования: Потребитель должен сделать паузу на период и продолжить опрос, чтобы дать сердцебиение и возобновить работу после истечения времени, но Кафка не должна перебалансировать, пока потребительпауза.
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
consumer.pause(Collections.singleton(topicPartition));
try {
Thread.sleep(60000);
consumer.resume(Collections.singleton(topicPartition));
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
} catch (InterruptedException e) {
e.printStackTrace();
}
Журналы: 2019-02-19 15: 19: 49.173 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] (Повторное присоединение к группе 2019-02-19 15: 19: 49.175 ИНФОРМАЦИЯ 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-3, groupId = customer] (Повторное присоединение) к группе 2019-02-19 15: 19: 49.181 INFO 82272 --- [rTaskExecutor-3] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-4, groupId = customer] (повторно) присоединяющаяся группа
2019-02-1915: 19: 49.192 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] Успешно присоединился к группе с поколением 581 2019-02-19 15: 19: 49.192 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-3, groupId = customer] Успешно присоединилась группа с поколением 581
2019-02-19 15: 19: 49.194 INFO 82272--- [rTaskExecutor-1] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-2, groupId = customer] Установка новых назначенных разделов [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 49.194 ИНФОРМАЦИЯ 82272 --- [rTaskExecutor-2] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-3, groupId = customer] Настройка новых назначенных разделов [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15: 19: 49.218 INFO 82272 --- [rTaskExecutor-2] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15: 19: 49.219 INFO 82272 --- [rTaskExecutor-1] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-2, весна-кафка-тема-0, spring-kafka-topic-1] 2019-02-19 15: 19: 49.223 INFO 82272 --- [main] osbwembedded.tomcat.TomcatWebServer: Tomcat запущен на портах: 8080 (http) с контекстным путем ''2019-02-19 15: 19: 49.233 INFO 82272 --- [main] cgsSSpringKafkaSupportApplication: запущено SpringKafkaSupportApplication через 3,43 секунды (JVM работает в течение 3,85) Потребитель [customerTaskExecutor-1] получил сообщение [Customer (name =, phoneNumber = 20)] Потребитель [customerTaskExecutor-2] получил сообщение [Клиент (имя = тест 6, phoneNumber = 6)] Потребитель [customerTaskExecutor-1] Раздел [spring-kafka-topic-2] прекратил потребление.Потребительский [customerTaskExecutor-1] раздел [spring-kafka-topic-1] прекратил потребление.2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-3, groupId = customer] Попытка сердцебиения не удалась из-за перебалансировки группы 2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] Попытка сердцебиения не удалась из-за ребалансировки группы 2019-02-19 15: 19: 52.200 INFO 82272--- [rTaskExecutor-1] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-2, groupId = customer] Отмена ранее назначенных разделов [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-2] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-3, groupId = customer] Отмена ранее назначенных разделов [spring-kafka-topic-4, весна-кафка-тема-5, весна-кафка-тема-3]2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-1] osklKafkaMessageListenerContainer: отозванные разделы: [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1]2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-2] osklKafkaMessageListenerContainer: отозванные разделы: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3]2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] (Re) присоединение к группе 2019-02-19 15:19: 52.200 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-3, groupId = customer] (Re) присоединение к группе 2019-02-19 15: 19: 52.209 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] Успешно присоединенная группа с поколением 582 2019-02-19 15: 19: 52.209 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator:[Consumer clientId = потребитель-3, грoupId = customer] Успешно присоединилась группа с поколением 582 2019-02-19 15: 19: 52.209 INFO 82272 --- [rTaskExecutor-3] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-4, groupId = customer] Успешно присоединилась группа споколение 582 2019-02-19 15: 19: 52.209 ИНФОРМАЦИЯ 82272 --- [rTaskExecutor-3] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-4, groupId = customer] Установка новых назначенных разделов [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15: 19: 52.210 INFO 82272 --- [rTaskExecutor-1] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-2, groupId = customer] Установка новых назначенных разделов [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 52.210 INFO 82272 --- [rTaskExecutor-2] oakccinternals.ConsumerCoordinator: [идентификатор клиента = потребитель-3, идентификатор группы =заказчик] Установка новых назначенных разделов [spring-kafka-topic-2, spring-kafka-topic-3] 2019-02-19 15: 19: 52.211 INFO 82272 --- [rTaskExecutor-3] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15: 19: 52.212 INFO 82272 --- [rTaskExecutor-1] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 52.212 INFO 82272 --- [rTaskExecutor-2] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-2, spring-kafka-topic-3] Потребитель [customerTaskExecutor-3] получил сообщение [Клиент (имя = тест 6, номер телефона = 6)]