Кафка: Несколько экземпляров в одной и той же группе потребителей слушают один и тот же раздел внутри для темы - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть два экземпляра потребителя kafka, настроенных на одну и ту же группу потребителей и прослушивающих раздел 0 в той же теме. Проблема в том, когда я отправляю сообщение в тему. Сообщение используется обоими экземплярами, которые не должны происходить, поскольку они находятся в одной группе. Я использую класс конфигурации Spring Boot для их настройки.

Вот конфигурация:

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

Вот слушатель:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "${kafka.topic.orders}", partitions = "0")})
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {

    log.info("message received at " + orderTopic + "at partition 0");
    processRecord(record, acknowledgment);
}

1 Ответ

0 голосов
/ 13 сентября 2018

Кафка так не работает; когда вы вручную назначаете такие разделы (@TopicPartition), вы явно сообщаете Kafka, что хотите получать сообщения от этого раздела - потребитель assign() присваивает разделы самому себе.

Другими словами, при назначении вручную вы берете на себя ответственность за распределение разделов.

Вам нужно использовать групповое управление и позволить Kafka назначать темы экземплярам.

используйте topics = "...", и Кафка выполнит задание. Если у вас недостаточно тем, экземпляры будут простаивать. Для участия всех экземпляров требуется как минимум столько же разделов, сколько и экземпляров.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...