Я использовал kafka продуцента, версия которого 0.8.2.1, для записи брокеру kafka, версия 1.0.1 асинхронна.мой код похож на приведенный ниже:
KafkaProducer producer = new KafkaProducer(configs);
ProducerRecord producerRecord = new ProducerRecord("topic", "key", "value");
producer.send(producerRecord, new CallBack(){
@override
public void onCompletion(RecordMetadata metadata,
java.lang.Exception exception){
if(metadata != null){
System.out.println(metadata.partition() + "|" + metadata.offset());
}
});
я обнаружил, что смещение раздела, напечатанное в журнале приложения моего производителя по методу onCompletion, было больше, чем смещение брокера kafka, которое запрашивалось командой оболочки "./kafka-run-class.sh kafka.tools.GetOffsetShell ".
мой производитель был настроен, например, с конфигурацией" acks = all "
, например, смещение раздела 0 составляет 30000 в журнале, но 10000 запрошеноКоманда оболочки.
это вызвано проблемой совместимости версий?