Весенний опрос Kafka с @KafkaListener и ack-режимом слушателя, установленным как запись - PullRequest
1 голос
/ 03 ноября 2019

Я использую @KafkaListener и ConcurrentKafkaListenerContainerFactory для прослушивания 3 тем kafka, и каждая тема имеет 10 разделов. У меня есть несколько вопросов о том, как это работает.

    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(30);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }
    @KafkaListener(topics = "topic1", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic2", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }
    @KafkaListener(topics = "topic3", containerFactory="kafkaListenerContainerFactory")
    public void handleMessage(final ConsumerRecord<Object, String> arg0) throws Exception {
    }

мой listener.ackmode имеет значение return , а enable.auto.commit имеет значение false и partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor

1) мое понимание параллелизма таково, поскольку я установил для моего параллелизма (на уровне фабрики) значение 30, и у меня есть в общей сложности 30 разделов (для всех трехтема вместе) для чтения, каждому потоку будет назначен один раздел. Правильно ли мое понимание? как это повлияет, если я снова переопределю параллелизм внутри аннотации @KafkaListener?

2) Когда Spring вызывает метод poll (), он опрашивает все три темы?

3) с момента установкиlistener.ackmode настроен на возврат, будет ли он ждать, пока все записи, которые были возвращены в одном poll (), завершены, прежде чем выдавать следующий poll ()? И что произойдет, если мои записи будут обрабатываться дольше, чем max.poll.interval.ms? Допустим, в одном вызове poll () возвращается 1-100 смещений, и мой код может обработать только 50 до того, как будет достигнуто значение max.poll.interval.ms, в этот раз будет проведен другой опрос, поскольку он уже достиг максимального значения. .interval.ms? если это так, то следующий опрос () вернет записи со смещением 51?

очень ценю ваше время и помощь

1 Ответ

1 голос
/ 04 ноября 2019

мой listener.ackmode возвращается

Нет такого ackmode;так как вы не устанавливаете его на заводе-изготовителе, ваш фактический режим подтверждения - BATCH (по умолчанию). Чтобы использовать запись в режиме подтверждения (если это именно то, что вы имеете в виду), вы должны сконфигурировать свойства фабричного контейнера.

Я понимаю, что такое параллелизм ...

Ваше пониманиеэто неверно;параллелизм не может быть больше количества разделов в теме с наибольшим количеством разделов (если слушатель слушает несколько тем) . Поскольку у вас есть только 10 разделов в каждой теме, ваш фактический параллелизм равен 10.

Переопределение concurrency на слушателе просто отменяет заводские настройки;вам всегда нужно как минимум столько разделов, сколько параллелизма.

Когда Spring вызывает метод poll (), он запрашивает все три темы?

Не с этой конфигурацией;у вас есть 3 одновременных контейнера, каждый с 30 потребителями, слушающими одну тему. У вас 90 потребителей.

Если у вас есть один слушатель для всех 3 тем, опрос вернет записи из всех 3 тем;но у вас все еще может быть 20 незанятых потребителей, в зависимости от того, как присваиватель разделов распределяет разделы - смотрите журналы «Назначенные разделы», чтобы точно определить, как распределены разделы. Циркулярный присваиватель должен распределить их в порядке.

в этот раз будет проводить другой опрос

Spring не контролирует - если вы берете слишком много, поток потребителянаходится в приемнике - получатель не является потокобезопасным, поэтому мы не можем выполнить асинхронный опрос.

Вы должны обработать max.poll.records в пределах max.poll.interval.ms, чтобы Kafka не смог восстановить балансировкуразделы.

Режим подтверждения не имеет значения;Все дело в своевременной обработке результатов опроса.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...