Как сделать так, чтобы потребитель Kafka читал из раздела определенной темы Spring Boot - PullRequest
0 голосов
/ 18 октября 2019

Я новичок в Kafka и Spring Boot и пытаюсь заставить мое приложение читать из определенного раздела темы.

 @KafkaListener(id = "singleLnr", groupId = "${kafka.consumer.group.id}",containerFactory = "singleFactory", topicPartitions = @TopicPartition(topic = "${kafka.topic.singleAttendance}", partitions = {"0"}))
public void consume2(ConsumerRecord attendanceInfo) {
    System.out.println(attendanceInfo);
}

Один заводской код

@Bean(name = "singleFactory")
public KafkaListenerContainerFactory singleFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(false);
    factory.setMessageConverter(converter());
    return factory;
}

Это также моя фабричная конфигурация потребителя.

 @Bean(name = "consumerFactory")
public ConsumerFactory<String, Map<String, String>> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapAddress);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroupId);
    return new DefaultKafkaConsumerFactory<>(props);
}

Когда я пытаюсь запустить программу, она выдает ошибку

Ошибка смещения фиксации на разделе single.attendance-0 по смещению 308. Координатору не известен этот элемент.

и предупреждение

не удалось: фиксация не может быть завершена, поскольку группа уже перебалансировала и присвоила разделыдругому участнику. Это означает, что время между последующими вызовами poll () было больше, чем настроенный max.poll.interval.ms, что обычно означает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете решить эту проблему, увеличив max.poll.interval.ms или уменьшив максимальный размер пакетов, возвращаемых в poll () с max.poll.records.

Как я могу заставить своего потребителячитать с конкретного раздела? Не могли бы вы дать хотя бы подсказку.

1 Ответ

0 голосов
/ 21 октября 2019

Кафка сам назначает потребителя для каждого раздела. В этой реализации нет необходимости настраивать его в @KafkaListener.

@KafkaListener(id = "singleLnr", groupId = "${kafka.consumer.group.id}",containerFactory = "singleFactory", topics = "${kafka.topic.singleAttendance}")
    public void consume2(ConsumerRecord attendanceInfo) {
        System.out.println(attendanceInfo);
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...