Потребитель Кафки нуждается в длительном опросе - PullRequest
1 голос
/ 08 марта 2019

Использование Kafka / Java со следующей конфигурацией

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName());
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT));

У меня есть простой цикл опроса вроде:

consumer.poll(Duration.ofMillis(200));

Я заметил странное поведение.При длительности 0 он не возвращает результата.Локально с длительностью 200 мс я получаю некоторые результаты, но в другой производственной среде он никогда не возвращает результаты, ему нужно как минимум 1 с.

В моем понимании, метод poll будет ждать, пока не найдет хотя бы результат.При нулевой продолжительности он должен по крайней мере возвращать результаты, которые уже получены, он не должен всегда возвращать никакого результата.

Какое объяснение?

1 Ответ

0 голосов
/ 08 марта 2019

Основная проблема в том, что вы, возможно, не сохранили вывод poll().

ConsumerRecords records = consumer.poll(10000);

Вам необходимо сохранить результаты poll в ConsumerRecords. Вы можете найти примеры кода здесь

...