Кафка: Как считать успехи / неудачи с асинхронной отправкой - PullRequest
1 голос
/ 15 октября 2019

Цель: Подсчитать все успехи и неудачи производителя Kafka при отправке async .

Проблема: Иногда общее суммирование неудач и успехов не совпадает с общим количеством записей

Пример кода

//class defined below
Counter counter = new Counter();

//we'll say itr has 10 iterations
while(itr.hasNext()){

 counter.incTotal();     

 //busy work
 ProducerRecord record = new ProducerRecord("myTopic", itr.next());

 //async send
 producer.send(record, new Callback(){
     @Override
     public void synchronized onComplete(RecordMetadata rm, Exception ex) {
        if(ex == null)
          counter.incSuccess();
        else
          counter.incFailure();
     }
 });
}

//this should ensure all async calls have completed
producer.flush();

System.out.println("Total entries " + counter.getTotal() + " : success " + counter.getSuccess() + " failure " + counter.getFailure());

Counter.java

public class Counter {

  private int total = 0;
  private int success = 0;
  private int failure = 0;

  public Counter(){}

  public void incTotal(){
    this.total ++;
  }

  public int getTotal(){
    return total;
  }

  public void incSuccess(){
    this.success ++;
  }

  public int getSuccess(){
    return success;
  }
  public void incFailure(){
    this.failure ++;
  }

  public int getFailure(){
    return failure ;
  }
}

Пример вывода: «Всего записей 10: неудача 3, неудача 5»

...