Мне нужно отловить исключения в случае асинхронной отправки в Кафку.Api-производитель Kafka поставляется с функцией fuction send (запись ProducerRecord, обратный вызов Callback).Но когда я проверил это по следующим двум сценариям:
- Kafka Broker Down
- Тема не создана заранее. Обратные вызовы не вызывают.Скорее я получаю предупреждение в коде для неудачной отправки (как показано ниже).
Вопросы:
Изображение предупреждения Kafka
Примечание: Я также использую настройку linger.ms 25 секунд для пакетной отправки моих записей.
public class ProducerDemo {
static KafkaProducer<String, String> producer;
public static void main(String[] args) throws IOException {
final Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
producer = new KafkaProducer<String, String>(properties);
String topic = "first_topic";
for (int i = 0; i < 5; i++) {
String value = "hello world " + Integer.toString(i);
String key = "id_" + Integer.toString(i);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//execute everytime a record is successfully sent or exception is thrown
if(e == null){
// No Exception
}else{
//Exception Handling
}
}
});
}
producer.close();
}