С конфигурацией производителя, как показано ниже, я создаю производителя Singleton, который используется во всем приложении:
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.consul1:9092,kafka.consul2:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
Я подключен к кластеру Kafka, размещенному на k8s. advertised.listeners
брокера настроен так, чтобы возвращать мне IP-адреса, а не имена хостов. Хотя обычно все работает как положено, проблема возникает при перезапуске Kafka, иногда IP-адрес меняется. Поскольку производитель знает только о старом IP-адресе, он продолжает попытки подключиться к этому хосту для отправки сообщений, и ни одно из сообщений go через.
Я заметил, что при сбое отправки возникает исключение org.apache.kafka.common.errors.TimeoutException
. В настоящее время сообщения отправляются asyn c:
producer.send(data,
(RecordMetadata recordMetadata, Exception e) -> {
if (e != null) {
LOGGER.error("Exception while sending message to kafka", e);
}
});
Как теперь следует обрабатывать Timeoutexception? Учитывая, что производитель является общим для всего приложения, закрытие и воссоздание в обратном вызове звучит неправильно.