Равномерное потребление событий с помощью коннектора Flink-Kafka - PullRequest
1 голос
/ 16 июня 2020

Я использую Flink для обработки потоковых данных от Kafka. Поток довольно базовый c, потребляет от Kafka, обогащает данные, а затем опускается до FS.

В моем случае количество разделов больше, чем уровень параллелизма Flink. Я заметил, что Flink не потребляет равномерно из всех разделов.

Время от времени в некоторых разделах Kafka создаются лаги. Перезапуск приложения помогает Flink «перебалансировать» потребление, и лаги быстро закрываются. Однако через некоторое время я вижу лаги в других разделах и т. Д.

Увидев такое поведение, я попытался перебалансировать скорость потребления с помощью rebalance (), как предложено в документации Flink:

«Разбиение элементов на циклический перебор, создавая равную нагрузку для каждого раздела. Полезно для оптимизации производительности при наличии перекоса данных.»

dataStream.rebalance ();

Изменить в коде было второстепенным, просто добавьте rebalance () к источнику потока данных. Запуск приложения с rebalance () вызвал очень странное поведение Flink:

Я установил уровень параллелизма на 260 и отправил задание, но по какой-то причине менеджер заданий умножил количество слотов на 4. Глядя На графике плана выполнения я понял, что теперь все данные потребляются 260 ядрами, а затем отправляются в 3 приемника (надеюсь, равномерно). Задания завершились неудачно из-за нехватки ресурсов.

enter image description here

enter image description here

Since I wanted to use 260 cores I tried to submit the job again, this time with a parallelism level of 65 (=260/4). The job runs fine, but the processing rate is low. In the web UI, I discovered that the total number of slots does not equal available task slots + running tasks. But if I refer to rtbJsonRequest (the job I submitted) as a job with 65 (=260/4) tasks slot, instead of 260 as it's written, it equals.

enter image description here Long story short, I am trying to find a way to balance the consumption over Kafka partition. According to Flink documentation rebalance() is what I need, but apparently I am using it wrong.

Adding more inputs. There are 520 partitions in the topic and the parallelism level is 260 (each core has 2 partitions).

I can see clearly that few partitions have a very low consumption rate: введите описание изображения здесь

Ответы [ 2 ]

1 голос
/ 25 июня 2020

Я обнаружил, что 2 моих диспетчера задач Flink имеют очень низкую скорость обработки по сравнению с другими рабочими. 37 КБ:

введите описание изображения здесь

Это действительно помогло мне понять, что у меня проблема с окружающей средой, а не проблема Flink. В моем случае установка регулятора ЦП и перезагрузка машины решили проблему.

Очень важная вещь, которую я узнал в процессе, по умолчанию Flink не обнаруживает разделы Kafka. Если вы добавляете wi sh, просто добавьте к своим свойствам:

"flink.partition-discovery.interval-millis", "time_interval"

1 голос
/ 16 июня 2020

Вставка перебалансировки после источников не уравновесит сами источники, а скорее уравновесит входные данные для того, что следует, путем вставки кругового перебалансирования сети в граф заданий. Максимум, что это может сделать sh - это выровнять нагрузку на раковины, что не помогает решить вашу проблему.

Сколько всего разделов Kafka вы потребляете? Вы используете topi c или обнаружение разделов? Кажется странным, что перезапуск задания полезен.

...