Spring-Kafka: изменение баланса происходит при использовании паузы / возобновления потребителя, что не должно соответствовать документации - PullRequest
0 голосов
/ 19 февраля 2019

Spring-Kafka: При приостановке / возобновлении работы потребителя с использованием метода паузы / возобновления согласно документации, перебалансирование не должно происходить, когда используется автоматическое назначение, но оно не работает, происходит перебалансировка.Как приостановить / возобновить работу потребителя и сохранить опрос после периода без перебалансировки?

Вариант использования: Потребитель должен сделать паузу на период и продолжить опрос, чтобы дать сердцебиение и возобновить работу после истечения времени, но Кафка не должна перебалансировать, пока потребительпауза.

            System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
            consumer.pause(Collections.singleton(topicPartition));                    
            try {
                    Thread.sleep(60000);
                    consumer.resume(Collections.singleton(topicPartition));
                    System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
            } catch (InterruptedException e) {              
                       e.printStackTrace();
            }

Журналы: 2019-02-19 15: 19: 49.173 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] (Повторное присоединение к группе 2019-02-19 15: 19: 49.175 ИНФОРМАЦИЯ 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-3, groupId = customer] (Повторное присоединение) к группе 2019-02-19 15: 19: 49.181 INFO 82272 --- [rTaskExecutor-3] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-4, groupId = customer] (повторно) присоединяющаяся группа

2019-02-1915: 19: 49.192 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] Успешно присоединился к группе с поколением 581 2019-02-19 15: 19: 49.192 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-3, groupId = customer] Успешно присоединилась группа с поколением 581

2019-02-19 15: 19: 49.194 INFO 82272--- [rTaskExecutor-1] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-2, groupId = customer] Установка новых назначенных разделов [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 49.194 ИНФОРМАЦИЯ 82272 --- [rTaskExecutor-2] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-3, groupId = customer] Настройка новых назначенных разделов [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15: 19: 49.218 INFO 82272 --- [rTaskExecutor-2] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15: 19: 49.219 INFO 82272 --- [rTaskExecutor-1] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-2, весна-кафка-тема-0, spring-kafka-topic-1] 2019-02-19 15: 19: 49.223 INFO 82272 --- [main] osbwembedded.tomcat.TomcatWebServer: Tomcat запущен на портах: 8080 (http) с контекстным путем ''2019-02-19 15: 19: 49.233 INFO 82272 --- [main] cgsSSpringKafkaSupportApplication: запущено SpringKafkaSupportApplication через 3,43 секунды (JVM работает в течение 3,85) Потребитель [customerTaskExecutor-1] получил сообщение [Customer (name =, phoneNumber = 20)] Потребитель [customerTaskExecutor-2] получил сообщение [Клиент (имя = тест 6, phoneNumber = 6)] Потребитель [customerTaskExecutor-1] Раздел [spring-kafka-topic-2] прекратил потребление.Потребительский [customerTaskExecutor-1] раздел [spring-kafka-topic-1] прекратил потребление.2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-3, groupId = customer] Попытка сердцебиения не удалась из-за перебалансировки группы 2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] Попытка сердцебиения не удалась из-за ребалансировки группы 2019-02-19 15: 19: 52.200 INFO 82272--- [rTaskExecutor-1] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-2, groupId = customer] Отмена ранее назначенных разделов [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-2] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-3, groupId = customer] Отмена ранее назначенных разделов [spring-kafka-topic-4, весна-кафка-тема-5, весна-кафка-тема-3]2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-1] osklKafkaMessageListenerContainer: отозванные разделы: [spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1]2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-2] osklKafkaMessageListenerContainer: отозванные разделы: [spring-kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3]2019-02-19 15: 19: 52.200 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] (Re) присоединение к группе 2019-02-19 15:19: 52.200 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator: [Consumer clientId = customer-3, groupId = customer] (Re) присоединение к группе 2019-02-19 15: 19: 52.209 INFO 82272 --- [rTaskExecutor-1] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-2, groupId = customer] Успешно присоединенная группа с поколением 582 2019-02-19 15: 19: 52.209 INFO 82272 --- [rTaskExecutor-2] oakccinternals.AbstractCoordinator:[Consumer clientId = потребитель-3, грoupId = customer] Успешно присоединилась группа с поколением 582 2019-02-19 15: 19: 52.209 INFO 82272 --- [rTaskExecutor-3] oakccinternals.AbstractCoordinator: [Consumer clientId = consumer-4, groupId = customer] Успешно присоединилась группа споколение 582 2019-02-19 15: 19: 52.209 ИНФОРМАЦИЯ 82272 --- [rTaskExecutor-3] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-4, groupId = customer] Установка новых назначенных разделов [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15: 19: 52.210 INFO 82272 --- [rTaskExecutor-1] oakccinternals.ConsumerCoordinator: [Consumer clientId = consumer-2, groupId = customer] Установка новых назначенных разделов [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 52.210 INFO 82272 --- [rTaskExecutor-2] oakccinternals.ConsumerCoordinator: [идентификатор клиента = потребитель-3, идентификатор группы =заказчик] Установка новых назначенных разделов [spring-kafka-topic-2, spring-kafka-topic-3] 2019-02-19 15: 19: 52.211 INFO 82272 --- [rTaskExecutor-3] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15: 19: 52.212 INFO 82272 --- [rTaskExecutor-1] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15: 19: 52.212 INFO 82272 --- [rTaskExecutor-2] osklKafkaMessageListenerContainer: назначенные разделы: [spring-kafka-topic-2, spring-kafka-topic-3] Потребитель [customerTaskExecutor-3] получил сообщение [Клиент (имя = тест 6, номер телефона = 6)]

Ответы [ 2 ]

0 голосов
/ 10 марта 2019

Просто была такая же «группа» сообщений с потребителями и Spring Kafka.Те же результаты с @KafkaListener и без аннотированной Spring с ConcurrentMessageListenerContainer.Корректировка параметров не работает точно так же, как прямая Java.

Переписана с прямой Java с использованием consumer.poll () и запущена нить с параметрами, настроенными ExecutorService для Гари Рассела, и все работает правильно.Больше не получать эти сообщения и потерянное сердцебиение при перебалансировке.Прямые примеры Java с сайтов Clouderable и Confluent:

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

https://docs.confluent.io/current/clients/consumer.html#

0 голосов
/ 19 февраля 2019

Прочитайте документацию Kafka.

Приостановка потребителя просто означает, что последующие poll() s не будут возвращать никаких записей, пока вы не наберете resume(), но вам все равно придется звонить poll() в пределах max.poll.interval.ms вЧтобы предотвратить перебаланс.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...