Кафка в теме не получила некоторых записей от издателя send () - PullRequest
0 голосов
/ 19 сентября 2019

Java-приложение отправляет или публикует сообщения в кластер Kafka.Код для отправки записей приведен ниже.Поскольку это приложение отправляет огромные объемы записанных сообщений на kafka, его немного сложно отладить.Но я вижу, что некоторые сообщения не были отправлены в kafka, потому что я не вижу некоторых из них в подписчике брокера kafka на ту же тему.Похоже, что в этой трубе есть утечка данных.

Для этого я добавил java.util.concurrent.Future.isDone() метод, чтобы увидеть ответ.И он отвечает как ложный.Так что я запутался, где именно утечка существует.не удается отправить запись на kafka?или в брокере Kafka не удается обработать запись, прежде чем поместить ее в тему?

// setting properties from config xml..
propertiess.put("security.protocol", properties.getProperty("security_protocol"));
properties.put("acks", properties.getProperty("kafka_acks"));

...
...
//Producer initialization
Producer<String, String> producer = new KafkaProducer<>(kprops);
    ...
    ...

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, newValue);
    Future<RecordMetadata> kafkaResponse = producer.send(producerRecord);
    String kafkaSuccessStatus = kafkaResponse.isDone() ? "Sending message to kafka completed" : "Sending message to kafka not completed";
    LOGGER.debug(kafkaSuccessStatus);

Я использую isDone () для проверки успешности события.Это правильный способ сделать?Если нет, то есть ли другой способ получить этот ответ с дополнительной информацией.Поскольку я не вижу каких-либо ошибок или информации в журналах, становится все труднее выяснить, что именно происходит и куда сбрасываются данные.Я использую версию kafka-0.11.

Информация о приложении: Это приложение должно ежедневно обрабатывать очень большие объемы (в миллиардах) записей и публиковать их в брокере Kafka.Это многопоточное приложение, считывающее строки из файла и отправляющее его в kafka.

1 Ответ

0 голосов
/ 20 сентября 2019

Стандартный клиент Kafka упаковывает сообщения перед их отправкой.Он работает как клиентский кеш.Если ваше приложение выключает клиент, не сбрасывая буфер (и не ожидая, пока он действительно не очистится), вы рискуете потерять сообщения.Это цена, которую вы платите за производительность.

Если вы цените надежность больше, чем производительность, вы можете вместо этого использовать устаревшую очередь.

...