Я использую Spring Framework и Kafka, в котором есть 3 кластера. Я обнаружил, что потребитель не потребляет некоторые сообщения (скажем, 0,01 процента между всеми отправленными сообщениями), поэтому в коде производителя я записываю смещение сообщения, которое возвращает api:
ListenableFuture<SendResult<String, Message>> future = messageTemplate.sendDefault(id, message);
SendResult<String, Message> sendResult = future.get();
String offset = sendResult.getRecordMetadata().offset();
Я использую return offset для запроса темы kafka во всех разделах, но он не находит сообщение (я проверяю другие смещения, связанные с сообщениями, которые использовали потребители, и они находятся в kafka), в чем проблема и как я могу застраховать отправленное сообщение к кафке ??
Я также использовал messageTemplate.flush();
в производителе