Как установить для groupId значение null в @KafkaListeners - PullRequest
1 голос
/ 13 мая 2019

В связи с: этот вопрос

Я пытаюсь прочитать компактную тему через @KafkaListener.Я бы хотел, чтобы каждый потребитель каждый раз читал всю тему.

Я не могу сгенерировать уникальный идентификатор группы для каждого потребителя.Поэтому я хотел использовать нулевой идентификатор группы.

Я пытался настроить контейнер и потребителя для установки значения groupId на ноль, но ни один из них не работал.

Вот моя конфигурация контейнера:

 ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // Set ackMode to manual and never commit, we are reading from the beginning each time
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.getContainerProperties().setAckOnError(false);
        // Remove groupId, we are consuming all partitions here
        factory.getContainerProperties().setGroupId(null);
        // Enable idle event in order to detect when init phase is over
        factory.getContainerProperties().setIdleEventInterval(1000L);

Также попытался принудительно настроить конфигурацию потребителя:

Map<String, Object> consumerProperties = sprinfKafkaProperties.buildConsumerProperties();
        // Override group id property to force "null"
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, null);
        ConsumerFactory<Object, Object> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);

Когда я устанавливаю значение параметра groupId контейнера равным нулю, используется значение по умолчанию с идентификатором прослушивателя.

Когда я принудительно обращаюсь к потребителю с нулевым свойством groupId, у меня появляется ошибка: не найден group.id в конфигурации потребителя, свойствах контейнера или аннотации @KafkaListener;group.id требуется, когда используется групповое управление.

1 Ответ

1 голос
/ 13 мая 2019

Вы не можете использовать нуль group.id.

Из документации kafka .

group.id

Aуникальная строка, которая идентифицирует группу потребителей, к которой принадлежит этот потребитель.Это свойство обязательно , если потребитель использует функциональность управления группами с помощью подписки (тема) или стратегии управления смещениями на основе Kafka.

Если вы хотите читать изначиная каждый раз, вы можете либо добавить ConsumerAwareRebalanceListener к фабрике контейнеров, либо заставить слушателя реализовать ConsumerSeekAware.

В любом случае, когда вызывается onPartitionsAssigned, ищите каждую тему / раздел в начале.

Я не могу сгенерировать уникальный groupId для каждого потребителя.

Вы можете использовать выражение SpEL для генерации UUID.

РЕДАКТИРОВАТЬ

Вы можете вручную назначать темы / разделы, и group.id может быть нулевым.

@SpringBootApplication
public class So56114299Application {

    public static void main(String[] args) {
        SpringApplication.run(So56114299Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56114299", 10, (short) 0);
    }

    @KafkaListener(topicPartitions = @TopicPartition(topic = "so56114299",
                          partitions = "#{@finder.partitions('so56114299')}"))
    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
        System.out.println(key + ":" + payload);
    }

    @Bean
    public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
        return new PartitionFinder(consumerFactory);
    }

    public static class PartitionFinder {

        public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
            this.consumerFactory = consumerFactory;
        }

        private final ConsumerFactory<String, String> consumerFactory;

        public String[] partitions(String topic) {
            try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
                return consumer.partitionsFor(topic).stream()
                    .map(pi -> "" + pi.partition())
                    .toArray(String[]::new);
            }
        }

    }

}

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...