Контекст
Мы используем Kafka для обработки больших сообщений, очень редко до 10 МБ, но в основном в диапазоне 500 КБ.Обработка сообщения может занять до 30 секунд, а иногда и минуты.
Проблема
Обработка данных с меньшим количеством потребителей (дооколо 50) не вызывает повторной балансировки брокером, и обработка работает нормально.Любая повторная балансировка в этом масштабе также довольно быстрая, по данным журналов брокера, обычно менее минуты.
Как только потребители масштабируются до 100 или 200, потребители постоянно перебалансируются с интервалами примерно до5 минут.Это приводит к 5 минутам работы / потребления, за которыми следует 5 минут перебалансировки и затем снова то же самоеПотребительские услуги не терпят неудачу, просто перебалансировать без видимой причины.Это приводит к снижению пропускной способности при увеличении количества потребителей.
При масштабировании до двух потребителей, обработка выполняется со средней скоростью 2 сообщения в минуту на одного потребителя.Скорость обработки для отдельного потребителя, когда он не перебалансирован, составляет около 6 сообщений в минуту.
Я не подозреваю, что сеть центров обработки данных будет проблемой, поскольку у нас есть некоторые потребители, выполняющие разныесвоего рода обработка сообщений, и у них нет проблем с передачей от 100 до 1000 сообщений в минуту.
Кто-то еще испытал этот шаблон и нашел простое решение, например, изменение определенного параметра конфигурации?
Дополнительная информация
Брокеры Kafka версии 2.0, и их 10 в разных дата-центрах.Для репликации установлено значение 3. Разделов для этого раздела - 500. Выдержка из конкретной конфигурации broker , которая лучше подходит для обработки больших сообщений:
- compress.type = lz4
- message.max.bytes = 10000000 # 10 МБ
- replica.fetch.max.bytes = 10000000 # 10 МБ
- group.max.session.timeout.ms =1320000 # 22 мин
- offset.retention.minutes = 10080 # 7 дней
На стороне потребитель мы используем клиент Java с слушателем перебалансаэто очищает любые буферизованные сообщения от отозванных разделов.Этот буфер имеет размер 10 сообщений.Потребительские клиенты используют клиентский API версии 2.1, обновление java-клиента с версии 2.0 до 2.1, по-видимому, значительно сокращает журналы брокеров следующего типа по этим большим числам потребителей (мы получили их почти для каждого клиента и каждого повторного баланса до этого):
INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
Потребители находятся в другом центре обработки данных, чем брокеры.Фиксация смещений выполняется асинхронно.Периодический опрос выполняется в потоке, который заполняет буфер с тайм-аутом 15 секунд;когда буфер заполнен, поток спит несколько секунд и опрашивает только когда в буфере есть свободное место.Отрывок конфигурации для случая использования больших сообщений:
- max.partition.fetch.bytes.config = 200000000 # 200 МБ
- max.poll.records.config =2
- session.timeout.ms.config = 1200000 # 20 мин
Файл журнала
Ниже приводится выдержка из брокерафайл журнала, который управляет этой конкретной группой в течение 30 минут.Имена сокращены до my_group и mytopic.Есть также несколько записей из не связанной темы.
19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)
Большое спасибо за любую помощь в этом вопросе.