В связи с: этот вопрос
Я пытаюсь прочитать компактную тему через @KafkaListener.Я бы хотел, чтобы каждый потребитель каждый раз читал всю тему.
Я не могу сгенерировать уникальный идентификатор группы для каждого потребителя.Поэтому я хотел использовать нулевой идентификатор группы.
Я пытался настроить контейнер и потребителя для установки значения groupId на ноль, но ни один из них не работал.
Вот моя конфигурация контейнера:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// Set ackMode to manual and never commit, we are reading from the beginning each time
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.getContainerProperties().setAckOnError(false);
// Remove groupId, we are consuming all partitions here
factory.getContainerProperties().setGroupId(null);
// Enable idle event in order to detect when init phase is over
factory.getContainerProperties().setIdleEventInterval(1000L);
Также попытался принудительно настроить конфигурацию потребителя:
Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
// Override group id property to force "null"
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
Когда я устанавливаю значение параметра groupId контейнера равным нулю, используется значение по умолчанию с идентификатором прослушивателя.
Когда я принудительно обращаюсь к потребителю с нулевым свойством groupId, у меня появляется ошибка: не найден group.id в конфигурации потребителя, свойствах контейнера или аннотации @KafkaListener;group.id требуется, когда используется групповое управление.