Я использую Flink для обработки потоковых данных от Kafka. Поток довольно базовый c, потребляет от Kafka, обогащает данные, а затем опускается до FS.
В моем случае количество разделов больше, чем уровень параллелизма Flink. Я заметил, что Flink не потребляет равномерно из всех разделов.
Время от времени в некоторых разделах Kafka создаются лаги. Перезапуск приложения помогает Flink «перебалансировать» потребление, и лаги быстро закрываются. Однако через некоторое время я вижу лаги в других разделах и т. Д.
Увидев такое поведение, я попытался перебалансировать скорость потребления с помощью rebalance (), как предложено в документации Flink:
«Разбиение элементов на циклический перебор, создавая равную нагрузку для каждого раздела. Полезно для оптимизации производительности при наличии перекоса данных.»
dataStream.rebalance ();
Изменить в коде было второстепенным, просто добавьте rebalance () к источнику потока данных. Запуск приложения с rebalance () вызвал очень странное поведение Flink:
Я установил уровень параллелизма на 260 и отправил задание, но по какой-то причине менеджер заданий умножил количество слотов на 4. Глядя На графике плана выполнения я понял, что теперь все данные потребляются 260 ядрами, а затем отправляются в 3 приемника (надеюсь, равномерно). Задания завершились неудачно из-за нехватки ресурсов.
Since I wanted to use 260 cores I tried to submit the job again, this time with a parallelism level of 65 (=260/4).
The job runs fine, but the processing rate is low. In the web UI, I discovered that the total number of slots does not equal available task slots + running tasks. But if I refer to rtbJsonRequest (the job I submitted) as a job with 65 (=260/4) tasks slot, instead of 260 as it's written, it equals.
Long story short, I am trying to find a way to balance the consumption over Kafka partition. According to Flink documentation rebalance() is what I need, but apparently I am using it wrong.
Adding more inputs. There are 520 partitions in the topic and the parallelism level is 260 (each core has 2 partitions).
I can see clearly that few partitions have a very low consumption rate:
введите описание изображения здесь