Kafka Consumer multi tenancy - PullRequest
       5

Kafka Consumer multi tenancy

0 голосов
/ 12 марта 2020

Я новичок в написании потребителя Kafka, у меня есть сценарий на случай, если у меня два потребителя работают под одним и тем же идентификатором группы, и у меня есть два раздела.

Предположим, что;

Потребитель 1 ===> Связан с ====> Разделом 1

Потребитель 2 ===> Связан с ====> Разделом 2

Если мой потребитель-2 не работает, как я могу убедиться, что мой потребитель-1 перечитал все событие, которое снова пришло в раздел 2, я просто наткнулся на кое-что относительно setConsumerRebalanceListener, поэтому я для этого я установил свойство контейнера и для метода onPartitionsAssigned я задаю consumer.seekToBeginning(consumer.assignment())

Это правильно, означает ли эта строка, что мой потребитель-1 будет читать все события из раздела-2, когда потребитель-2 не работает, а раздел-2 переназначен на потребителя?

Я также буду спрашивать, может ли кто-нибудь поделиться хорошими ссылками, где я могу прочитать основы о ConsumerRebalanceListener.

public ConcurrentKafkaListenerContainerFactory<String, MultiTenancyOrgDataMessage> kafkaListenerContainerFactory() {
        LOG.debug("ConcurrentKafkaListenerContainerFactory executing");
        ConcurrentKafkaListenerContainerFactory<String, MultiTenancyOrgDataMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
                consumer.seekToBeginning(consumer.assignment()); // read topic from beginning on service restart

            }
        });

1 Ответ

0 голосов
/ 12 марта 2020

Для этого и используется фиксация - если потребитель 2 выходит из строя, то все записи, которые он израсходовал, но не зафиксировал, будут получены потребителем 1 после восстановления баланса.

Это одна из причин того, что Kafka поддерживает хотя бы один раз семантику - после перебалансировки потребитель 1 извлечет из последнего зафиксированного смещения и, следовательно, может обрабатывать записи, которые были успешно обработаны потребителем 2, если он умер до фиксации.

Примером того, почему вы можете использовать ConsumerRebalanceListener, является борьба с перебалансировкой - об этом я писал на https://chrisg23.blogspot.com/2020/02/why-is-pausing-kafka-consumer-so.html?m=1

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...