Spring Kafka - Когда именно Consumer.poll () вызывается из-за колпака? - PullRequest
0 голосов
/ 09 мая 2018

У меня есть приложение с весенней загрузкой, в котором у меня есть один Kafka Consumer.

Я использую DefaultKafkaConsumerFactory со стандартными конфигурациями потребителей. У меня есть ConcurrentListenerContainerFactory с параллелизмом, равным 1, и у меня есть метод, аннотированный @ KafkaListener.

Я слушаю тему с 3 разделами, и у меня есть 3 таких потребителя, каждый из которых развернут в разных приложениях. Следовательно, каждый потребитель слушает один раздел.

Допустим, опрос у потребителя вызван из-под колпака и получено 40 записей. Затем каждая запись предоставляется методу, аннотированному @KafkaListener последовательно, то есть предоставляется запись 1, дождитесь окончания обработки метода, предоставлена ​​запись 2, дождитесь окончания обработки метода и так далее. Происходит ли указанное выше или для каждой полученной записи создается отдельный поток, и вызов метода происходит в отдельном потоке, поэтому основной поток не блокируется и может опрашивать записи быстрее.

Мне также хотелось бы получить больше ясности о том, что такое контейнер прослушивателя сообщений и возможный прослушиватель сообщений.

Спасибо заранее.

Ответы [ 2 ]

0 голосов
/ 09 мая 2018

Ну, это точно позиция Apache Kafka - гарантируйте обработку заказов из одного и того же раздела в одном потоке. Поэтому, когда вы распределяете свою тему с 3 разделами между 3 экземплярами, каждый из них получает свой собственный раздел и выполняет опрос в одном потоке.

KafkaMessageListenerContainer - управляемая событиями, самоконтролирующая оболочка вокруг KafkaConsumer. Он действительно вызывает poll() в цикле while (isRunning()) {, который запланирован в TaskExecutor:

this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);

И он обрабатывает ConsumerRecords вызов слушателя:

private void invokeListener(final ConsumerRecords<K, V> records) {
        if (this.isBatchListener) {
            invokeBatchListener(records);
        }
        else {
            invokeRecordListener(records);
        }
    }
0 голосов
/ 09 мая 2018

В версии 1.3 и выше для каждого потребителя существует одна нить; следующий poll() выполняется после того, как слушатель обработал последнее сообщение из предыдущего опроса.

В более ранних версиях было два потока, и второй (и, возможно, третий) опрос был выполнен, пока поток слушателя обрабатывает первый пакет. Это было необходимо, чтобы избежать перебалансировки из-за медленного слушателя. Модель многопоточности была очень сложной, и мы должны были приостановить / возобновить работу потребителя, когда это необходимо. KIP-62 устранил проблему с перебалансировкой, поэтому мы смогли использовать гораздо более простую модель потоков, используемую сегодня.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...