Цель: Подсчитать все успехи и неудачи производителя Kafka при отправке async .
Проблема: Иногда общее суммирование неудач и успехов не совпадает с общим количеством записей
Пример кода
//class defined below
Counter counter = new Counter();
//we'll say itr has 10 iterations
while(itr.hasNext()){
counter.incTotal();
//busy work
ProducerRecord record = new ProducerRecord("myTopic", itr.next());
//async send
producer.send(record, new Callback(){
@Override
public void synchronized onComplete(RecordMetadata rm, Exception ex) {
if(ex == null)
counter.incSuccess();
else
counter.incFailure();
}
});
}
//this should ensure all async calls have completed
producer.flush();
System.out.println("Total entries " + counter.getTotal() + " : success " + counter.getSuccess() + " failure " + counter.getFailure());
Counter.java
public class Counter {
private int total = 0;
private int success = 0;
private int failure = 0;
public Counter(){}
public void incTotal(){
this.total ++;
}
public int getTotal(){
return total;
}
public void incSuccess(){
this.success ++;
}
public int getSuccess(){
return success;
}
public void incFailure(){
this.failure ++;
}
public int getFailure(){
return failure ;
}
}
Пример вывода: «Всего записей 10: неудача 3, неудача 5»