Продюсер Kafka борется с потерей связи с брокером - PullRequest
0 голосов
/ 12 июля 2020

С конфигурацией производителя, как показано ниже, я создаю производителя Singleton, который используется во всем приложении:

properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.consul1:9092,kafka.consul2:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ACKS_CONFIG, "1");

Я подключен к кластеру Kafka, размещенному на k8s. advertised.listeners брокера настроен так, чтобы возвращать мне IP-адреса, а не имена хостов. Хотя обычно все работает как положено, проблема возникает при перезапуске Kafka, иногда IP-адрес меняется. Поскольку производитель знает только о старом IP-адресе, он продолжает попытки подключиться к этому хосту для отправки сообщений, и ни одно из сообщений go через.

Я заметил, что при сбое отправки возникает исключение org.apache.kafka.common.errors.TimeoutException . В настоящее время сообщения отправляются asyn c:

producer.send(data,
                (RecordMetadata recordMetadata, Exception e) -> {
                    if (e != null) {
                        LOGGER.error("Exception while sending message to kafka", e);
                    }
                });

Как теперь следует обрабатывать Timeoutexception? Учитывая, что производитель является общим для всего приложения, закрытие и воссоздание в обратном вызове звучит неправильно.

1 Ответ

0 голосов
/ 13 июля 2020

Согласно JavaDocs в интерфейсе обратного вызова TimeoutException является повторным исключением, которое может быть обработано путем увеличения числа retries Производителя.

В документации Kafka вы найдете подробную информацию о конфигурации retries:

повторных попыток (по умолчанию 0): установка значения больше нуля приведет к тому, что клиент повторно отправит любую запись, отправка которой не удалась с потенциально временной ошибкой. Обратите внимание, что эта попытка ничем не отличается от повторной отправки записи клиентом после получения ошибки. Разрешение повторных попыток без установки max.in.flight.requests.per.connection на 1 потенциально изменит порядок записей, потому что если две партии отправляются в один раздел, и первая не удается и повторяется, но вторая успешна, тогда записи во второй партии может появиться первым.

...