Сценарий / вариант использования: У меня есть приложение Spring Boot, использующее Spring для Kafka для отправки сообщений на темы Kafka. После завершения определенного события (инициируемого запросом http) создается новый поток (через Spring @Async), который вызывает kafkatemplate.send () и имеет обратный вызов для ListenableFuture, который он возвращает. Исходный поток, обработавший запрос http, возвращает ответ вызывающему клиенту и освобождается.
Нормальное поведение: При нормальной загрузке приложения я проверил, что все отдельные сообщения публикуются втема по желанию (записи журнала приложения при успешном или неудачном завершении обратного вызова, а также просмотр сообщения в теме в кластере kafka). Если я отключу всех брокеров kafka на 3-5 минут, а затем снова включу кластер, издатель приложения немедленно восстановит соединение с kafka и продолжит публикацию сообщений.
Поведение проблемы: Однако при выполнении нагрузочного тестирования, если я отключаю всех брокеров kafka на 3-5 минут, а затем снова включаю кластер, издатель приложения Spring продолжает показывать сбои при всех попытках публикации. Это продолжается примерно 7 часов, после чего издатель может снова восстановить связь с kafka (обычно этому предшествует исключение из-за разрыва трубопровода, но не всегда).
Текущие результаты: При проведении нагрузочного испытания в течение ок. Через 10 минут я подключился к приложению с помощью JConsole и проверил метрики продюсера , представленные через kafka.producer. В течение первого ок. 30 секунд большой нагрузки, буфер-доступные байты продолжает уменьшаться, пока не достигнет 0 и останется равным 0. Ожидающие потоки остаются между 6 и 10 (чередуются каждый раз, когда я нажимаю кнопку обновления)и количество доступных буферов остается равным 0 в течение прибл. 6,5 часовПосле этого в буфере «доступные байты» отображается восстановленная первоначально выделенная память, но попытки публикации kafka продолжаются безуспешно в течение прибл. еще за 30 минут до окончательного восстановления желаемого поведения.
Текущая конфигурация источника
request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000
Все остальные свойства используют их значения по умолчанию
Вопросы:
- Учитывая мой вариант использования, изменение batch.size или linger.ms окажет какое-либо положительное влияниес точки зрения устранения проблемы, возникающей при большой нагрузке?
- Учитывая, что у меня есть отдельные потоки, все они вызывают kafkatemplate.send () с отдельными сообщениями и обратными вызовами, и у меня есть max.in.flight.requests. per.connection установлен в 1, игнорируются ли batch.size и linger.ms за пределами размера каждого сообщения? Насколько я понимаю, в этом сценарии пакетирование фактически не происходит и что каждое сообщение отправляется в виде отдельного запроса.
- Учитывая, что для max.block.ms установлено значение 10 секунд, почему буферная память остается используемой длятак долго и почему все сообщения продолжают не публиковаться в течение стольких часов. Насколько я понимаю, через 10 секунд каждая новая попытка публикации должна давать сбой и возвращать обратный вызов сбоя, который, в свою очередь, освобождает связанный поток
Обновление: Чтобы попытаться выяснить использование потока. Я использую один экземпляр производителя, как рекомендовано в JavaDocs. Существуют потоки, такие как https-jsse-nio-22443-exec- *, которые обрабатывают входящие запросы https. Когда запрос поступает, происходит некоторая обработка, и как только вся логика, не связанная с kafka, завершается, вызывается метод в другом классе, украшенном @Async. Этот метод делает вызов kafkatemplate.send (). Ответ клиенту отображается в журналах перед выполнением публикации на kafka (именно так я проверяю его выполнение в отдельном потоке, поскольку служба не ожидает публикации перед возвратом ответа). Существуют потоки task-scheduler- *, которые, по-видимому, обрабатывают обратные вызовы из kafkatemplate.send (). Я предполагаю, что один поток kafka-provider-network обрабатывает все публикации.