Spring для Apache Kafka: поведение KafkaTemplate с асинхронными запросами, пакетированием и максимальным полетом 1 - PullRequest
0 голосов
/ 03 октября 2019

Сценарий / вариант использования: У меня есть приложение Spring Boot, использующее Spring для Kafka для отправки сообщений на темы Kafka. После завершения определенного события (инициируемого запросом http) создается новый поток (через Spring @Async), который вызывает kafkatemplate.send () и имеет обратный вызов для ListenableFuture, который он возвращает. Исходный поток, обработавший запрос http, возвращает ответ вызывающему клиенту и освобождается.

Нормальное поведение: При нормальной загрузке приложения я проверил, что все отдельные сообщения публикуются втема по желанию (записи журнала приложения при успешном или неудачном завершении обратного вызова, а также просмотр сообщения в теме в кластере kafka). Если я отключу всех брокеров kafka на 3-5 минут, а затем снова включу кластер, издатель приложения немедленно восстановит соединение с kafka и продолжит публикацию сообщений.

Поведение проблемы: Однако при выполнении нагрузочного тестирования, если я отключаю всех брокеров kafka на 3-5 минут, а затем снова включаю кластер, издатель приложения Spring продолжает показывать сбои при всех попытках публикации. Это продолжается примерно 7 часов, после чего издатель может снова восстановить связь с kafka (обычно этому предшествует исключение из-за разрыва трубопровода, но не всегда).

Текущие результаты: При проведении нагрузочного испытания в течение ок. Через 10 минут я подключился к приложению с помощью JConsole и проверил метрики продюсера , представленные через kafka.producer. В течение первого ок. 30 секунд большой нагрузки, буфер-доступные байты продолжает уменьшаться, пока не достигнет 0 и останется равным 0. Ожидающие потоки остаются между 6 и 10 (чередуются каждый раз, когда я нажимаю кнопку обновления)и количество доступных буферов остается равным 0 в течение прибл. 6,5 часовПосле этого в буфере «доступные байты» отображается восстановленная первоначально выделенная память, но попытки публикации kafka продолжаются безуспешно в течение прибл. еще за 30 минут до окончательного восстановления желаемого поведения.

Текущая конфигурация источника

request.timeout.ms=3000
max.retry.count=2
max.inflight.requests=1
max.block.ms=10000
retry.backoff.ms=3000

Все остальные свойства используют их значения по умолчанию

Вопросы:

  1. Учитывая мой вариант использования, изменение batch.size или linger.ms окажет какое-либо положительное влияниес точки зрения устранения проблемы, возникающей при большой нагрузке?
  2. Учитывая, что у меня есть отдельные потоки, все они вызывают kafkatemplate.send () с отдельными сообщениями и обратными вызовами, и у меня есть max.in.flight.requests. per.connection установлен в 1, игнорируются ли batch.size и linger.ms за пределами размера каждого сообщения? Насколько я понимаю, в этом сценарии пакетирование фактически не происходит и что каждое сообщение отправляется в виде отдельного запроса.
  3. Учитывая, что для max.block.ms установлено значение 10 секунд, почему буферная память остается используемой длятак долго и почему все сообщения продолжают не публиковаться в течение стольких часов. Насколько я понимаю, через 10 секунд каждая новая попытка публикации должна давать сбой и возвращать обратный вызов сбоя, который, в свою очередь, освобождает связанный поток

Обновление: Чтобы попытаться выяснить использование потока. Я использую один экземпляр производителя, как рекомендовано в JavaDocs. Существуют потоки, такие как https-jsse-nio-22443-exec- *, которые обрабатывают входящие запросы https. Когда запрос поступает, происходит некоторая обработка, и как только вся логика, не связанная с kafka, завершается, вызывается метод в другом классе, украшенном @Async. Этот метод делает вызов kafkatemplate.send (). Ответ клиенту отображается в журналах перед выполнением публикации на kafka (именно так я проверяю его выполнение в отдельном потоке, поскольку служба не ожидает публикации перед возвратом ответа). Существуют потоки task-scheduler- *, которые, по-видимому, обрабатывают обратные вызовы из kafkatemplate.send (). Я предполагаю, что один поток kafka-provider-network обрабатывает все публикации.

1 Ответ

0 голосов
/ 07 октября 2019

Мое приложение выполняло запрос http и отправляло каждое сообщение в таблицу рассылки на платформе базы данных при сбое каждой публикации kafka. Те же потоки, которые были запущены для выполнения публикации в kafka, повторно использовались для этого вызова базы данных. Я переместил логику вызова базы данных в другой класс и украсил ее собственным @Async и пользовательским TaskExecutor. После этого я провел мониторинг JConsole и вижу, что вызовы Kafka, похоже, повторно используют одни и те же 10 потоков (TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80), а вызовы к службе базы данных теперь используют отдельный пул потоков (TaskExecutor: core Pool size - 10, QueueCapacity - 0, and MaxPoolSize - 80), который последовательнозакрытие и открытие новых тем, но оставаясь на относительно постоянном количестве потоков. С этим новым поведением буфер-доступные-байты остается на исправном постоянном уровне, и издатель kafka приложения успешно восстанавливает соединение, как только брокеры возвращаются в оперативный режим.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...