Потребитель Kafka читает только после того, как «достаточно» данных было произведено - PullRequest
1 голос
/ 17 мая 2019

Я реализую конечную точку в весенней загрузке, которая при вызове будет делать дамп всех сообщений, находящихся в теме кафки (для тестирования).

Я ожидаю, что когда производитель пишет в тему 'testTopic', а затем потребитель опрашивает, он должен прочитать только что созданное сообщение.

Наблюдаемое мной поведение заключается в том, что потребитель не может использовать созданное сообщение. Кроме того, если производитель создает намного больше сообщений (скажем, 10-15), то потребитель сбросит их все за один раз. С этого момента, если производитель выдает хотя бы одно сообщение, потребитель будет потреблять, как ожидалось.

Интуитивно я подумал, что настройка FETCH_MIN_BYTES_CONFIG может быть как-то связана с этим - возможно, потребитель ждал достаточного количества байтов для записи. Но это уже установлено в 1 байт (по умолчанию) и не объясняет последующие успешные отдельные чтения сообщений.

Затем я подумал, что, возможно, я регистрировал потребителя до создания темы (слишком быстро вызывая регистрирующую конечную точку). Но я подтвердил от kafka-topics.sh, что тема существует до регистрации потребителя.

Я заметил, что если я включаю автокоммит смещений, то поведение иногда такое, как ожидалось, а иногда нет. При ручной коррекции смещений (не показано в коде ниже) поведение очень странное, как описано выше.

Я также знаю, что производитель работает, как ожидалось, подтвердив это, используя kafka-console-consumer.

Также попытался увеличить время ожидания опроса до 1 секунды, но безуспешно.

// Consumer
@Component
public class TestConsumer{
    private KafkaConsumer testConsumer = null;

    public void registerConsumer(final String consumerId) {
        if (consumer == null) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");

            testConsumer = new KafkaConsumer<String, String>(props);
            testConsumer.subscribe(Collections.singletonList("testTopic"));
        }
        else{
            logger.debug("Consumer already registered");
        }
    }

    public Map<String, List<String>> consume() {
        Map<String, List<String>> messages = new HashMap<>();
        if (testConsumer == null){
            logger.error("testConsumer was not instantiated");
            return null;
        }
        ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
        List<String> buffer = new ArrayList<>(); 
        for (ConsumerRecord<String, String> record: records){
            logger.debug(String.format("Consuming %s", record.value()));
            buffer.add(record.value());
        }
        messages.put("data", buffer);
        return messages;
    }
}

Последовательность шагов: 1. запускается приложение весенней загрузки 2. тема кафки создана, могу подтвердить через консоль кафки 3. Я регистрирую производителя и потребителя 4. Производитель производит, и я могу подтвердить это с помощью консоли kafka (другая группа потребителей). 5. Потребитель не потребляет

Я ожидаю, что результат будет следующим:

{
    "data" : ["message1"]
}

Я получаю

{
    "data" : []
}

Есть идеи, почему потребитель не потребляет записи, пока не будет написано предельное количество сообщений?

EDIT_1: Добавлено свойство props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); для потребителя без эффекта.

1 Ответ

0 голосов
/ 17 мая 2019

Поскольку вы вручную вызываете это testConsumer.poll(Duration.ofMillis(100)).Вам нужно постоянно объединяться из темы.Как внутри бесконечного цикла while.Например:

while (true) {
   Map records = consume();
   logger.debug("received records: {}", records);
}

Посмотрите эту ссылку: Потребитель Кафки

...