Apache Kafka - повторный пересмотр с большим количеством потребителей - PullRequest
0 голосов
/ 24 ноября 2018

Контекст

Мы используем Kafka для обработки больших сообщений, очень редко до 10 МБ, но в основном в диапазоне 500 КБ.Обработка сообщения может занять до 30 секунд, а иногда и минуты.

Проблема

Обработка данных с меньшим количеством потребителей (дооколо 50) не вызывает повторной балансировки брокером, и обработка работает нормально.Любая повторная балансировка в этом масштабе также довольно быстрая, по данным журналов брокера, обычно менее минуты.

Как только потребители масштабируются до 100 или 200, потребители постоянно перебалансируются с интервалами примерно до5 минут.Это приводит к 5 минутам работы / потребления, за которыми следует 5 минут перебалансировки и затем снова то же самоеПотребительские услуги не терпят неудачу, просто перебалансировать без видимой причины.Это приводит к снижению пропускной способности при увеличении количества потребителей.

При масштабировании до двух потребителей, обработка выполняется со средней скоростью 2 сообщения в минуту на одного потребителя.Скорость обработки для отдельного потребителя, когда он не перебалансирован, составляет около 6 сообщений в минуту.

Я не подозреваю, что сеть центров обработки данных будет проблемой, поскольку у нас есть некоторые потребители, выполняющие разныесвоего рода обработка сообщений, и у них нет проблем с передачей от 100 до 1000 сообщений в минуту.

Кто-то еще испытал этот шаблон и нашел простое решение, например, изменение определенного параметра конфигурации?

Дополнительная информация

Брокеры Kafka версии 2.0, и их 10 в разных дата-центрах.Для репликации установлено значение 3. Разделов для этого раздела - 500. Выдержка из конкретной конфигурации broker , которая лучше подходит для обработки больших сообщений:

  • compress.type = lz4
  • message.max.bytes = 10000000 # 10 МБ
  • replica.fetch.max.bytes = 10000000 # 10 МБ
  • group.max.session.timeout.ms =1320000 # 22 мин
  • offset.retention.minutes = 10080 # 7 дней

На стороне потребитель мы используем клиент Java с слушателем перебалансаэто очищает любые буферизованные сообщения от отозванных разделов.Этот буфер имеет размер 10 сообщений.Потребительские клиенты используют клиентский API версии 2.1, обновление java-клиента с версии 2.0 до 2.1, по-видимому, значительно сокращает журналы брокеров следующего типа по этим большим числам потребителей (мы получили их почти для каждого клиента и каждого повторного баланса до этого):

INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

Потребители находятся в другом центре обработки данных, чем брокеры.Фиксация смещений выполняется асинхронно.Периодический опрос выполняется в потоке, который заполняет буфер с тайм-аутом 15 секунд;когда буфер заполнен, поток спит несколько секунд и опрашивает только когда в буфере есть свободное место.Отрывок конфигурации для случая использования больших сообщений:

  • max.partition.fetch.bytes.config = 200000000 # 200 МБ
  • max.poll.records.config =2
  • session.timeout.ms.config = 1200000 # 20 мин

Файл журнала

Ниже приводится выдержка из брокерафайл журнала, который управляет этой конкретной группой в течение 30 минут.Имена сокращены до my_group и mytopic.Есть также несколько записей из не связанной темы.

19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)

Большое спасибо за любую помощь в этом вопросе.

1 Ответ

0 голосов
/ 22 января 2019

После некоторого дальнейшего проектирования и тонкой настройки нам удалось взять проблему под контроль.

Сначала , похоже, что некоторые службы все еще обрабатывались выше предела, и это заставило ихочень редко терпят неудачу.Следующий уход вызвал перебалансирование, после чего последовало соединение примерно через 6-7 минут, что также привело к перебалансировке.Мы еще больше сократили это, оптимизировав наши услуги с точки зрения пропускной способности.

Коэффициент секунда был базовой докерной сетью, которую мы используем для расширения услуг.По умолчанию интервал сердцебиения очень короткий (5 секунд), поэтому любая тяжелая работа и сетевая нагрузка на потребительском узле могут удалить его из роя докеров за очень короткий интервал.Докер отвечает на это прерывание перемещением этих сервисов на другие узлы (повторная балансировка) с последующей повторной балансировкой, когда узел возвращается в оперативный режим.Поскольку службы имеют длительное время запуска 5-7 минут, это приводит к нескольким разам повторной балансировки для каждого из этих событий.

A третий фактор был ошибками в потребляющих службахэто иногда вызывало сбой одного из них, скажем, 1% в час.Это снова приводит к двум перебалансировкам, одному уходу, другому присоединению.

В совокупности эти проблемы в совокупности привели к наблюдаемой проблеме.Последняя версия Kafka также, кажется, выводит больше информации о том, почему служба покидает группу потребителей.Было бы неплохо, если бы Kafka продолжал предоставлять данные пользователям, которые все еще стабильны, я могу добавить запрос об этой функции.Тем не менее, теперь он работает стабильно с приличной производительностью.

...