Кафка: потреблять все сообщения по требованию - PullRequest
0 голосов
/ 13 декабря 2018

Цель: прочитать все сообщения из темы, а затем прекратить процесс.

Я могу непрерывно читать сообщения со следующими данными:

props.put("bootstrap.servers", kafkaBootstrapSrv);
props.put("group.id", group_id);
props.put("max.poll.records", 1); // Only get one record at a time. I understand that to read all messages this will need to be increased
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("MY_TOPIC"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(500);

    for (ConsumerRecord<String, String> record : records) {
        process_record(record);
    }

    consumer.commitSync();
}

Но в этом случае процесс никогда не выполняется.завершается.Когда я избавляюсь от цикла

while (true)

и запускаю программу, она не берет запись из темы (я бы ожидал одну запись).Почему это так?

1 Ответ

0 голосов
/ 13 декабря 2018

А темы Кафки в основном материализуют бесконечный поток событий.

Так когда же остановиться при потреблении из темы?Как вы узнали, что достигли конца?Короткий ответ - нет!Теоретически, продюсер всегда может отправить новое сообщение в тему.

На практике есть несколько вещей, которые можно сделать, чтобы остановиться в конце, если не добавлено / добавлено несколько новых записей.

Используя endOffsets(), вы можете найти текущее последнее смещение разделов.Как только потребитель достигнет этого смещения для всех разделов, которым он назначен, вы можете прекратить опрос (или обновить его и посмотреть, были ли отправлены новые сообщения).

Вы можете получить текущую позицию в каждом разделе, используяposition() метод.При использовании каждая запись также выставляет свое собственное смещение через offset().Таким образом, вы можете использовать их для отслеживания прогресса в направлении конечных смещений.

Относительно вашего второго вопроса о poll() ничего не возвращая при первом вызове.Ожидается, что в основном poll() заставляет клиента работать и при первом вызове он инициирует соединение с кластером и запускает групповой протокол (который занимает несколько секунд), поэтому маловероятно, что сообщения будут получены до poll() возвращается.

...