Первые N вызовов на опрос ничего не возвращают, когда я регистрирую потребителя с новым идентификатором группы.
Я хочу проверить, что при вызове службы публикуется событие Kafka.Проблема в том, что всякий раз, когда я меняю groupId, первые N опросов ничего не возвращают.Я понимаю, что Kafka сначала регистрирует потребителя при опросе, но я считаю, что число опросов (время), необходимое для регистрации потребителя, слишком случайное.
Конфигурация потребителя:
Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
Шаги:
- Перед каждым тестом я звоню
consumer.poll(Duration.ofSeconds(5))
просто чтобы убедиться, что потребитель зарегистрирован и смещение установлено. - Я вызываю службу и утверждаю в ответе.Если я проверяю Кафку с помощью пользовательского интерфейса, событие публикуется.
- Я звоню
consumer.poll(Duration.ofSeconds(5))
и, надеюсь, получу некоторые записи. Это неудачный шаг .
Есть ли способ убедиться, что второй опрос всегда возвращает запись?Я пытался сделать первый опрос длиться в течение 1 минуты (и я уже думаю, что 5 секунд - это слишком много, чтобы ждать каждого теста), и он все равно будет иногда работать, а иногда нет.
Спасибо.