У меня есть задание по быстрому чтению, которое читает данные из Kafka, выполняет определенные агрегации и записывает результаты в индексы эластичного поиска. Я вижу высокое противодавление на источнике. Высокое противодавление приводит к медленному чтению данных из Kafka, я вижу, что данные помещаются в очередь в сетевом стеке (netstat RecvQ показывает десятки тысяч байтов данных, застрявших в исходных соединениях kafka, данные в конечном итоге читаются), что в свою очередь вызывает данные, которые будут погружены в эластичный поиск после задержки, и эта задержка продолжает увеличиваться.
Источник производит ~ 17 500 записей в минуту, задание Flink назначает метки времени (события) для каждой входящей записи, выполняет 12 различных типов keyBy, помещает события в одно-минутное падающее окно, выполняет операции агрегации в этом потоке окон с ключами. и, наконец, записывает результаты в 12 различных индексов эластичного поиска (каждая запись является вставкой).
Проблема в том, что данные, записываемые вasticsearch, начинают отставать, поэтому результаты панели мониторинга (построенные поверх эластичного поиска) больше не отображаются в реальном времени. Я понимаю, что это происходит из-за увеличения противодавления. Не уверен, как решить эту проблему. Сам кластер представляет собой отдельный кластер на основе виртуальной машины с одним узлом, с 64 ГБ ОЗУ (диспетчер задач настроен на использование 20 ГБ) и 16 виртуальными ЦП. Нет никаких свидетельств (как видно из htop) о том, что процессор или память ограничены. Имеется только один диспетчер задач, и это единственное задание на мгновение в этом кластере.
Я не уверен, связана ли эта проблема с проблемой локальных ресурсов в кластере или из-за медленной записи вasticsearch. Я установил setBulkFlushMaxActions на 1 (как это делается во всех примерах кода, которые я видел где-либо), нужно ли мне также устанавливать setBulkFlushInterval и / или setBulkFlushMaxSizeinMB?
Я прошел https://www.da -platform.com / flink-forward-berlin / resources / улучшая пропускную способность и задержку с flinks-network-stack , но еще не пробовал настраивать параметры, перечисленные на слайде 19, не уверены, какие значения установить для этих параметров.
Наконец, я не думаю, что вижу ту же проблему при запуске одного и того же задания из IntelliJ IDE.
Я собираюсь исключить все агрегации, а затем добавить их один за другим, чтобы посмотреть, существует ли конкретная агрегация, вызывающая эту проблему?
Любые конкретные указатели будут высоко оценены, также попробуйте setBulkFlushInterval и setBulkFlushMaxSizeinMB.
Обновление 1, 01/29/2019
Похоже, что оба узла работают с очень высокой кучей, поэтому GC постоянно работает, пытаясь освободить место в JVM. Будет увеличение физической памяти с 16 до 32 ГБ, а затем перезагрузить узлы. Это должно, надеюсь, решить проблему, узнаем через 24 часа.