Kafka consumer.poll не возвращает никаких записей - PullRequest
2 голосов
/ 02 мая 2019

Первые N вызовов на опрос ничего не возвращают, когда я регистрирую потребителя с новым идентификатором группы.

Я хочу проверить, что при вызове службы публикуется событие Kafka.Проблема в том, что всякий раз, когда я меняю groupId, первые N опросов ничего не возвращают.Я понимаю, что Kafka сначала регистрирует потребителя при опросе, но я считаю, что число опросов (время), необходимое для регистрации потребителя, слишком случайное.

Конфигурация потребителя:

Properties props = new Properties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_URL);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

KafkaConsumer<S, T> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));

Шаги:

  1. Перед каждым тестом я звоню consumer.poll(Duration.ofSeconds(5)) просто чтобы убедиться, что потребитель зарегистрирован и смещение установлено.
  2. Я вызываю службу и утверждаю в ответе.Если я проверяю Кафку с помощью пользовательского интерфейса, событие публикуется.
  3. Я звоню consumer.poll(Duration.ofSeconds(5)) и, надеюсь, получу некоторые записи. Это неудачный шаг .

Есть ли способ убедиться, что второй опрос всегда возвращает запись?Я пытался сделать первый опрос длиться в течение 1 минуты (и я уже думаю, что 5 секунд - это слишком много, чтобы ждать каждого теста), и он все равно будет иногда работать, а иногда нет.

Спасибо.

1 Ответ

1 голос
/ 02 мая 2019

Причина, по которой он не работает с вашим «новым groupId», заключается в том, что вы находитесь в «последнем» режиме.

Значение по умолчанию - «последний», вам нужно либо быть в «самом раннем» режиме, либоПервый опрос с вашим «новым groupId» или смещение фиксации для этого «нового groupId» для этой темы.

Вам необходимо зарегистрировать «groupId» для темы, а не для потребителя.

...