Производитель Kafka: Обработка исключений в асинхронной отправке с обратным вызовом - PullRequest
1 голос
/ 17 мая 2019

Мне нужно отловить исключения в случае асинхронной отправки в Кафку.Api-производитель Kafka поставляется с функцией fuction send (запись ProducerRecord, обратный вызов Callback).Но когда я проверил это по следующим двум сценариям:

  • Kafka Broker Down
  • Тема не создана заранее. Обратные вызовы не вызывают.Скорее я получаю предупреждение в коде для неудачной отправки (как показано ниже).

Вопросы:

  • Так что обратные вызовы вызываются только для определенных исключений?

  • Когда клиент Kafka пытается подключиться к брокеру Kafka при асинхронной отправке: при каждой отправке пакета или периодически?

Изображение предупреждения Kafka

Примечание: Я также использую настройку linger.ms 25 секунд для пакетной отправки моих записей.


public class ProducerDemo {

    static KafkaProducer<String, String> producer;

    public static void main(String[] args) throws IOException {

         final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");

        producer = new KafkaProducer<String, String>(properties);
        String topic = "first_topic";

        for (int i = 0; i < 5; i++) {
            String value = "hello world " + Integer.toString(i);
            String key = "id_" + Integer.toString(i);

            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);

              producer.send(record, new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        //execute everytime a record is successfully sent or exception is thrown
                        if(e == null){
                           // No Exception
                        }else{
                            //Exception Handling
                        }
                    }
                });
        }
        producer.close();
    }
...