Java-приложение отправляет или публикует сообщения в кластер Kafka.Код для отправки записей приведен ниже.Поскольку это приложение отправляет огромные объемы записанных сообщений на kafka, его немного сложно отладить.Но я вижу, что некоторые сообщения не были отправлены в kafka, потому что я не вижу некоторых из них в подписчике брокера kafka на ту же тему.Похоже, что в этой трубе есть утечка данных.
Для этого я добавил java.util.concurrent.Future.isDone()
метод, чтобы увидеть ответ.И он отвечает как ложный.Так что я запутался, где именно утечка существует.не удается отправить запись на kafka?или в брокере Kafka не удается обработать запись, прежде чем поместить ее в тему?
// setting properties from config xml..
propertiess.put("security.protocol", properties.getProperty("security_protocol"));
properties.put("acks", properties.getProperty("kafka_acks"));
...
...
//Producer initialization
Producer<String, String> producer = new KafkaProducer<>(kprops);
...
...
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, newValue);
Future<RecordMetadata> kafkaResponse = producer.send(producerRecord);
String kafkaSuccessStatus = kafkaResponse.isDone() ? "Sending message to kafka completed" : "Sending message to kafka not completed";
LOGGER.debug(kafkaSuccessStatus);
Я использую isDone () для проверки успешности события.Это правильный способ сделать?Если нет, то есть ли другой способ получить этот ответ с дополнительной информацией.Поскольку я не вижу каких-либо ошибок или информации в журналах, становится все труднее выяснить, что именно происходит и куда сбрасываются данные.Я использую версию kafka-0.11.
Информация о приложении: Это приложение должно ежедневно обрабатывать очень большие объемы (в миллиардах) записей и публиковать их в брокере Kafka.Это многопоточное приложение, считывающее строки из файла и отправляющее его в kafka.