Я новичок в Kafka и Spring Boot и пытаюсь заставить мое приложение читать из определенного раздела темы.
@KafkaListener(id = "singleLnr", groupId = "${kafka.consumer.group.id}",containerFactory = "singleFactory", topicPartitions = @TopicPartition(topic = "${kafka.topic.singleAttendance}", partitions = {"0"}))
public void consume2(ConsumerRecord attendanceInfo) {
System.out.println(attendanceInfo);
}
Один заводской код
@Bean(name = "singleFactory")
public KafkaListenerContainerFactory singleFactory() {
ConcurrentKafkaListenerContainerFactory<String, Map<String, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(false);
factory.setMessageConverter(converter());
return factory;
}
Это также моя фабричная конфигурация потребителя.
@Bean(name = "consumerFactory")
public ConsumerFactory<String, Map<String, String>> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapAddress);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroupId);
return new DefaultKafkaConsumerFactory<>(props);
}
Когда я пытаюсь запустить программу, она выдает ошибку
Ошибка смещения фиксации на разделе single.attendance-0 по смещению 308. Координатору не известен этот элемент.
и предупреждение
не удалось: фиксация не может быть завершена, поскольку группа уже перебалансировала и присвоила разделыдругому участнику. Это означает, что время между последующими вызовами poll () было больше, чем настроенный max.poll.interval.ms, что обычно означает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете решить эту проблему, увеличив max.poll.interval.ms или уменьшив максимальный размер пакетов, возвращаемых в poll () с max.poll.records.
Как я могу заставить своего потребителячитать с конкретного раздела? Не могли бы вы дать хотя бы подсказку.