Кафка - прекратить повторную попытку на ConnectException - PullRequest
0 голосов
/ 13 сентября 2018

У меня есть производитель кафки, определенный как

public KafkaMessageProducer(String kafkaHost, String kafkaPort, Map<String, String> map) {
        this.kafkaTopic = map;
        final Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", kafkaHost + ":" + kafkaPort);
        producer = new KafkaProducer<String, String>(properties);
    }

И я отправляю сообщение, используя следующий код.(пробовал также использовать обратный вызов).

public void sendMessage(String topic, RestCommonResource resultToken) {

        ObjectMapper objectMapper = new ObjectMapper();
        JsonNode  jsonNode = objectMapper.valueToTree(resultToken);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, jsonNode.toString());
        producer.send(record);

    }

Но если сервер kafka не работает и производитель публикует сообщение, программа застревает в бесконечном цикле со следующим исключением:

WARN  [2018-09-13 06:27:59,589] org.apache.kafka.common.network.Selector: Error in I/O with localhost/127.0.0.1
! java.net.ConnectException: Connection refused: no further information
! at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_80]
! at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744) ~[na:1.7.0_80]
! at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.1.jar:na]
! at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.1.jar:na]
! at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]

Если естьсвойство, которое может быть установлено, чтобы остановить эту повторную попытку и удалить сообщение.

Ответы [ 2 ]

0 голосов
/ 13 сентября 2018

Вам также необходимо включить следующее свойство Producer

props.setProperty(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "10000");

При reconnect.backoff.ms WARN появляется только один раз.

Относительно Кафка документов ,

reconnect.backoff.ms
Базовое количество времени ожидания перед попыткой переподключиться к данному хосту. Это позволяет избежать повторного подключения к Хост в тесной петле. Этот откат применяется ко всем попыткам подключения клиентом брокеру.

0 голосов
/ 13 сентября 2018

В настоящее время, если клиент Kafka теряет соединение с брокерами, он будет ждать reconnect.backoff.ms миллисекунд, прежде чем пытаться восстановить соединение.

Хотя эта стратегия хорошо работает, когда клиент на короткое время отключается, если один брокер или весь кластер на долгое время становятся недоступными, все клиенты быстро создадут множество соединений.

Кроме того, разработчики имеют ограниченный контроль над клиентом, который постоянно теряет связи с кластером.

Я думаю, что эта тема полезна для вас: Добавление пользовательских политик для попыток повторного подключения к NetworkdClient

reconnect.backoff.ms : базовое время ожидания перед попыткой повторного подключения к данному хосту. Это позволяет избежать повторного подключения к узлу в тесной петле. Этот откат применяется ко всем попыткам подключения клиента к брокеру.

reconnect.backoff.max.ms : максимальное время ожидания в миллисекундах при повторном подключении к посреднику, который неоднократно не мог подключиться. Если это предусмотрено, откат на хост будет экспоненциально увеличиваться при каждом последующем сбое соединения, вплоть до этого максимума. После расчета увеличения отсрочки добавляется 20% случайный джиттер, чтобы избежать шторма соединения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...