Назначение всех разделов вручную Поскольку вы назначаете потребительский поток для определенных разделов, Кафка будет использовать метод assign()
и не будет использовать групповую координацию. Каждый потребитель действует независимо, даже если он разделяет идентификатор группы с другим потребителем
Допустим, вы хотите всегда читать все записи из всех разделов (например, при использовании сжатого topi c для загрузки распределенного кэша). ), может быть полезно вручную назначить разделы и не использовать управление группами Кафки.
Но команда для проверки позиции потребителя будет показывать только позиции потребителей, используя групповую координацию
Проверка позиции потребителя
Иногда полезно увидеть позицию ваших потребителей. У нас есть инструмент, который покажет положение всех потребителей в группе потребителей, а также то, как далеко они находятся за концом журнала . Чтобы запустить этот инструмент в группе потребителей с именем my-group, использующей топи c с именем my-topi c
KafkaConsumer
При ручном назначении разделов не используется групповая координация, поэтому сбои потребителей не приведут к перебалансированию назначенных разделов. Каждый потребитель действует независимо, даже если он разделяет идентификатор группы с другим потребителем. Чтобы избежать конфликтов фиксации смещения, обычно следует убедиться, что groupId уникален для каждого экземпляра потребителя.
И последняя причина, по которой слушатель не потребляет сообщения, заключается в смещении, смещение по умолчанию установлено на самое последнее, и вам необходимо указать позицию смещения, в частности, такую как
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
Для получения дополнительной информации о смещение
Первый конструктор принимает массив аргументов TopicPartitionOffset, чтобы явно указать контейнеру, какие разделы использовать (используя метод assign () потребителя) и с необязательным начальным смещением. Положительное значение по умолчанию является абсолютным смещением. Отрицательное значение относительно текущего последнего смещения в разделе по умолчанию. Предоставляется конструктор для TopicPartitionOffset, который принимает дополнительный логический аргумент. Если это правда, начальные смещения (положительные или отрицательные) относительно текущей позиции для этого потребителя. Смещения применяются при запуске контейнера.
Примечание: Вы можете указать каждый раздел в атрибуте partitions или partitionOffsets, но не оба сразу.