Spring Kafka - выбор @TopicPartition инактивирует потребление в группе - PullRequest
2 голосов
/ 05 марта 2020

У меня есть две группы потребителей first и second, подписанные на my-topic topi c с использованием Spring Boot версии 2.1.8 . У меня есть 2 службы, использующие каждую отдельную группу потребителей.

Использование минимальной настройки Я определяю слушателя:

@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}

В списке с использованием kafka-consumer-groups отображается следующее:

root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups

GROUP TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
first my-topic  2          0               0               0         consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9     consumer-2
first my-topic  0          0               0               0         consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9     consumer-2
first my-topic  1          0               0               0         consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9     consumer-2

GROUP TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
second my-topic  0          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  1          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  2          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2

Я sh потребитель в группе first использует только 0 и 2 раздела, поэтому я пытаюсь добиться этого с помощью слушателя:

@KafkaListener(topicPartitions = {
    @TopicPartition(
        topic = "my-topic",
        partitions = { "0", "2" }
    )}
)
public void consume(@Payload String message) {
    logger.info(String.format("#### -> Consumed message -> %s", message));
}

Почему листинг с использованием той же команды не показывает то же самое, что и выше, за исключением раздела 1 для группы first Также значения CONSUMER-ID и далее являются пустыми, а @KafkaListener вообще получает нет сообщений для группы first (слушатель в группе second работает так же). Как это исправить?

Consumer group 'first' has no active members.

GROUP TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first my-topic  2          0               0               0               -               -               -
first my-topic  0          0               0               0               -               -               -

GROUP  TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
second my-topic  0          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  1          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2
second my-topic  2          0               0               0        consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8     consumer-2

1 Ответ

1 голос
/ 05 марта 2020

Назначение всех разделов вручную Поскольку вы назначаете потребительский поток для определенных разделов, Кафка будет использовать метод assign() и не будет использовать групповую координацию. Каждый потребитель действует независимо, даже если он разделяет идентификатор группы с другим потребителем

Допустим, вы хотите всегда читать все записи из всех разделов (например, при использовании сжатого topi c для загрузки распределенного кэша). ), может быть полезно вручную назначить разделы и не использовать управление группами Кафки.

Но команда для проверки позиции потребителя будет показывать только позиции потребителей, используя групповую координацию

Проверка позиции потребителя

Иногда полезно увидеть позицию ваших потребителей. У нас есть инструмент, который покажет положение всех потребителей в группе потребителей, а также то, как далеко они находятся за концом журнала . Чтобы запустить этот инструмент в группе потребителей с именем my-group, использующей топи c с именем my-topi c

KafkaConsumer

При ручном назначении разделов не используется групповая координация, поэтому сбои потребителей не приведут к перебалансированию назначенных разделов. Каждый потребитель действует независимо, даже если он разделяет идентификатор группы с другим потребителем. Чтобы избежать конфликтов фиксации смещения, обычно следует убедиться, что groupId уникален для каждого экземпляра потребителя.

И последняя причина, по которой слушатель не потребляет сообщения, заключается в смещении, смещение по умолчанию установлено на самое последнее, и вам необходимо указать позицию смещения, в частности, такую ​​как

@KafkaListener(id = "thing2", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })
 public void listen(ConsumerRecord<?, ?> record) {
      ...
 }

Для получения дополнительной информации о смещение

Первый конструктор принимает массив аргументов TopicPartitionOffset, чтобы явно указать контейнеру, какие разделы использовать (используя метод assign () потребителя) и с необязательным начальным смещением. Положительное значение по умолчанию является абсолютным смещением. Отрицательное значение относительно текущего последнего смещения в разделе по умолчанию. Предоставляется конструктор для TopicPartitionOffset, который принимает дополнительный логический аргумент. Если это правда, начальные смещения (положительные или отрицательные) относительно текущей позиции для этого потребителя. Смещения применяются при запуске контейнера.

Примечание: Вы можете указать каждый раздел в атрибуте partitions или partitionOffsets, но не оба сразу.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...