Я использую Kafka с Spring Listener. Ниже приведен фрагмент кода. В прошлом мы опубликовали более 100 тыс. Сообщений для тестирования topi c, и система, похоже, работает нормально. Но несколько дней назад я изменил groupId потребителя. После этого новый потребитель попытался обработать все сообщение с самого начала, что занимает много времени. Но через какое-то время может быть (10 se c) брокер откроет потребителю. так что в результате нет регистра kafka для прослушивания сообщения.
@KafkaListener(
topicPattern = "test",
groupId = "test",
id = "test",
containerFactory = "testKafkaListenerContainerFactory")
public void consume(@Payload String payload) throws IOException {
}
Конфигурация потребителя Kafka:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put("security.protocol", "SSL");
Затем я использовал cli для чтения сообщения с помощью следующей команды и наблюдал такое же поведение. Ровно через 10 секунд c потребитель перестанет читать сообщение от kafka.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Как увеличить время ожидания запроса для клиента kafka или какой-либо другой лучший подход к решению этой проблемы?