Наконец-то разобрался, как это исправить.Публикация здесь на случай, если это будет полезно кому-то еще.
Наш вызов producer.send
определил обратный вызов, который в случае ошибки попытается повторно отправить сообщение, используя того же производителя, чтобыдругая тема.Этот механизм был построен таким образом, чтобы, если бы вызов send привел к ошибке, мы, по крайней мере, сохранили сообщение в теме ошибок, где мы могли бы проверить данные и выяснить, что происходит.
Этот механизм обычно работал хорошо, но всякий раз, когда мы временно теряли брокера (приходилось снимать его для обслуживания / обновления / и т. Д.), Все наши приложения зависали и в конечном итоге уничтожались ELB.
ИзОфициальный Javadoc для KafkaProducer.send()
, я нашел это:
Обратите внимание, что обратные вызовы обычно выполняются в потоке ввода-вывода производителя и поэтому должны быть достаточно быстрыми, или они будут задерживать отправку сообщений отдругие темы.Если вы хотите выполнить блокирующие или вычислительно дорогие обратные вызовы, рекомендуется использовать своего собственного исполнителя в теле обратного вызова для распараллеливания обработки.
https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
producer.send
внутриобратный вызов квалифицируется как дорогой звонок, ИМХО.После некоторой дальнейшей отладки и профилирования выясняется, что все потоки XNIO застряли во время ожидания обратного вызова producer.send()
для устаревших метаданных, я полагаю, с момента, когда брокер присоединился к кластеру.В конечном итоге это привело к тому, что приложение перестало отвечать.
Исправление в нашем случае состояло в том, чтобы просто поставить сообщение об ошибке в ConcurrentLinkedDeque
и обработать эти сообщения отдельным потоком, единственной задачей которого было take
сообщения от deque и попытайтесь отправить их заново.
После развертывания этого изменения во всех наших средах приложение работало отлично при повторных перезапусках и отключениях нескольких брокеров.