Могут ли сообщения быть потеряны на стороне производителя kafka во время развертывания? - PullRequest
0 голосов
/ 10 июля 2020

Мы сталкиваемся со специфической проблемой и видим, что, когда мы отправляем сообщения Kafka, это иногда не обнаруживается на стороне потребителя. Мы попытались отладить это дальше и включили обратные вызовы onSuccess () и onFailure (). Мы получили эту основную проблему:

org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Чтобы решить эту проблему, мы увеличили количество повторных попыток до 10, и это помогло решить проблему почти полностью.

Однако мы нашли 3 сообщения (каждое по адресу другое время), для которого у нас не было ни обратного вызова onSuccess (), ни onFailure (). Просто потерялась связь, так сказать!

Так вот, это произошло незадолго до того, как приложение было снято для повторного развертывания. Я понимаю из Конфигурация производителя Kafka , размер пакета по умолчанию составляет 16 КБ, и он ожидает, пока пакет будет заполнен, прежде чем фактически отправить сообщение брокеру (я сознательно исключил соображения linger.ms для простоты) .

У меня вопрос, может ли случиться так, что все сообщения в пакете Kafka теряются, когда система принудительно выключается для развертывания? Если да, то как мы можем решить эту проблему?

Пожалуйста, помогите мне здесь, так как это проблема, с которой мы сталкиваемся в производственной среде.

Заранее большое спасибо!

1 Ответ

1 голос
/ 10 июля 2020

Если вы используете пакетную обработку и сервер умирает (kill -9, System.exit() или сбой питания), вы можете потерять сообщения.

Если вы используете Spring Boot и выполняете упорядоченное завершение работы (Ctrl- C) или иным образом закрыть Spring ApplicationContext (например, с помощью ShutDownHook), вы не должны ничего потерять, потому что производитель будет закрыт во время закрытия контекста, заставляя pu sh частичной партии.

Если ожидающие отправки не могут быть выполнены, вы должны увидеть сообщение журнала:

log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);

Вы можете увидеть код close () в KafkaProducer.

...